-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use librbd for rbd management in ceph-integration #273
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import multiprocessing | ||
import os | ||
import rados | ||
import rbd | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @r0h4n ceph already have rbd as dependency |
||
|
||
from gevent import Timeout | ||
from Queue import Empty | ||
|
||
RADOS_TIMEOUT = 20 | ||
RADOS_NAME = 'client.admin' | ||
SRC_DIR = '/etc/ceph' | ||
|
||
|
||
class RbdOperationTimout(Exception): | ||
pass | ||
|
||
|
||
class ClusterHandle(): | ||
|
||
def __init__(self, cluster_name): | ||
self.cluster_name = cluster_name | ||
|
||
def __enter__(self): | ||
if SRC_DIR: | ||
conf_file = os.path.join(SRC_DIR, self.cluster_name + ".conf") | ||
else: | ||
conf_file = '' | ||
|
||
self.cluster_handle = rados.Rados( | ||
name=RADOS_NAME, | ||
clustername=self.cluster_name, | ||
conffile=conf_file) | ||
self.cluster_handle.connect(timeout=RADOS_TIMEOUT) | ||
|
||
return self.cluster_handle | ||
|
||
def __exit__(self, *args): | ||
self.cluster_handle.shutdown() | ||
|
||
|
||
def rbd_operation(cluster_name, attributes): | ||
err = None | ||
p = None | ||
queue = multiprocessing.Queue() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @r0h4n i am using multiprocessing for sub-process so i used multiprocessing queue |
||
if attributes['operation'] == "update": | ||
p = multiprocessing.Process(target=_update_rbd, args=( | ||
cluster_name, attributes)) | ||
if attributes['operation'] == "create": | ||
p = multiprocessing.Process(target=_create_rbd, args=( | ||
cluster_name, attributes, queue)) | ||
if attributes['operation'] == "delete": | ||
p = multiprocessing.Process(target=_delete_rbd, args=( | ||
cluster_name, attributes, queue)) | ||
p.start() | ||
# Wait for 20 seconds or until process finishes | ||
p.join(20) | ||
# If thread is still active | ||
if p.is_alive(): | ||
# Terminate | ||
p.terminate() | ||
err = "rbd command timed out" | ||
else: | ||
try: | ||
err = queue.get_nowait() | ||
except Empty: | ||
pass | ||
if err: | ||
result = dict({'status': 1, | ||
'err': err}) | ||
else: | ||
result = dict({'status': 0, | ||
'err': ''}) | ||
return result | ||
|
||
|
||
def _update_rbd(cluster_name, attributes): | ||
with ClusterHandle(cluster_name) as cluster: | ||
with cluster.open_ioctx(attributes['pool_name']) as ioctx: | ||
with rbd.Image(ioctx, attributes["name"]) as image: | ||
# converting bytes to gb | ||
size = int(attributes['size']) * 1024 * 1024 | ||
image.resize(size) | ||
|
||
|
||
def _create_rbd(cluster_name, attributes, queue): | ||
with ClusterHandle(cluster_name) as cluster: | ||
with cluster.open_ioctx(attributes['pool_name']) as ioctx: | ||
try: | ||
# converting bytes to gb | ||
size = int(attributes['size']) * 1024 * 1024 | ||
rbd_inst = rbd.RBD() | ||
rbd_inst.create(ioctx, attributes['name'], size) | ||
except (rbd.ImageExists, TypeError, | ||
rbd.InvalidArgument, rbd.FunctionNotSupported) as ex: | ||
queue.put(ex.message) | ||
|
||
|
||
def _delete_rbd(cluster_name, attributes, queue): | ||
with ClusterHandle(cluster_name) as cluster: | ||
with cluster.open_ioctx(attributes['pool_name']) as ioctx: | ||
try: | ||
rbd_inst = rbd.RBD() | ||
rbd_inst.remove(ioctx, attributes['name']) | ||
except (rbd.ImageNotFound, rbd.ImageBusy, | ||
rbd.ImageHasSnapshots) as ex: | ||
queue.put(ex.message) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,8 +20,9 @@ def _resolve_pool(self, pool_id): | |
|
||
def delete_rbd(self, pool_id=None, rbd_name=None): | ||
# Resolve pool ID to name | ||
pool_name = self._resolve_pool(pool_id)['pool_name'] | ||
|
||
attributes = {} | ||
attributes['pool_name'] = self._resolve_pool(pool_id)['pool_name'] | ||
attributes['operation'] = 'delete' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. attributes['operation'] = 'delete' to call specific function in librbd_util.py |
||
# TODO(Rohan) perhaps the REST API should have something in the body to | ||
# make it slightly harder to accidentally delete a pool, to respect | ||
# the severity of this operation since we're hiding the | ||
|
@@ -31,39 +32,31 @@ def delete_rbd(self, pool_id=None, rbd_name=None): | |
# e.g. | ||
# if the name is wrong we should be sending a structured errors dict | ||
# that they can use to associate the complaint with the 'name' field. | ||
commands = [ | ||
'rm', rbd_name | ||
] | ||
attributes['name'] = rbd_name | ||
return RbdMapModifyingRequest( | ||
"Deleting image '{name}'".format(name=rbd_name), | ||
pool_name, commands | ||
attributes | ||
) | ||
|
||
def update(self, rbd_name, attributes): | ||
pool = self._resolve_pool(attributes['pool_id']) | ||
pool_name = pool['pool_name'] | ||
attributes['pool_name'] = pool['pool_name'] | ||
attributes['name'] = rbd_name | ||
attributes['operation'] = "update" | ||
|
||
if 'size' in attributes: | ||
commands = [ | ||
'resize', '--image', rbd_name, '--size', attributes.get('size') | ||
] | ||
return RbdMapModifyingRequest( | ||
"Modifying image '{name}' ({attrs})".format( | ||
name=rbd_name, attrs=", ".join( | ||
"%s=%s" % (k, v) for k, v in attributes.items()) | ||
), | ||
pool_name, | ||
commands | ||
) | ||
return RbdMapModifyingRequest( | ||
"Modifying image '{name}' ({attrs})".format( | ||
name=rbd_name, attrs=", ".join( | ||
"%s=%s" % (k, v) for k, v in attributes.items()) | ||
), | ||
attributes | ||
) | ||
|
||
def create(self, attributes): | ||
pool = self._resolve_pool(attributes['pool_id']) | ||
pool_name = pool['pool_name'] | ||
|
||
commands = [ | ||
'create', attributes['name'], '--size', attributes['size'] | ||
] | ||
attributes['pool_name'] = pool['pool_name'] | ||
attributes['operation'] = "create" | ||
|
||
return RbdCreatingRequest( | ||
"Creating image '{name}'".format(name=attributes['name']), | ||
attributes['name'], pool_name, commands) | ||
attributes) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -164,8 +164,8 @@ namespace.ceph: | |
mandatory: | ||
- Rbd.name | ||
- Rbd.size | ||
optional: | ||
- Rbd.pool_id | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rbd.pool_id is must because librbd requires pool_name compulsory, so if we pass pool_id then only pool_name is identified and passed. |
||
optional: | ||
- Rbd.pool_crush_ruleset | ||
- Rbd.pool_erasure_code_profile | ||
- Rbd.pool_min_size | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@r0h4n i have tried with gevent, problem with gevent is when cluster goes to error state then all librbd function call are not responding so i tried to timeout that function using gevent but gevent can't time out that subprocess. Because gevent required gevent.sleep in sub process function but librbd is built in library so we can't use it. i have tried all the option using gevent but it is not work for this case. Multi-process can timeout the the subprocess in this case.