path: root/tests/functional/heketi/test_heketi_node_operations.py
import ddt
from glusto.core import Glusto as g
from glustolibs.gluster import peer_ops
import pytest
import six

from openshiftstoragelibs import baseclass
from openshiftstoragelibs import exceptions
from openshiftstoragelibs import gluster_ops
from openshiftstoragelibs import heketi_ops
from openshiftstoragelibs import openshift_ops
from openshiftstoragelibs import openshift_storage_version
from openshiftstoragelibs import podcmd
from openshiftstoragelibs import utils


@ddt.ddt
class TestHeketiNodeOperations(baseclass.BaseClass):
    """Class to test heketi node operations
    """

    def setUp(self):
        super(TestHeketiNodeOperations, self).setUp()
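        # Short aliases for the master node and heketi client/server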
        self.node = self.ocp_master_node[0]
        self.h_node = self.heketi_client_node
        self.h_url = self.heketi_server_url

    @pytest.mark.tier1
    @podcmd.GlustoPod()
    def test_heketi_node_list(self):
        """Test node list operation
        """
        h_client, h_server = self.heketi_client_node, self.heketi_server_url

        # List heketi nodes
        node_ips = []
        heketi_node_id_list = heketi_ops.heketi_node_list(h_client, h_server)

        for node_id in heketi_node_id_list:
            node_info = heketi_ops.heketi_node_info(
                h_client, h_server, node_id, json=True)
            node_ips.append(node_info["hostnames"]["storage"])

        # Compare with the nodes listed in the previous step
        hostnames = []
        list_of_pools = peer_ops.get_pool_list('auto_get_gluster_endpoint')
        self.assertTrue(
            list_of_pools,
            "Failed to get the pool list from gluster pods/nodes")
        for pool in list_of_pools:
            hostnames.append(pool["hostname"])
        self.assertEqual(
            len(heketi_node_id_list), len(list_of_pools),
            "Heketi volume list %s is not equal to gluster volume list %s"
            % (node_ips, hostnames))

    @pytest.mark.tier2
    def test_heketi_node_info(self):
        """Test heketi node info operation
        """
        h_client, h_server = self.heketi_client_node, self.heketi_server_url

        # List heketi nodes
        heketi_node_id_list = heketi_ops.heketi_node_list(h_client, h_server)
        self.assertTrue(heketi_node_id_list, "Node Id list is empty.")

        for node_id in heketi_node_id_list:
            node_info = heketi_ops.heketi_node_info(
                h_client, h_server, node_id, json=True)
            self.assertTrue(node_info, "Failed to retrieve the node info")
            self.assertEqual(
                node_info["id"], node_id,
                "Failed to match node ID. Exp: %s, Act: %s" % (
                    node_id, node_info["id"]))

    @pytest.mark.tier1
    def test_heketi_node_states_enable_disable(self):
        """Test node enable and disable functionality
        """
        h_client, h_server = self.heketi_client_node, self.heketi_server_url

        node_list = heketi_ops.heketi_node_list(h_client, h_server)
        online_hosts = []
        for node_id in node_list:
            node_info = heketi_ops.heketi_node_info(
                h_client, h_server, node_id, json=True)
            if node_info["state"] == "online":
                online_hosts.append(node_info)

        if len(online_hosts) < 3:
            raise self.skipTest(
                "This test can run only if online hosts are more than 2")

        # Disable all but the first 3 online nodes (n-3 when we have n)
        for node_info in online_hosts[3:]:
            node_id = node_info["id"]
            heketi_ops.heketi_node_disable(h_client, h_server, node_id)
            self.addCleanup(
                heketi_ops.heketi_node_enable, h_client, h_server, node_id)

        # Create volume when 3 nodes are online
        vol_size = 1
        vol_info = heketi_ops.heketi_volume_create(
            h_client, h_server, vol_size, json=True)
        self.addCleanup(
            heketi_ops.heketi_volume_delete,
            h_client, h_server, vol_info['id'])

        node_id = online_hosts[0]['id']
        try:
            heketi_ops.heketi_node_disable(h_client, h_server, node_id)

            # Volume creation should fail with only 2 nodes online
            with self.assertRaises(AssertionError):
                heketi_volume = heketi_ops.heketi_volume_create(
                    h_client, h_server, vol_size)
                self.addCleanup(
                    heketi_ops.heketi_volume_delete,
                    h_client, h_server, heketi_volume["id"])
        finally:
            # Enable heketi node
            heketi_ops.heketi_node_enable(h_client, h_server, node_id)

        # Create volume when heketi node is enabled
        vol_info = heketi_ops.heketi_volume_create(
            h_client, h_server, vol_size, json=True)
        heketi_ops.heketi_volume_delete(h_client, h_server, vol_info['id'])

    def add_heketi_node_to_cluster(self, cluster_id):
        """Add new node to a cluster"""
        storage_host_info = g.config.get("additional_gluster_servers")
        if not storage_host_info:
            self.skipTest(
                "Skip test case as 'additional_gluster_servers' option is "
                "not provided in config file")

        storage_host_info = list(storage_host_info.values())[0]
        try:
            storage_hostname = storage_host_info["manage"]
            storage_ip = storage_host_info["storage"]
        except KeyError:
            msg = ("Config options 'additional_gluster_servers.manage' "
                   "and 'additional_gluster_servers.storage' must be set.")
            g.log.error(msg)
            raise exceptions.ConfigError(msg)

        h_client, h_server = self.heketi_client_node, self.heketi_server_url
        storage_zone = 1

        self.configure_node_to_run_gluster(storage_hostname)

        heketi_node_info = heketi_ops.heketi_node_add(
            h_client, h_server, storage_zone, cluster_id,
            storage_hostname, storage_ip, json=True)
        heketi_node_id = heketi_node_info["id"]
        self.addCleanup(
            heketi_ops.heketi_node_delete, h_client, h_server, heketi_node_id)
        self.addCleanup(
            heketi_ops.heketi_node_remove, h_client, h_server, heketi_node_id)
        self.addCleanup(
            heketi_ops.heketi_node_disable, h_client, h_server, heketi_node_id)
        self.assertEqual(
            heketi_node_info["cluster"], cluster_id,
            "Node got added in unexpected cluster exp: %s, act: %s" % (
                cluster_id, heketi_node_info["cluster"]))

        return storage_hostname, storage_ip

    @podcmd.GlustoPod()
    def heketi_node_add_with_valid_cluster(self):
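        """Add a node to the existing heketi cluster and verify gluster
        peer status and heketi DB endpoints."""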
        h_client, h_server = self.heketi_client_node, self.heketi_server_url
        ocp_node = self.ocp_master_node[0]

        # Get heketi endpoints before adding node
        h_volume_ids = heketi_ops.heketi_volume_list(
            h_client, h_server, json=True)
        h_endpoints_before_new_node = heketi_ops.heketi_volume_endpoint_patch(
            h_client, h_server, h_volume_ids["volumes"][0])

        cluster_info = heketi_ops.heketi_cluster_list(
            h_client, h_server, json=True)
        storage_hostname, storage_ip = self.add_heketi_node_to_cluster(
            cluster_info["clusters"][0])

        # Get heketi nodes and check that the newly added node is present
        h_node_ids = heketi_ops.heketi_node_list(h_client, h_server, json=True)
        for h_node_id in h_node_ids:
            node_hostname = heketi_ops.heketi_node_info(
                h_client, h_server, h_node_id, json=True)
            if node_hostname["hostnames"]["manage"][0] == storage_hostname:
                break
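            # Reset so the variable ends up as None when no node matches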
            node_hostname = None

        err_msg = ("Newly added heketi node %s not found in heketi node "
                   "list %s" % (storage_hostname, h_node_ids))
        self.assertTrue(node_hostname, err_msg)

        # Check gluster peer status for newly added node
        if self.is_containerized_gluster():
            gluster_pods = openshift_ops.get_ocp_gluster_pod_details(ocp_node)
            gluster_pod = [
                gluster_pod["pod_name"]
                for gluster_pod in gluster_pods
                if gluster_pod["pod_hostname"] == storage_hostname][0]

            gluster_peer_status = peer_ops.get_peer_status(
                podcmd.Pod(ocp_node, gluster_pod))
        else:
            gluster_peer_status = peer_ops.get_peer_status(
                storage_hostname)
        self.assertEqual(
            len(gluster_peer_status), len(self.gluster_servers))

        err_msg = "Expected peer status is 1 and actual is %s"
        for peer in gluster_peer_status:
            peer_status = int(peer["connected"])
            self.assertEqual(peer_status, 1, err_msg % peer_status)

        # Get heketi endpoints after adding node
        h_endpoints_after_new_node = heketi_ops.heketi_volume_endpoint_patch(
            h_client, h_server, h_volume_ids["volumes"][0])

        # Get the heketi DB endpoint and patch it with the heketi endpoints
        heketi_db_endpoint = openshift_ops.oc_get_custom_resource(
            ocp_node, "dc", name=self.heketi_dc_name,
            custom=".:spec.template.spec.volumes[*].glusterfs.endpoints")[0]
        openshift_ops.oc_patch(
            ocp_node, "ep", heketi_db_endpoint, h_endpoints_after_new_node)
        self.addCleanup(
            openshift_ops.oc_patch, ocp_node, "ep", heketi_db_endpoint,
            h_endpoints_before_new_node)
        ep_addresses = openshift_ops.oc_get_custom_resource(
            ocp_node, "ep", name=heketi_db_endpoint,
            custom=".:subsets[*].addresses[*].ip")[0].split(",")

        err_msg = "Hostname %s not present in endpoints %s" % (
            storage_ip, ep_addresses)
        self.assertIn(storage_ip, ep_addresses, err_msg)

    @pytest.mark.tier1
    def test_heketi_node_add_with_valid_cluster(self):
        """Test heketi node add operation with valid cluster id"""
        if not self.is_containerized_gluster():
            self.skipTest(
                "Skipping this test case as CRS version check "
                "is not implemented")

        if (openshift_storage_version.get_openshift_storage_version()
                < "3.11.4"):
            self.skipTest(
                "This test case is not supported for < OCS 3.11.4 builds due "
                "to bug BZ-1732831")

        # Add node to valid cluster id
        self.heketi_node_add_with_valid_cluster()

    @pytest.mark.tier2
    def test_validate_heketi_node_add_with_db_check(self):
        """Test heketi db check after node add operation"""
        if not self.is_containerized_gluster():
            self.skipTest(
                "Skipping this test case as CRS version check "
                "is not implemented")

        if (openshift_storage_version.get_openshift_storage_version()
                < "3.11.4"):
            self.skipTest(
                "This test case is not supported for < OCS 3.11.4 builds due "
                "to bug BZ-1732831")

        # Get the total number of nodes in heketi db
        initial_db_info = heketi_ops.heketi_db_check(
            self.heketi_client_node, self.heketi_server_url, json=True)
        initial_node_count = initial_db_info['nodes']['total']

        # Add node to valid cluster id
        self.heketi_node_add_with_valid_cluster()

        # Verify the addition of node in heketi db
        final_db_info = heketi_ops.heketi_db_check(
            self.heketi_client_node, self.heketi_server_url, json=True)
        final_node_count = final_db_info['nodes']['total']
        msg = (
            "Initial node count {} and final node count {} in heketi db is"
            " not as expected".format(initial_node_count, final_node_count))
        self.assertEqual(initial_node_count + 1, final_node_count, msg)

    @pytest.mark.tier2
    def test_heketi_node_add_with_invalid_cluster(self):
        """Test heketi node add operation with invalid cluster id"""
        storage_hostname, cluster_id = None, utils.get_random_str(size=33)
        try:
            storage_hostname, _ = self.add_heketi_node_to_cluster(cluster_id)
        except AssertionError as e:
            err_msg = ("validation failed: cluster: %s is not a valid UUID" %
                       cluster_id)
            if err_msg not in six.text_type(e):
                raise

        err_msg = ("Unexpectedly node %s got added to cluster %s" % (
            storage_hostname, cluster_id))
        self.assertFalse(storage_hostname, err_msg)

    def get_node_to_be_added(self):
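        """Return info of the first server from 'additional_gluster_servers'
        config, skipping the test if the option is not set."""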
        try:
            # Initialize additional gluster server variables
            self.additional_gluster_servers = list(
                g.config['additional_gluster_servers'].keys())
            self.additional_gluster_servers_info = (
                g.config['additional_gluster_servers'])
            return list(self.additional_gluster_servers_info.values())[0]
        except (KeyError, AttributeError, IndexError):
            self.skipTest("Required 'additional_gluster_servers' option is "
                          "not set in the config file.")

    def get_vol_ids_by_pvc_names(self, pvc_names):
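        """Return heketi volume IDs for the given PVC names."""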
        vol_ids = []
        custom = (r':.metadata.annotations."gluster\.kubernetes\.io\/'
                  'heketi-volume-id"')
        for pvc in pvc_names:
            pv = openshift_ops.get_pv_name_from_pvc(self.node, pvc)
            vol_id = openshift_ops.oc_get_custom_resource(
                self.node, 'pv', custom, pv)
            vol_ids.append(vol_id[0])
        return vol_ids

    def get_vol_names_from_vol_ids(self, vol_ids):
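        """Return heketi volume names for the given volume IDs."""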
        vol_names = []
        for vol_id in vol_ids:
            vol_info = heketi_ops.heketi_volume_info(
                self.h_node, self.h_url, vol_id, json=True)
            vol_names.append(vol_info['name'])
        return vol_names

    def verify_gluster_peer_status(
            self, gluster_node, new_node_manage, new_node_storage,
            state='present'):
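        """Assert that the new node is present in or absent from the gluster
        peer status output of the given gluster node."""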

        # Verify gluster peer status
        peer_status = openshift_ops.cmd_run_on_gluster_pod_or_node(
            self.node, 'gluster peer status', gluster_node)
        found = (new_node_manage in peer_status
                 or new_node_storage in peer_status)
        if state == 'present':
            msg = ('Node %s is not present in gluster peer status %s'
                   % (new_node_manage, peer_status))
            self.assertTrue(found, msg)
        elif state == 'absent':
            msg = ('Node %s is still present in gluster peer status %s'
                   % (new_node_manage, peer_status))
            self.assertFalse(found, msg)
        else:
            msg = "State %s is other than present, absent" % state
            raise AssertionError(msg)

    def verify_node_is_present_or_not_in_heketi(
            self, node_id, manage_hostname, storage_ip, state='present'):
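        """Assert that the node is present in or absent from the heketi
        topology."""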

        topology = heketi_ops.heketi_topology_info(
            self.h_node, self.h_url, json=True)
        if state == 'present':
            present = False
            for node in topology['clusters'][0]['nodes']:
                if node_id == node['id']:
                    self.assertEqual(
                        manage_hostname, node['hostnames']['manage'][0])
                    self.assertEqual(
                        storage_ip, node['hostnames']['storage'][0])
                    present = True
                    break
            self.assertTrue(present, 'Node %s not found in heketi' % node_id)

        elif state == 'absent':
            for node in topology['clusters'][0]['nodes']:
                self.assertNotEqual(
                    manage_hostname, node['hostnames']['manage'][0])
                self.assertNotEqual(
                    storage_ip, node['hostnames']['storage'][0])
                self.assertNotEqual(node_id, node['id'])
        else:
            msg = "State %s is other than present, absent" % state
            raise AssertionError(msg)

    def verify_gluster_server_present_in_heketi_vol_info_or_not(
            self, vol_ids, gluster_server, state='present'):
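        """Assert that the gluster server is present in or absent from the
        file servers and hosts of the given heketi volumes."""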

        # Verify gluster servers in vol info
        for vol_id in vol_ids:
            g_servers = heketi_ops.get_vol_file_servers_and_hosts(
                self.h_node, self.h_url, vol_id)
            g_servers = (g_servers['vol_servers'] + g_servers['vol_hosts'])
            if state == 'present':
                self.assertIn(gluster_server, g_servers)
            elif state == 'absent':
                self.assertNotIn(gluster_server, g_servers)
            else:
                msg = "State %s is other than present, absent" % state
                raise AssertionError(msg)

    def verify_volume_bricks_are_present_or_not_on_heketi_node(
            self, vol_ids, node_id, state='present'):
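        """Assert that bricks of the given volumes are or are not placed on
        the given heketi node."""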

        for vol_id in vol_ids:
            vol_info = heketi_ops.heketi_volume_info(
                self.h_node, self.h_url, vol_id, json=True)
            bricks = vol_info['bricks']
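            # Bricks of a replica-3 volume come in multiples of 3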
            self.assertFalse(len(bricks) % 3)
            if state == 'present':
                found = False
                for brick in bricks:
                    if brick['node'] == node_id:
                        found = True
                        break
                self.assertTrue(
                    found, 'Bricks of volume %s are not present on node %s'
                    % (vol_id, node_id))
            elif state == 'absent':
                for brick in bricks:
                    self.assertNotEqual(
                        brick['node'], node_id, 'Brick of volume %s is '
                        'present on node %s' % (vol_id, node_id))
            else:
                msg = "State %s is other than present, absent" % state
                raise AssertionError(msg)

    def get_ready_for_node_add(self, hostname):
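        """Configure the host to run gluster and disable all heketi nodes
        except the first two."""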
        self.configure_node_to_run_gluster(hostname)

        h_nodes = heketi_ops.heketi_node_list(self.h_node, self.h_url)

        # Disable all heketi nodes except the first two
        for node_id in h_nodes[2:]:
            heketi_ops.heketi_node_disable(self.h_node, self.h_url, node_id)
            self.addCleanup(
                heketi_ops.heketi_node_enable, self.h_node, self.h_url,
                node_id)

    def add_device_on_heketi_node(self, node_id, device_name):
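        """Add a device to the heketi node and return its device ID."""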

        # Add the device on the heketi node
        heketi_ops.heketi_device_add(
            self.h_node, self.h_url, device_name, node_id)

        # Get device id of newly added device
        node_info = heketi_ops.heketi_node_info(
            self.h_node, self.h_url, node_id, json=True)
        for device in node_info['devices']:
            if device['name'] == device_name:
                return device['id']
        raise AssertionError('Device %s not found on node %s' % (
            device_name, node_id))

    def delete_node_and_devices_on_it(self, node_id):
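        """Disable and remove the heketi node, delete its devices and then
        delete the node itself."""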

        heketi_ops.heketi_node_disable(self.h_node, self.h_url, node_id)
        heketi_ops.heketi_node_remove(self.h_node, self.h_url, node_id)
        node_info = heketi_ops.heketi_node_info(
            self.h_node, self.h_url, node_id, json=True)
        for device in node_info['devices']:
            heketi_ops.heketi_device_delete(
                self.h_node, self.h_url, device['id'])
        heketi_ops.heketi_node_delete(self.h_node, self.h_url, node_id)

    @pytest.mark.tier1
    @ddt.data('remove', 'delete')
    def test_heketi_node_remove_or_delete(self, operation='delete'):
        """Test node remove and delete functionality of heketi and validate
        gluster peer status and heketi topology.
        """
        # Get info of the node to be added to the heketi cluster
        new_node = self.get_node_to_be_added()
        new_node_manage = new_node['manage']
        new_node_storage = new_node['storage']

        self.get_ready_for_node_add(new_node_manage)

        h, h_node, h_url = heketi_ops, self.h_node, self.h_url

        # Get cluster id where node needs to be added.
        cluster_id = h.heketi_cluster_list(h_node, h_url, json=True)
        cluster_id = cluster_id['clusters'][0]

        h_nodes_list = h.heketi_node_list(h_node, h_url)

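        # Track whether the newly added node needs cleanup in 'finally'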
        node_needs_cleanup = False
        try:
            # Add new node
            h_new_node = h.heketi_node_add(
                h_node, h_url, 1, cluster_id, new_node_manage,
                new_node_storage, json=True)['id']
            node_needs_cleanup = True

            # Get hostname of first gluster node
            g_node = h.heketi_node_info(
                h_node, h_url, h_nodes_list[0],
                json=True)['hostnames']['manage'][0]

            # Verify gluster peer status
            self.verify_gluster_peer_status(
                g_node, new_node_manage, new_node_storage)

            # Add a device on the newly added node
            device_id = self.add_device_on_heketi_node(
                h_new_node, new_node['devices'][0])

            # Create PVCs and DCs
            vol_count = 5
            pvcs = self.create_and_wait_for_pvcs(pvc_amount=vol_count)
            dcs = self.create_dcs_with_pvc(pvcs)

            # Get volume IDs
            vol_ids = self.get_vol_ids_by_pvc_names(pvcs)

            # Get brick count on the newly added node
            bricks = h.get_bricks_on_heketi_node(
                h_node, h_url, h_new_node)
            self.assertGreaterEqual(len(bricks), vol_count)

            # Enable the nodes back, which were disabled earlier
            for node_id in h_nodes_list[2:]:
                h.heketi_node_enable(h_node, h_url, node_id)

            if operation == 'remove':
                # Remove the node
                h.heketi_node_disable(h_node, h_url, h_new_node)
                h.heketi_node_remove(h_node, h_url, h_new_node)
                # Delete the device
                h.heketi_device_delete(h_node, h_url, device_id)
            elif operation == 'delete':
                # Remove and delete device
                h.heketi_device_disable(h_node, h_url, device_id)
                h.heketi_device_remove(h_node, h_url, device_id)
                h.heketi_device_delete(h_node, h_url, device_id)
                # Remove node
                h.heketi_node_disable(h_node, h_url, h_new_node)
                h.heketi_node_remove(h_node, h_url, h_new_node)
            else:
                msg = "Operation %s is other than remove, delete." % operation
                raise AssertionError(msg)

            # Delete Node
            h.heketi_node_delete(h_node, h_url, h_new_node)
            node_needs_cleanup = False

            # Verify node is deleted from heketi
            self.verify_node_is_present_or_not_in_heketi(
                h_new_node, new_node_manage, new_node_storage, state='absent')

            # Verify gluster peer status
            self.verify_gluster_peer_status(
                g_node, new_node_manage, new_node_storage, state='absent')

            # Verify gluster servers are not present in vol info
            self.verify_gluster_server_present_in_heketi_vol_info_or_not(
                vol_ids, new_node_storage, state='absent')

            # Verify vol bricks are not present on the deleted node
            self.verify_volume_bricks_are_present_or_not_on_heketi_node(
                vol_ids, h_new_node, state='absent')

            # Wait for heal to complete
            gluster_ops.wait_to_heal_complete(g_node=g_node)

            for _, pod in dcs.values():
                openshift_ops.wait_for_pod_be_ready(self.node, pod, timeout=1)
        finally:
            # Clean up the newly added node
            if node_needs_cleanup:
                self.addCleanup(self.delete_node_and_devices_on_it, h_new_node)

            # Enable the nodes back, which were disabled earlier
            for node_id in h_nodes_list[2:]:
                self.addCleanup(h.heketi_node_enable, h_node, h_url, node_id)

    @pytest.mark.tier4
    @ddt.data(
        ("volume", "create"),
        ("volume", "delete"),
        ("volume", "expand"),
        ("blockvolume", "create"),
        ("blockvolume", "delete"),
    )
    @ddt.unpack
    def test_volume_operations_while_node_removal_is_running(
            self, vol_type, vol_operation):
        """Test volume operations like create, delete and expand while node
        removal is running in parallel at the backend.
        """
        # Get info of the node to be added to the heketi cluster
        new_node = self.get_node_to_be_added()
        new_node_manage = new_node['manage']
        new_node_storage = new_node['storage']

        self.get_ready_for_node_add(new_node_manage)

        h, h_node, h_url = heketi_ops, self.h_node, self.h_url

        # Get cluster id where node needs to be added.
        cluster_id = h.heketi_cluster_list(h_node, h_url, json=True)
        cluster_id = cluster_id['clusters'][0]

        h_nodes_list = h.heketi_node_list(h_node, h_url)

        node_needs_cleanup = False
        try:
            # Add new node
            h_new_node = h.heketi_node_add(
                h_node, h_url, 1, cluster_id, new_node_manage,
                new_node_storage, json=True)['id']
            node_needs_cleanup = True

            # Get hostname of first gluster node
            g_node = h.heketi_node_info(
                h_node, h_url, h_nodes_list[0],
                json=True)['hostnames']['manage'][0]

            # Verify gluster peer status
            self.verify_gluster_peer_status(
                g_node, new_node_manage, new_node_storage)

            # Add a device on the newly added node
            device_id = self.add_device_on_heketi_node(
                h_new_node, new_node['devices'][0])

            # Create Volumes
            vol_count = 5
            for i in range(vol_count):
                vol_info = h.heketi_volume_create(
                    h_node, h_url, 1, clusters=cluster_id, json=True)
                self.addCleanup(
                    h.heketi_volume_delete, h_node, h_url, vol_info['id'])

            # Get brick count on the newly added node
            bricks = h.get_bricks_on_heketi_node(
                h_node, h_url, h_new_node)
            self.assertGreaterEqual(len(bricks), vol_count)

            # Enable the nodes back, which were disabled earlier
            for node_id in h_nodes_list[2:]:
                h.heketi_node_enable(h_node, h_url, node_id)

            # Disable the node
            h.heketi_node_disable(h_node, h_url, h_new_node)

            vol_count = 2 if vol_type == 'volume' else 5
            if vol_operation in ('delete', 'expand'):
                # Create file/block volumes to delete or expand while node
                # removal is running
                vol_list = []
                for i in range(vol_count):
                    vol_info = getattr(h, "heketi_%s_create" % vol_type)(
                        h_node, h_url, 1, clusters=cluster_id, json=True)
                    self.addCleanup(
                        getattr(h, "heketi_%s_delete" % vol_type),
                        h_node, h_url, vol_info['id'], raise_on_error=False)
                    vol_list.append(vol_info)

            # Start node removal asynchronously so that volume operations
            # can run while it is in progress
            cmd = 'heketi-cli node remove %s -s %s --user %s --secret %s' % (
                h_new_node, h_url, self.heketi_cli_user, self.heketi_cli_key)
            proc = g.run_async(h_node, cmd)

            if vol_operation == 'create':
                # Create file/block volume while node removal is running
                for i in range(vol_count):
                    vol_info = getattr(h, "heketi_%s_create" % vol_type)(
                        h_node, h_url, 1, clusters=cluster_id, json=True)
                    self.addCleanup(
                        getattr(h, "heketi_%s_delete" % vol_type),
                        h_node, h_url, vol_info['id'])

            elif vol_operation == 'delete':
                # Delete file/block volume while node removal is running
                for vol in vol_list:
                    getattr(h, "heketi_%s_delete" % vol_type)(
                        h_node, h_url, vol['id'])

            elif vol_operation == 'expand':
                # Expand file volume while node removal is running
                for vol in vol_list:
                    vol_info = h.heketi_volume_expand(
                        h_node, h_url, vol['id'], '1', json=True)
                    self.assertEqual(2, vol_info['size'])

            else:
                msg = "Invalid vol_operation %s" % vol_operation
                raise AssertionError(msg)

            # Get the status of node removal command
            retcode, stdout, stderr = proc.async_communicate()
            msg = 'Failed to remove node %s from heketi with error %s' % (
                h_new_node, stderr)
            self.assertFalse(retcode, msg)

            h.heketi_device_delete(h_node, h_url, device_id)
            h.heketi_node_delete(h_node, h_url, h_new_node)
            node_needs_cleanup = False

            # Verify node is deleted from heketi
            self.verify_node_is_present_or_not_in_heketi(
                h_new_node, new_node_manage, new_node_storage, state='absent')

            # Verify gluster peer status
            self.verify_gluster_peer_status(
                g_node, new_node_manage, new_node_storage, state='absent')
        finally:
            # Clean up the newly added node
            if node_needs_cleanup:
                self.addCleanup(self.delete_node_and_devices_on_it, h_new_node)

            # Enable the nodes back, which were disabled earlier
            for node_id in h_nodes_list[2:]:
                self.addCleanup(h.heketi_node_enable, h_node, h_url, node_id)