diff --git a/tendrl/ceph_integration/ceph.py b/tendrl/ceph_integration/ceph.py index 578b39f82..d1a7926fd 100644 --- a/tendrl/ceph_integration/ceph.py +++ b/tendrl/ceph_integration/ceph.py @@ -9,11 +9,15 @@ import subprocess import tempfile import time +import rbd +import multiprocessing import msgpack from tendrl.commons.event import Event from tendrl.commons.message import Message +from tendrl.commons.utils import cmd_utils + # Note: do not import ceph modules at this scope, otherwise this module won't # be able to cleanly talk to us about systems where ceph isn't installed yet. @@ -375,45 +379,91 @@ def ceph_command(cluster_name, command_args): } -def rbd_command(cluster_name, command_args, pool_name=None): - """Run a rbd CLI operation directly. This is a fallback to allow - - manual execution of arbitrary commands in case the user wants to - - do something that is absent or broken in Calamari proper. - - :param pool_name: Ceph pool name, or None to run without --pool argument - - :param command_args: Command line, excluding the leading 'rbd' part. - - """ - - if pool_name: - args = ["rbd", "--pool", pool_name, "--cluster", cluster_name] + \ - command_args +def rbd_operation(cluster_name, operation, attr, pool_name, result, queue): + import rados + cluster = rados.Rados(name=RADOS_NAME, + clustername=cluster_name, + conffile='') + cluster.connect() + + try: + ioctx = cluster.open_ioctx(pool_name) + try: + if operation == "create": + rbd_inst = rbd.RBD() + rbd_inst.create(ioctx, attr["name"], int(attr["size"])) + if operation == "delete": + rbd_inst = rbd.RBD() + rbd_inst.remove(ioctx, attr["name"]) + if operation == "update": + try: + image = rbd.Image(ioctx, attr["name"]) + size = int(attr['size']) + image.resize(size) + finally: + image.close() + if operation == "list": + rbd_inst = rbd.RBD() + result['out'] = rbd_inst.list(ioctx) + if operation == "info": + try: + result['out'] = {} + image = rbd.Image(ioctx, attr['name']) + size = int(image.stat()['size']) + result['out']['size'] = size + flags = int(image.flags()) + result['out']['flags'] = flags + finally: + image.close() + if operation == "usage": + args = "du --image %s" % attr['name'] + cmd_args = "rbd --pool %s --cluster %s " % (pool_name, cluster_name) + \ + args + cmd = cmd_utils.Command(cmd_args) + out, err, rc = cmd.run() + if not err: + result['out'] = out + except (TypeError, + rbd.InvalidArgument, + rbd.ImageExists, + rbd.ImageNotFound, + rbd.ImageBusy, + rbd.ImageHasSnapshots, + rbd.FunctionNotSupported, + rbd.ArgumentOutOfRange) as ex: + result['err'] = str(ex) + result['status'] = 1 + finally: + ioctx.close() + + finally: + cluster.shutdown() + queue.put(result) + +def rbd_command(cluster_name, operation, attr, pool_name): + # Starting as a process + result = {} + result['status'] = 0 + result['err'] = '' + result['out'] = '' + # Exchanging objects between processes + queue = multiprocessing.Queue() + p = multiprocessing.Process( + target=rbd_operation, + args=(cluster_name, operation, attr, pool_name, result, queue)) + p.start() + # Wait for 10 seconds or until process finishes + p.join(10) + # If thread is still active + if p.is_alive(): + # Terminate + p.terminate() + p.join() + result['err'] = "rbd command timed out" + result['status'] = 1 else: - args = ["rbd", "--cluster", cluster_name] + command_args - - p = subprocess.Popen( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=open(os.devnull, "r"), - close_fds=True - ) - if p.poll() is None: # Force kill if process is still alive - gevent.sleep(2) - if p.poll() is None: - p.kill() - return {'out': "", 'err': "rbd command timed out", 'status': 1} - - stdout, stderr = p.communicate() - status = p.returncode - return { - 'out': stdout, - 'err': stderr, - 'status': status - } + result = queue.get() + return result def radosgw_admin_command(command_args): diff --git a/tendrl/ceph_integration/manager/rbd_request_factory.py b/tendrl/ceph_integration/manager/rbd_request_factory.py index 14096e5bb..4ffddfd39 100644 --- a/tendrl/ceph_integration/manager/rbd_request_factory.py +++ b/tendrl/ceph_integration/manager/rbd_request_factory.py @@ -31,39 +31,30 @@ def delete_rbd(self, pool_id=None, rbd_name=None): # e.g. # if the name is wrong we should be sending a structured errors dict # that they can use to associate the complaint with the 'name' field. - commands = [ - 'rm', rbd_name - ] + attr = {} + attr = {"name": rbd_name} return RbdMapModifyingRequest( - "Deleting image '{name}'".format(name=rbd_name), - pool_name, commands + "delete","Deleting image '{name}'".format(name=rbd_name), + pool_name, attr ) def update(self, rbd_name, attributes): pool = self._resolve_pool(attributes['pool_id']) pool_name = pool['pool_name'] - + attributes['name'] = rbd_name if 'size' in attributes: - commands = [ - 'resize', '--image', rbd_name, '--size', attributes.get('size') - ] return RbdMapModifyingRequest( - "Modifying image '{name}' ({attrs})".format( + "update", "Modifying image '{name}' ({attrs})".format( name=rbd_name, attrs=", ".join( "%s=%s" % (k, v) for k, v in attributes.items()) ), pool_name, - commands + attributes ) def create(self, attributes): pool = self._resolve_pool(attributes['pool_id']) pool_name = pool['pool_name'] - - commands = [ - 'create', attributes['name'], '--size', attributes['size'] - ] - return RbdCreatingRequest( - "Creating image '{name}'".format(name=attributes['name']), - attributes['name'], pool_name, commands) + "create", "Creating image '{name}'".format(name=attributes['name']), + pool_name, attributes) diff --git a/tendrl/ceph_integration/manager/user_request.py b/tendrl/ceph_integration/manager/user_request.py index 25be6ab98..8c850dc87 100644 --- a/tendrl/ceph_integration/manager/user_request.py +++ b/tendrl/ceph_integration/manager/user_request.py @@ -301,14 +301,15 @@ class RbdRequest(UserRequest): """ - def __init__(self, headline, pool_name, commands): - self._commands = commands + def __init__(self, operation, headline, pool_name, attr): + self._attributes = attr + self.operation = operation self._pool_name = pool_name super(RbdRequest, self).__init__(headline) - def _submit(self, commands=None): - if commands is None: - commands = self._commands + def _submit(self, attr=None): + if attr is None: + attr = self._attributes Event( Message( @@ -317,7 +318,7 @@ def _submit(self, commands=None): payload={"message": "%s._submit: %s/%s" % (self.__class__.__name__, NS.state_sync_thread.name, - commands + self.operation ) } ) @@ -325,7 +326,8 @@ def _submit(self, commands=None): return ceph.rbd_command( NS.state_sync_thread.name, - commands, + self.operation, + attr, self._pool_name ) @@ -418,9 +420,9 @@ class RbdMapModifyingRequest(RbdRequest): """ - def __init__(self, headline, pool_name, commands): + def __init__(self, operation, headline, pool_name, attr): super(RbdMapModifyingRequest, self).__init__( - headline, pool_name, commands) + operation, headline, pool_name, attr) @property def associations(self): @@ -514,11 +516,11 @@ def __init__(self, headline, ec_profile_name, class RbdCreatingRequest(RbdMapModifyingRequest): - def __init__(self, headline, rbd_name, pool_name, - commands): + def __init__(self, operation, headline, pool_name, + attr): super(RbdCreatingRequest, self).__init__( - headline, pool_name, commands) - self._rbd_name = rbd_name + operation, headline, pool_name, attr) + self._rbd_name = attr['name'] class PgProgress(object): diff --git a/tendrl/ceph_integration/objects/definition/ceph.yaml b/tendrl/ceph_integration/objects/definition/ceph.yaml index bf1edfa61..677c65520 100644 --- a/tendrl/ceph_integration/objects/definition/ceph.yaml +++ b/tendrl/ceph_integration/objects/definition/ceph.yaml @@ -150,8 +150,8 @@ namespace.ceph: mandatory: - Rbd.name - Rbd.size - optional: - Rbd.pool_id + optional: - Rbd.pool_crush_ruleset - Rbd.pool_erasure_code_profile - Rbd.pool_min_size diff --git a/tendrl/ceph_integration/sds_sync/__init__.py b/tendrl/ceph_integration/sds_sync/__init__.py index ec55ecbeb..5494e502b 100644 --- a/tendrl/ceph_integration/sds_sync/__init__.py +++ b/tendrl/ceph_integration/sds_sync/__init__.py @@ -629,89 +629,43 @@ def on_sync_object(self, data): ) def _get_rbds(self, pool_name): - """Invokes the below CLI commands - - 1. - ```rbd ls --pool {name}``` - - and required output format is a list of rbds separated with new - lines as below - - ``` - mmrbd1 - mmdrbd2 - ``` - - 2. - ```rbd --image {image-name} --pool {pool-name} info``` - - and the required output format is as below - - ``` - rbd image 'mmrbd1': - size 1024 MB in 256 - order 22 (4096 kB objects) - block_name_prefix: rbd_data.1e31238e1f29 - format: 2 - features: layering, exclusive-lock, object-map, fast-diff, deep-flatten - flags: - ``` - - """ rbd_details = {} - - commands = [ - "ls" - ] cmd_out = ceph.rbd_command( NS.tendrl_context.cluster_name, - commands, + "list", + {}, pool_name ) - if cmd_out['err'] == "": + if cmd_out['err'] == '': rbd_list = [] - for item in cmd_out['out'].split('\n'): - if item != "": - rbd_list.append(item) + for item in cmd_out['out']: + rbd_list.append(item) + for rbd in rbd_list: - commands = [ - "info", "--image", rbd - ] + rbd_details[rbd] = {} cmd_out = ceph.rbd_command( NS.tendrl_context.cluster_name, - commands, + "info", + {"name": rbd}, pool_name ) if cmd_out['err'] == "": - rbd_info = {} - for item in cmd_out['out'].split('\n')[1:]: - if item != "": - if ":" in item: - key = item.split(':')[0] - if '\t' in key: - key = key[1:] - rbd_info[key] = item.split(':')[1].strip() - else: - key = item.split()[0] - if '\t' in key: - key = key[1:] - rbd_info[key] = item.split()[1].strip() - rbd_details[rbd] = rbd_info - - commands = [ - "du", "--image", rbd - ] + rbd_details[rbd]['size'] = cmd_out['out']['size'] + rbd_details[rbd]['flags'] = cmd_out['out']['flags'] cmd_out = ceph.rbd_command( NS.tendrl_context.cluster_name, - commands, + "usage", + {"name": rbd}, pool_name ) if cmd_out['err'] == "": - rbd_details[rbd]['provisioned'] = \ - cmd_out['out'].split('\n')[1].split()[1] - rbd_details[rbd]['used'] = \ - cmd_out['out'].split('\n')[1].split()[2] - + out = cmd_out['out'] + for i in range(0, len(out.split("\n"))): + if "PROVISIONED" in out.split("\n")[i]: + rbd_details[rbd]['provisioned'] = \ + out.split('\n')[i+1].split()[1] + rbd_details[rbd]['used'] = \ + out.split('\n')[i+1].split()[2] return rbd_details def _get_utilization_data(self):