1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
from glusto.core import Glusto as g
import pytest
from openshiftstoragelibs.baseclass import BaseClass
from openshiftstoragelibs.heketi_ops import (
heketi_volume_create,
heketi_volume_delete,
)
from openshiftstoragelibs import openshift_ops
class TestHeketiVolume(BaseClass):
"""Check volume bricks presence in fstab files on Gluster PODs."""
def _find_bricks(self, brick_paths, present):
"""Make sure that vol brick paths either exist or not in fstab file."""
oc_node = self.ocp_master_node[0]
cmd = (
'bash -c "'
'if [ -d "%s" ]; then echo present; else echo absent; fi"')
g_hosts = list(g.config.get("gluster_servers", {}).keys())
results = []
assertion_method = self.assertIn if present else self.assertNotIn
for brick_path in brick_paths:
for g_host in g_hosts:
out = openshift_ops.cmd_run_on_gluster_pod_or_node(
oc_node, cmd % brick_path, gluster_node=g_host)
results.append(out)
assertion_method('present', results)
@pytest.mark.tier0
def test_validate_brick_paths_on_gluster_pods_or_nodes(self):
"""Validate brick paths after creation and deletion of a volume."""
# Create heketi volume
vol = heketi_volume_create(
self.heketi_client_node, self.heketi_server_url, size=1, json=True)
self.assertTrue(vol, "Failed to create 1Gb heketi volume")
vol_id = vol["bricks"][0]["volume"]
self.addCleanup(
heketi_volume_delete,
self.heketi_client_node, self.heketi_server_url, vol_id,
raise_on_error=False)
# Gather brick paths
brick_paths = [p['path'] for p in vol["bricks"]]
# Make sure that volume's brick paths exist in the fstab files
self._find_bricks(brick_paths, present=True)
# Delete heketi volume
out = heketi_volume_delete(
self.heketi_client_node, self.heketi_server_url, vol_id)
self.assertTrue(out, "Failed to delete heketi volume %s" % vol_id)
# Make sure that volume's brick paths are absent in the fstab file
self._find_bricks(brick_paths, present=False)
|