Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use librbd for rbd management in ceph-integration #273

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions tendrl/ceph_integration/librbd_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import multiprocessing
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@r0h4n I tried gevent first. The problem is that when the cluster goes into an error state, every librbd call stops responding. I tried to time out those calls with gevent, but gevent cannot time out that subprocess: it needs the blocked function to yield (e.g. via gevent.sleep), and librbd is a compiled library, so we cannot add that. I tried all the gevent options and none of them works for this case. Multiprocessing can time out the subprocess in this case.

import os
import rados
import rbd
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@r0h4n Ceph already has rbd as a dependency.


from gevent import Timeout
from Queue import Empty

RADOS_TIMEOUT = 20
RADOS_NAME = 'client.admin'
SRC_DIR = '/etc/ceph'


class RbdOperationTimout(Exception):
    """Exception type for an rbd operation that timed out.

    NOTE(review): nothing in the visible code raises or catches this
    class; the name (including its spelling) is kept for compatibility.
    """


class ClusterHandle:
    """Context manager that yields a connected ``rados.Rados`` handle.

    On entry a ``rados.Rados`` client is built for ``cluster_name`` using
    the module-level ``RADOS_NAME`` and, when ``SRC_DIR`` is set, the
    config file ``<SRC_DIR>/<cluster_name>.conf``; it is then connected
    with a ``RADOS_TIMEOUT`` timeout.  On exit the handle is shut down.
    """

    def __init__(self, cluster_name):
        self.cluster_name = cluster_name

    def __enter__(self):
        # An empty SRC_DIR means "no explicit config file".
        conf_file = (
            os.path.join(SRC_DIR, self.cluster_name + ".conf")
            if SRC_DIR else ''
        )

        handle = rados.Rados(
            name=RADOS_NAME,
            clustername=self.cluster_name,
            conffile=conf_file)
        # Stored before connecting, so the attribute exists even if
        # connect() raises.
        self.cluster_handle = handle
        handle.connect(timeout=RADOS_TIMEOUT)

        return handle

    def __exit__(self, *args):
        self.cluster_handle.shutdown()


def rbd_operation(cluster_name, attributes):
    """Run an rbd create/update/delete in a child process with a timeout.

    The librbd call is executed in a separate process so that it can be
    killed if it hangs.

    :param cluster_name: name of the ceph cluster to operate on.
    :param attributes: dict describing the request; ``attributes['operation']``
        must be ``"update"``, ``"create"`` or ``"delete"``.  The remaining
        keys are consumed by the per-operation worker function.
    :returns: ``{'status': 0, 'err': ''}`` on success, otherwise
        ``{'status': 1, 'err': <message>}``.
    """
    operation = attributes.get('operation')
    if operation not in ("update", "create", "delete"):
        # Previously this fell through to ``None.start()`` and crashed.
        return {'status': 1,
                'err': "unsupported rbd operation: %s" % operation}

    # Author's review note: the worker is a multiprocessing sub-process,
    # so a multiprocessing queue is used to carry its error message back.
    queue = multiprocessing.Queue()

    if operation == "update":
        # _update_rbd takes no queue; its failures are detected through
        # the child's exit code below.
        target, args = _update_rbd, (cluster_name, attributes)
    elif operation == "create":
        target, args = _create_rbd, (cluster_name, attributes, queue)
    else:
        target, args = _delete_rbd, (cluster_name, attributes, queue)

    worker = multiprocessing.Process(target=target, args=args)
    worker.start()
    # Wait for 20 seconds or until the process finishes
    worker.join(20)

    if worker.is_alive():
        # Still running after the timeout: kill it and reap it (bounded
        # wait so a process that ignores SIGTERM cannot block us).
        worker.terminate()
        worker.join(1)
        err = "rbd command timed out"
    else:
        try:
            err = queue.get_nowait()
        except Empty:
            err = None
        # A child that died from an uncaught exception leaves the queue
        # empty; its non-zero exit code is the only sign of failure.
        if not err and worker.exitcode != 0:
            err = "rbd %s failed (exit code %s)" % (
                operation, worker.exitcode)

    if err:
        return {'status': 1, 'err': err}
    return {'status': 0, 'err': ''}


