1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
import ddt
import pytest
from glusto.core import Glusto as g
from openshiftstoragelibs import baseclass
from openshiftstoragelibs import heketi_ops
from openshiftstoragelibs import node_ops
from openshiftstoragelibs import openshift_storage_libs
from openshiftstoragelibs import podcmd
@ddt.ddt
class TestDevPathMapping(baseclass.BaseClass):
'''Class that contain dev path mapping test cases for
gluster file & block volumes
'''
def setUp(self):
super(TestDevPathMapping, self).setUp()
self.node = self.ocp_master_node[0]
self.h_node, self.h_server = (
self.heketi_client_node, self.heketi_server_url)
h_nodes_list = heketi_ops.heketi_node_list(self.h_node, self.h_server)
h_node_count = len(h_nodes_list)
if h_node_count < 3:
self.skipTest(
"At least 3 nodes are required, found {}".format(
h_node_count))
# Disable 4th and other nodes
for node_id in h_nodes_list[3:]:
heketi_ops.heketi_node_disable(
self.h_node, self.h_server, node_id)
self.addCleanup(
heketi_ops.heketi_node_enable,
self.h_node, self.h_server, node_id)
h_info = heketi_ops.heketi_node_info(
self.h_node, self.h_server, h_nodes_list[0], json=True)
self.assertTrue(
h_info, "Failed to get the heketi node info for node id"
" {}".format(h_nodes_list[0]))
self.node_ip = h_info['hostnames']['storage'][0]
self.node_hostname = h_info["hostnames"]["manage"][0]
self.vm_name = node_ops.find_vm_name_by_ip_or_hostname(
self.node_hostname)
self.devices_list = [device['name'] for device in h_info["devices"]]
# Get list of additional devices for one of the Gluster nodes
for gluster_server in list(g.config["gluster_servers"].values()):
if gluster_server['storage'] == self.node_ip:
additional_device = gluster_server.get("additional_devices")
if additional_device:
self.devices_list.extend(additional_device)
# sort the devices list
self.devices_list.sort()
@pytest.mark.tier2
@podcmd.GlustoPod()
def test_dev_path_file_volume_create(self):
"""Validate dev path mapping for file volumes"""
pvc_size, pvc_amount = 2, 5
pvs_info_before = openshift_storage_libs.get_pvs_info(
self.node, self.node_ip, self.devices_list, raise_on_error=False)
self.detach_and_attach_vmdk(
self.vm_name, self.node_hostname, self.devices_list)
pvs_info_after = openshift_storage_libs.get_pvs_info(
self.node, self.node_ip, self.devices_list, raise_on_error=False)
# Compare pvs info before and after
for (path, uuid, vg_name), (_path, _uuid, _vg_name) in zip(
pvs_info_before[:-1], pvs_info_after[1:]):
self.assertEqual(
uuid, _uuid, "pv_uuid check failed. Expected:{},"
"Actual: {}".format(uuid, _uuid))
self.assertEqual(
vg_name, _vg_name, "vg_name check failed. Expected:"
"{}, Actual:{}".format(vg_name, _vg_name))
# Create file volumes
pvcs = self.create_and_wait_for_pvcs(
pvc_size=pvc_size, pvc_amount=pvc_amount)
self.create_dcs_with_pvc(pvcs)
self.validate_file_volumes_count(
self.h_node, self.h_server, self.node_ip)
|