Skip to content

Commit

Permalink
Use librbd for rbd management in ceph-integration
Browse files Browse the repository at this point in the history
tendrl-bug-id: Tendrl#150

Signed-off-by: root <[email protected]>
  • Loading branch information
root committed Jun 12, 2017
1 parent e12af61 commit 955e5d5
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 136 deletions.
126 changes: 88 additions & 38 deletions tendrl/ceph_integration/ceph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
import subprocess
import tempfile
import time
import rbd
import multiprocessing

import msgpack

from tendrl.commons.event import Event
from tendrl.commons.message import Message
from tendrl.commons.utils import cmd_utils


# Note: do not import ceph modules at this scope, otherwise this module won't
# be able to cleanly talk to us about systems where ceph isn't installed yet.
Expand Down Expand Up @@ -375,45 +379,91 @@ def ceph_command(cluster_name, command_args):
}


def rbd_command(cluster_name, command_args, pool_name=None):
"""Run a rbd CLI operation directly. This is a fallback to allow
manual execution of arbitrary commands in case the user wants to
do something that is absent or broken in Calamari proper.
:param pool_name: Ceph pool name, or None to run without --pool argument
:param command_args: Command line, excluding the leading 'rbd' part.
"""

if pool_name:
args = ["rbd", "--pool", pool_name, "--cluster", cluster_name] + \
command_args
def rbd_operation(cluster_name, operation, attr, pool_name, result, queue):
import rados
cluster = rados.Rados(name=RADOS_NAME,
clustername=cluster_name,
conffile='')
cluster.connect()

try:
ioctx = cluster.open_ioctx(pool_name)
try:
if operation == "create":
rbd_inst = rbd.RBD()
rbd_inst.create(ioctx, attr["name"], int(attr["size"]))
if operation == "delete":
rbd_inst = rbd.RBD()
rbd_inst.remove(ioctx, attr["name"])
if operation == "update":
try:
image = rbd.Image(ioctx, attr["name"])
size = int(attr['size'])
image.resize(size)
finally:
image.close()
if operation == "list":
rbd_inst = rbd.RBD()
result['out'] = rbd_inst.list(ioctx)
if operation == "info":
try:
result['out'] = {}
image = rbd.Image(ioctx, attr['name'])
size = int(image.stat()['size'])
result['out']['size'] = size
flags = int(image.flags())
result['out']['flags'] = flags
finally:
image.close()
if operation == "usage":
args = "du --image %s" % attr['name']
cmd_args = "rbd --pool %s --cluster %s " % (pool_name, cluster_name) + \
args
cmd = cmd_utils.Command(cmd_args)
out, err, rc = cmd.run()
if not err:
result['out'] = out
except (TypeError,
rbd.InvalidArgument,
rbd.ImageExists,
rbd.ImageNotFound,
rbd.ImageBusy,
rbd.ImageHasSnapshots,
rbd.FunctionNotSupported,
rbd.ArgumentOutOfRange) as ex:
result['err'] = str(ex)
result['status'] = 1
finally:
ioctx.close()

finally:
cluster.shutdown()
queue.put(result)

def rbd_command(cluster_name, operation, attr, pool_name):
# Starting as a process
result = {}
result['status'] = 0
result['err'] = ''
result['out'] = ''
# Exchanging objects between processes
queue = multiprocessing.Queue()
p = multiprocessing.Process(
target=rbd_operation,
args=(cluster_name, operation, attr, pool_name, result, queue))
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
# Terminate
p.terminate()
p.join()
result['err'] = "rbd command timed out"
result['status'] = 1
else:
args = ["rbd", "--cluster", cluster_name] + command_args

p = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=open(os.devnull, "r"),
close_fds=True
)
if p.poll() is None: # Force kill if process is still alive
gevent.sleep(2)
if p.poll() is None:
p.kill()
return {'out': "", 'err': "rbd command timed out", 'status': 1}

stdout, stderr = p.communicate()
status = p.returncode
return {
'out': stdout,
'err': stderr,
'status': status
}
result = queue.get()
return result


def radosgw_admin_command(command_args):
Expand Down
27 changes: 9 additions & 18 deletions tendrl/ceph_integration/manager/rbd_request_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,30 @@ def delete_rbd(self, pool_id=None, rbd_name=None):
# e.g.
# if the name is wrong we should be sending a structured errors dict
# that they can use to associate the complaint with the 'name' field.
commands = [
'rm', rbd_name
]
attr = {}
attr = {"name": rbd_name}
return RbdMapModifyingRequest(
"Deleting image '{name}'".format(name=rbd_name),
pool_name, commands
"delete","Deleting image '{name}'".format(name=rbd_name),
pool_name, attr
)

def update(self, rbd_name, attributes):
pool = self._resolve_pool(attributes['pool_id'])
pool_name = pool['pool_name']