def _update_rbd(cluster_name, attributes):
    """Resize an existing rbd image.

    Reads ``pool_name``, ``name`` and ``size`` from ``attributes``, opens
    the image through a ``ClusterHandle`` connection and resizes it.
    Any exception propagates to the caller.
    """
    with ClusterHandle(cluster_name) as cluster, \
            cluster.open_ioctx(attributes['pool_name']) as ioctx, \
            rbd.Image(ioctx, attributes["name"]) as image:
        # The requested size is scaled by 1024 * 1024 before being handed
        # to librbd -- presumably MiB -> bytes; TODO confirm with callers.
        image.resize(int(attributes['size']) * 1024 * 1024)


def _create_rbd(cluster_name, attributes, queue):
    """Create an rbd image, reporting expected failures through ``queue``.

    :param cluster_name: name of the ceph cluster to connect to.
    :param attributes: dict with ``pool_name``, ``name`` and ``size``.
    :param queue: queue on which a non-empty error message is put when
        image creation fails with one of the handled exceptions.
    """
    with ClusterHandle(cluster_name) as cluster:
        with cluster.open_ioctx(attributes['pool_name']) as ioctx:
            try:
                # The requested size is scaled by 1024 * 1024 before being
                # handed to librbd -- presumably MiB -> bytes; TODO confirm.
                size = int(attributes['size']) * 1024 * 1024
                rbd.RBD().create(ioctx, attributes['name'], size)
            except (rbd.ImageExists, TypeError, ValueError,
                    rbd.InvalidArgument, rbd.FunctionNotSupported) as ex:
                # ``ex.message`` may be empty (or absent on Python 3), and
                # the parent treats an empty message as success, so always
                # send something non-empty.
                queue.put(str(ex) or ex.__class__.__name__)


def _delete_rbd(cluster_name, attributes, queue):
    """Remove an rbd image, reporting expected failures through ``queue``.

    :param cluster_name: name of the ceph cluster to connect to.
    :param attributes: dict with ``pool_name`` and ``name``.
    :param queue: queue on which a non-empty error message is put when
        removal fails with one of the handled exceptions.
    """
    with ClusterHandle(cluster_name) as cluster:
        with cluster.open_ioctx(attributes['pool_name']) as ioctx:
            try:
                rbd.RBD().remove(ioctx, attributes['name'])
            except (rbd.ImageNotFound, rbd.ImageBusy,
                    rbd.ImageHasSnapshots) as ex:
                # ``ex.message`` may be empty (or absent on Python 3), and
                # the parent treats an empty message as success, so always
                # send something non-empty.
                queue.put(str(ex) or ex.__class__.__name__)
43 changes: 18 additions & 25 deletions tendrl/ceph_integration/manager/rbd_request_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def _resolve_pool(self, pool_id):

def delete_rbd(self, pool_id=None, rbd_name=None):
# Resolve pool ID to name
pool_name = self._resolve_pool(pool_id)['pool_name']

attributes = {}
attributes['pool_name'] = self._resolve_pool(pool_id)['pool_name']
attributes['operation'] = 'delete'
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attributes['operation'] = 'delete' selects the specific function to call in librbd_utils.py.

# TODO(Rohan) perhaps the REST API should have something in the body to
# make it slightly harder to accidentally delete a pool, to respect
# the severity of this operation since we're hiding the
Expand All @@ -31,39 +32,31 @@ def delete_rbd(self, pool_id=None, rbd_name=None):
# e.g.
# if the name is wrong we should be sending a structured errors dict
# that they can use to associate the complaint with the 'name' field.
commands = [
'rm', rbd_name
]
attributes['name'] = rbd_name
return RbdMapModifyingRequest(
"Deleting image '{name}'".format(name=rbd_name),
pool_name, commands
attributes
)

