summaryrefslogtreecommitdiffstats
path: root/tests/functional/common/heketi/test_heketi_volume_operations.py
blob: 3e8b4f65aaa57ab0842ac5a094f928694f5f5d86 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/python

from glusto.core import Glusto as g
from cnslibs.common.heketi_ops import (heketi_create_topology,
                                       heketi_topology_load,
                                       heketi_volume_delete,
                                       heketi_volume_create,
                                       heketi_volume_expand,
                                       heketi_volume_info)
from cnslibs.common.heketi_libs import HeketiClientSetupBaseClass
from cnslibs.common.exceptions import ExecutionError, ConfigError


class TestHeketiVolumeOperations(HeketiClientSetupBaseClass):
    """
    Class to test heketi volume operations - create, expand
    """

    @classmethod
    def setUpClass(cls):
        """
        Prepare the heketi setup shared by the tests of this class.

        When the deployment type is "crs_heketi_outside_openshift", the
        heketi topology is created on the heketi client node and then
        loaded against the heketi server.

        Raises:
            ConfigError: if creating or loading the topology reports failure.
        """
        super(TestHeketiVolumeOperations, cls).setUpClass()

        if cls.deployment_type == "crs_heketi_outside_openshift":
            ret = heketi_create_topology(cls.heketi_client_node,
                                         cls.topology_info)
            if not ret:
                raise ConfigError("Failed to create heketi topology file on %s"
                                  % cls.heketi_client_node)

            ret = heketi_topology_load(cls.heketi_client_node,
                                       cls.heketi_server_url)
            if not ret:
                raise ConfigError("Failed to load heketi topology on %s"
                                  % cls.heketi_client_node)

        # Class-level default; each test stores the id of the volume it
        # created on its own instance, which volume_cleanup() then reads.
        cls.volume_id = None

    def volume_cleanup(self):
        """
        Method to cleanup volume in self.addCleanup()

        Does nothing when no volume id has been recorded.

        Raises:
            ExecutionError: if the delete call returns nothing usable or its
                output does not confirm that the volume was deleted.
        """
        if self.volume_id is None:
            return

        out = heketi_volume_delete(self.heketi_client_node,
                                   self.heketi_server_url, self.volume_id)
        output_str = 'Volume %s deleted' % self.volume_id
        # Guard against a falsy / non-string result first: a bare
        # "not in" check on such a value would raise TypeError and hide
        # the real failure.
        # NOTE(review): presumably heketi_volume_delete returns False on
        # failure - confirm against cnslibs.common.heketi_ops.
        if not out or output_str not in out:
            raise ExecutionError("Failed to delete heketi volume of id %s"
                                 % self.volume_id)

    def test_heketi_with_default_options(self):
        """
        Test to create volume with default options.

        Creates a volume of the configured size and verifies that the size
        reported in the create output matches the requested size.
        """

        volume_size = self.heketi_volume['size']
        kwargs = {"json": True}
        vol_info = heketi_volume_create(self.heketi_client_node,
                                        self.heketi_server_url, volume_size,
                                        **kwargs)
        self.assertTrue(vol_info, ("Failed to create heketi volume of size %s"
                                   % str(volume_size)))
        # Register the cleanup before any further assertion so the volume
        # is removed even when a later check fails.
        self.volume_id = vol_info['id']
        self.addCleanup(self.volume_cleanup)

        self.assertEqual(vol_info['size'], int(volume_size),
                         ("Failed to create volume with default options. "
                          "Expected Size: %s, Actual Size: %s"
                          % (str(volume_size), str(vol_info['size']))))

    def test_heketi_with_expand_volume(self):
        """
        Test volume expand and size if updated correctly in heketi-cli info

        Creates a volume, expands it by the configured expand size and
        verifies that the volume info reports the sum of both sizes.
        """

        kwargs = {"json": True}
        volume_size = self.heketi_volume['size']
        vol_info = heketi_volume_create(self.heketi_client_node,
                                        self.heketi_server_url, volume_size,
                                        **kwargs)
        self.assertTrue(vol_info, ("Failed to create heketi volume of size %s"
                                   % str(volume_size)))
        # Register the cleanup before any further assertion so the volume
        # is removed even when a later check fails.
        self.volume_id = vol_info['id']
        self.addCleanup(self.volume_cleanup)
        self.assertEqual(vol_info['size'], int(volume_size),
                         ("Failed to create volume. "
                          "Expected Size: %s, Actual Size: %s"
                          % (str(volume_size), str(vol_info['size']))))

        expand_size = self.heketi_volume['expand_size']
        ret = heketi_volume_expand(self.heketi_client_node,
                                   self.heketi_server_url, self.volume_id,
                                   expand_size)
        self.assertTrue(ret, ("Failed to expand heketi volume of id %s"
                              % self.volume_id))

        volume_info = heketi_volume_info(self.heketi_client_node,
                                         self.heketi_server_url,
                                         self.volume_id, **kwargs)
        # Fail with a clear message instead of an indexing error when the
        # info call did not return any data.
        self.assertTrue(volume_info, ("Failed to get info of heketi volume "
                                      "of id %s" % self.volume_id))
        expected_size = int(volume_size) + int(expand_size)
        self.assertEqual(volume_info['size'], expected_size,
                         ("Volume Expansion failed Expected Size: %s, Actual "
                          "Size: %s" % (str(expected_size),
                                        str(volume_info['size']))))