attributes['name'] = rbd_name
if 'size' in attributes:
commands = [
'resize', '--image', rbd_name, '--size', attributes.get('size')
]
return RbdMapModifyingRequest(
"Modifying image '{name}' ({attrs})".format(
"update", "Modifying image '{name}' ({attrs})".format(
name=rbd_name, attrs=", ".join(
"%s=%s" % (k, v) for k, v in attributes.items())
),
pool_name,
commands
attributes
)

def create(self, attributes):
pool = self._resolve_pool(attributes['pool_id'])
pool_name = pool['pool_name']

commands = [
'create', attributes['name'], '--size', attributes['size']
]

return RbdCreatingRequest(
"Creating image '{name}'".format(name=attributes['name']),
attributes['name'], pool_name, commands)
"create", "Creating image '{name}'".format(name=attributes['name']),
pool_name, attributes)
28 changes: 15 additions & 13 deletions tendrl/ceph_integration/manager/user_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,15 @@ class RbdRequest(UserRequest):
"""

def __init__(self, headline, pool_name, commands):
self._commands = commands
def __init__(self, operation, headline, pool_name, attr):
self._attributes = attr
self.operation = operation
self._pool_name = pool_name
super(RbdRequest, self).__init__(headline)

def _submit(self, commands=None):
if commands is None:
commands = self._commands
def _submit(self, attr=None):
if attr is None:
attr = self._attributes

Event(
Message(
Expand All @@ -317,15 +318,16 @@ def _submit(self, commands=None):
payload={"message": "%s._submit: %s/%s" %
(self.__class__.__name__,
NS.state_sync_thread.name,
commands
self.operation
)
}
)
)

return ceph.rbd_command(
NS.state_sync_thread.name,
commands,
self.operation,
attr,
self._pool_name
)

Expand Down Expand Up @@ -418,9 +420,9 @@ class RbdMapModifyingRequest(RbdRequest):
"""

def __init__(self, headline, pool_name, commands):
def __init__(self, operation, headline, pool_name, attr):
super(RbdMapModifyingRequest, self).__init__(
headline, pool_name, commands)
operation, headline, pool_name, attr)

@property
def associations(self):
Expand Down Expand Up @@ -514,11 +516,11 @@ def __init__(self, headline, ec_profile_name,


class RbdCreatingRequest(RbdMapModifyingRequest):
def __init__(self, headline, rbd_name, pool_name,
commands):
def __init__(self, operation, headline, pool_name,
attr):
super(RbdCreatingRequest, self).__init__(
headline, pool_name, commands)
self._rbd_name = rbd_name
operation, headline, pool_name, attr)
self._rbd_name = attr['name']


class PgProgress(object):
Expand Down
2 changes: 1 addition & 1 deletion tendrl/ceph_integration/objects/definition/ceph.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ namespace.ceph:
mandatory:
- Rbd.name
- Rbd.size
optional:
- Rbd.pool_id
optional:
- Rbd.pool_crush_ruleset
- Rbd.pool_erasure_code_profile
- Rbd.pool_min_size
Expand Down
86 changes: 20 additions & 66 deletions tendrl/ceph_integration/sds_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,89 +629,43 @@ def on_sync_object(self, data):
)

def _get_rbds(self, pool_name):
"""Invokes the below CLI commands
1.
```rbd ls --pool {name}```
and required output format is a list of rbds separated with new
lines as below
```
mmrbd1
mmdrbd2
```
2.
```rbd --image {image-name} --pool {pool-name} info```
and the required output format is as below
```
rbd image 'mmrbd1':
size 1024 MB in 256
order 22 (4096 kB objects)
block_name_prefix: rbd_data.1e31238e1f29
format: 2
features: layering, exclusive-lock, object-map, fast-diff, deep-flatten
flags:
```
"""
rbd_details = {}

commands = [
"ls"
]
cmd_out = ceph.rbd_command(
NS.tendrl_context.cluster_name,
commands,
"list",
{},
pool_name
)
if cmd_out['err'] == "":
if cmd_out['err'] == '':
rbd_list = []
for item in cmd_out['out'].split('\n'):
if item != "":
rbd_list.append(item)
for item in cmd_out['out']:
rbd_list.append(item)

for rbd in rbd_list:
commands = [
"info", "--image", rbd
]
rbd_details[rbd] = {}
cmd_out = ceph.rbd_command(
NS.tendrl_context.cluster_name,
commands,
"info",
{"name": rbd},
pool_name
)
if cmd_out['err'] == "":
rbd_info = {}
for item in cmd_out['out'].split('\n')[1:]:
if item != "":
if ":" in item:
key = item.split(':')[0]
if '\t' in key:
key = key[1:]
rbd_info[key] = item.split(':')[1].strip()
else:
key = item.split()[0]
if '\t' in key:
key = key[1:]
rbd_info[key] = item.split()[1].strip()
rbd_details[rbd] = rbd_info

commands = [
"du", "--image", rbd
]
rbd_details[rbd]['size'] = cmd_out['out']['size']
rbd_details[rbd]['flags'] = cmd_out['out']['flags']
cmd_out = ceph.rbd_command(
NS.tendrl_context.cluster_name,
commands,
"usage",
{"name": rbd},
pool_name
)
if cmd_out['err'] == "":
rbd_details[rbd]['provisioned'] = \
cmd_out['out'].split('\n')[1].split()[1]
rbd_details[rbd]['used'] = \
cmd_out['out'].split('\n')[1].split()[2]

out = cmd_out['out']
for i in range(0, len(out.split("\n"))):
if "PROVISIONED" in out.split("\n")[i]:
rbd_details[rbd]['provisioned'] = \
out.split('\n')[i+1].split()[1]
rbd_details[rbd]['used'] = \
out.split('\n')[i+1].split()[2]
return rbd_details

def _get_utilization_data(self):
Expand Down

0 comments on commit 955e5d5

Please sign in to comment.