def update(self, rbd_name, attributes):
pool = self._resolve_pool(attributes['pool_id'])
pool_name = pool['pool_name']
attributes['pool_name'] = pool['pool_name']
attributes['name'] = rbd_name
attributes['operation'] = "update"

if 'size' in attributes:
commands = [
'resize', '--image', rbd_name, '--size', attributes.get('size')
]
return RbdMapModifyingRequest(
"Modifying image '{name}' ({attrs})".format(
name=rbd_name, attrs=", ".join(
"%s=%s" % (k, v) for k, v in attributes.items())
),
pool_name,
commands
)
return RbdMapModifyingRequest(
"Modifying image '{name}' ({attrs})".format(
name=rbd_name, attrs=", ".join(
"%s=%s" % (k, v) for k, v in attributes.items())
),
attributes
)

def create(self, attributes):
pool = self._resolve_pool(attributes['pool_id'])
pool_name = pool['pool_name']

commands = [
'create', attributes['name'], '--size', attributes['size']
]
attributes['pool_name'] = pool['pool_name']
attributes['operation'] = "create"

return RbdCreatingRequest(
"Creating image '{name}'".format(name=attributes['name']),
attributes['name'], pool_name, commands)
attributes)
30 changes: 14 additions & 16 deletions tendrl/ceph_integration/manager/user_request.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import uuid

from tendrl.ceph_integration import ceph
from tendrl.ceph_integration import librbd_utils
from tendrl.ceph_integration.types import OsdMap
from tendrl.ceph_integration.types import PgSummary
from tendrl.ceph_integration.types import USER_REQUEST_COMPLETE
Expand Down Expand Up @@ -301,14 +302,13 @@ class RbdRequest(UserRequest):

"""

def __init__(self, headline, pool_name, commands):
self._commands = commands
self._pool_name = pool_name
def __init__(self, headline, attributes):
self._attributes = attributes
super(RbdRequest, self).__init__(headline)

def _submit(self, commands=None):
if commands is None:
commands = self._commands
def _submit(self, attributes=None):
if attributes is None:
attributes = self._attributes

Event(
Message(
Expand All @@ -317,16 +317,15 @@ def _submit(self, commands=None):
payload={"message": "%s._submit: %s/%s" %
(self.__class__.__name__,
NS.state_sync_thread.name,
commands
attributes
)
}
)
)

return ceph.rbd_command(
return librbd_utils.rbd_operation(
NS.state_sync_thread.name,
commands,
self._pool_name
attributes,
)


Expand Down Expand Up @@ -418,9 +417,9 @@ class RbdMapModifyingRequest(RbdRequest):

"""

def __init__(self, headline, pool_name, commands):
def __init__(self, headline, attributes):
super(RbdMapModifyingRequest, self).__init__(
headline, pool_name, commands)
headline, attributes)

@property
def associations(self):
Expand Down Expand Up @@ -514,11 +513,10 @@ def __init__(self, headline, ec_profile_name,


class RbdCreatingRequest(RbdMapModifyingRequest):
def __init__(self, headline, rbd_name, pool_name,
commands):
def __init__(self, headline, attributes):
super(RbdCreatingRequest, self).__init__(
headline, pool_name, commands)
self._rbd_name = rbd_name
headline, attributes)
self._rbd_name = attributes['name']


class PgProgress(object):
Expand Down
2 changes: 1 addition & 1 deletion tendrl/ceph_integration/objects/definition/ceph.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ namespace.ceph:
mandatory:
- Rbd.name
- Rbd.size
optional:
- Rbd.pool_id
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rbd.pool_id is mandatory because librbd requires pool_name; the pool name can only be resolved and passed on when pool_id is supplied.

optional:
- Rbd.pool_crush_ruleset
- Rbd.pool_erasure_code_profile
- Rbd.pool_min_size
Expand Down