Skip to content

Commit

Permalink
calamari:add the cache pool api for calamari
Browse files Browse the repository at this point in the history
Signed-off-by: song baisen <[email protected]>
  • Loading branch information
renhwztetecs committed Apr 13, 2016
1 parent d363ecf commit 87c0ba4
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 6 deletions.
2 changes: 2 additions & 0 deletions calamari-common/calamari_common/types.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ class BucketNotEmptyError(Exception):
CRUSH_TYPE = 'crush_type'
CLUSTER = 'cluster'
SERVER = 'server'
CACHE_POOL = 'cache_pool'

# The objects that ClusterMonitor keeps copies of from the mon
SYNC_OBJECT_TYPES = [MdsMap, OsdMap, MonMap, MonStatus, PgSummary, Health, Config]
Expand All @@ -303,3 +304,4 @@ class BucketNotEmptyError(Exception):
# List of allowable things to send as ceph commands to OSDs
OSD_IMPLEMENTED_COMMANDS = ('scrub', 'deep_scrub', 'repair')
OSD_FLAGS = ('pause', 'noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norecover', 'noscrub', 'nodeep-scrub')
CACHE_MOD = ('none', 'writeback', 'forward', 'readonly', 'readforward', 'readproxy')
57 changes: 57 additions & 0 deletions cthulhu/cthulhu/manager/cache_pool_request_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from cthulhu.manager.request_factory import RequestFactory
from cthulhu.manager.user_request import OsdMapModifyingRequest
from cthulhu.log import log
from calamari_common.types import OsdMap


class CachePoolRequestFactory(RequestFactory):

def create(self, attributes):
commands = [('osd tier add', {"pool": attributes['pool'], "tierpool": attributes['tier_pool']})]
if 'mode' in attributes and attributes['mode'] is not None:
commands.append(('osd tier cache-mode', {'pool': attributes['tier_pool'], 'mode': attributes['mode']}))
if attributes['mode'] == 'write_back':
commands.append(
('osd tier set-overlay', {"pool": attributes['pool'], "overlaypool": attributes['tier_pool']}))
else:
log.warn("there is not cache-mode")
message = "Creating cache pool {cache_pool_name} for pool {pool_name}".format(
pool_name=attributes['pool'], cache_pool_name=attributes['tier_pool'])
return OsdMapModifyingRequest(message, self._cluster_monitor.fsid, self._cluster_monitor.name, commands)

def delete(self, attributes):
pool_id = attributes['pool']
tier_pool_id = attributes['tier_pool']
tier_name = self._resolve_pool(tier_pool_id)['pool_name']
pool_name = self._resolve_pool(pool_id)['pool_name']
cache_mode = self._resolve_pool(pool_id)['cache_mode']
commands = []
if cache_mode == "write_back":
commands.append(('osd tier remove-overlay', {'pool': pool_name}))
commands.append(('osd tier remove', {"pool": pool_name, "tierpool": tier_name}))
message = "Remove cache pool {cache_pool_name} for pool {pool_name}".format(
pool_name=pool_name, cache_pool_name=tier_name)
return OsdMapModifyingRequest(message, self._cluster_monitor.fsid, self._cluster_monitor.name, commands)

def _resolve_pool(self, pool_id):
osd_map = self._cluster_monitor.get_sync_object(OsdMap)
if pool_id not in osd_map.pools_by_id:
log.error("there is not pool id %s in osd_map" % pool_id)
raise "there is not pool id %s in osd_map" % pool_id
return osd_map.pools_by_id[pool_id]

def update(self, object_id, attributes):
commands = []
self._set_cache_mode(attributes, commands)
message = "Update cache pool id {pool_name} for cache_pool mode {cache_mode}".format(
pool_name=object_id, cache_mode=attributes['mode'])
return OsdMapModifyingRequest(message, self._cluster_monitor.fsid, self._cluster_monitor.name, commands)

def _set_cache_mode(self, attributes, commands=[]):
if 'mode' in attributes and attributes['mode'] is not None:
commands.append(('osd tier cache-mode', {'pool': attributes['tierpool'], 'mode': attributes['mode']}))
if attributes['mode'] == 'write_back':
commands.append(('osd tier set-overlay', {
"pool": attributes['pool'], "overlaypool": attributes['tier_pool']}))
else:
log.warn("there is not cache-mode")
21 changes: 19 additions & 2 deletions cthulhu/cthulhu/manager/pool_request_factory.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ def _pool_attribute_commands(self, pool_name, attributes):

def delete(self, pool_id):
# Resolve pool ID to name
commands=[]
pool_name = self._resolve_pool(pool_id)['pool_name']
master_pool_id = self._resolve_pool(pool_id)['tier_of']
if -1 != self._resolve_pool(pool_id)['tier_of']:
self._del_cache_pool(master_pool_id, pool_id, commands)

cache_pools = self._resolve_pool(pool_id)['tiers']
for cache_id in cache_pools:
self._del_cache_pool(pool_id, cache_id, commands)

# TODO: perhaps the REST API should have something in the body to
# make it slightly harder to accidentally delete a pool, to respect
Expand All @@ -61,11 +69,20 @@ def delete(self, pool_id):
# TODO: handle errors in a way that caller can show to a user, e.g.
# if the name is wrong we should be sending a structured errors dict
# that they can use to associate the complaint with the 'name' field.
commands = [
('osd pool delete', {'pool': pool_name, 'pool2': pool_name, 'sure': '--yes-i-really-really-mean-it'})]
commands.append(('osd pool delete',
{'pool': pool_name, 'pool2': pool_name, 'sure': '--yes-i-really-really-mean-it'}))

return OsdMapModifyingRequest("Deleting pool '{name}'".format(name=pool_name),
self._cluster_monitor.fsid, self._cluster_monitor.name, commands)

def _del_cache_pool(self, pool_id, cache_pool_id, commands=[]):
pool_name = self._resolve_pool(pool_id)['pool_name']
cache_pool = self._resolve_pool(cache_pool_id)['pool_name']
mode = self._resolve_pool(cache_pool_id)['cache_mode']
if mode == "write_back":
commands.append(('osd tier remove-overlay',{'pool':pool_name}))
commands.append(('osd tier remove', {'pool': pool_name, 'tierpool': cache_pool}))

def _pool_min_size(self, req_size, req_min_size):
'''
Find an appropriate "min_size" parameter for a pool create operation
Expand Down
6 changes: 5 additions & 1 deletion cthulhu/cthulhu/manager/rpc.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from cthulhu.manager import config
from cthulhu.log import log
from calamari_common.types import OsdMap, SYNC_OBJECT_STR_TYPE, OSD, OSD_MAP, POOL, CLUSTER, CRUSH_NODE, CRUSH_MAP, CRUSH_RULE, CRUSH_TYPE, ServiceId,\
NotFound, SERVER
NotFound, SERVER, CACHE_POOL
from cthulhu.manager.user_request import SaltRequest


Expand Down Expand Up @@ -203,6 +203,8 @@ def create(self, fs_id, object_type, attributes):
return cluster.request_create(POOL, attributes)
elif object_type == CRUSH_NODE:
return cluster.request_create(CRUSH_NODE, attributes)
elif object_type == CACHE_POOL:
return cluster.request_create(CACHE_POOL, attributes)
else:
raise NotImplementedError(object_type)

Expand All @@ -213,6 +215,8 @@ def delete(self, fs_id, object_type, object_id):
return cluster.request_delete(POOL, object_id)
elif object_type == CRUSH_NODE:
return cluster.request_delete(CRUSH_NODE, object_id)
elif object_type == CACHE_POOL:
return cluster.request_delete(CACHE_POOL, object_id)
else:
raise NotImplementedError(object_type)

Expand Down
17 changes: 17 additions & 0 deletions rest-api/calamari_rest/serializers/v2.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ class Meta:
quota_max_bytes = serializers.IntegerField(required=False,
help_text="Quota limit on usage in bytes (0 is unlimited)")

class CachePoolSerializer(ValidatingSerializer):
class Meta:
fields = ( 'cache_pool_id', 'cache_pool_name', 'cache_mode')
create_allowed = ('cache_pool_id', 'cache_mode')
create_required = ('cache_pool_id',)
modify_allowed = ('cache_mode',)
modify_required = ()

# Required in creation
cache_pool_id = serializers.IntegerField(required=False, help_text="Cache pool id")
cache_pool_name = serializers.CharField(required=False, help_text="Pool name of cache pool")

# Not required in creation, immutable
# mode value in range ['none', 'writeback', 'forward', 'readonly']
cache_mode = serializers.CharField(required=False,
help_text="Cache mode of cache pool,value must in "
"['none', 'writeback', 'forward', 'readonly','readforward', 'readproxy']")

class OsdSerializer(ValidatingSerializer):
class Meta:
Expand Down
7 changes: 6 additions & 1 deletion rest-api/calamari_rest/urls/v2.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@
'patch': 'update',
'delete': 'destroy'}),
name='cluster-pool-detail'),

url(r'^cluster/(?P<fsid>[a-zA-Z0-9-]+)/pool/(?P<pool_id>\d+)/cache_pool$',
calamari_rest.views.v2.CachePoolViewSet.as_view({'get': 'list','post': 'create'}),
name='cluster-pool-cache-all-detail'),
url(r'^cluster/(?P<fsid>[a-zA-Z0-9-]+)/pool/(?P<pool_id>\d+)/cache_pool/(?P<cache_pool_id>\d+)$',
calamari_rest.views.v2.CachePoolViewSet.as_view({'get': 'retrieve','delete': 'destroy'}),
name='cluster-pool-cache-detail'),
url(r'^cluster/(?P<fsid>[a-zA-Z0-9-]+)/osd$',
calamari_rest.views.v2.OsdViewSet.as_view({'get': 'list'}),
name='cluster-osd-list'),
Expand Down
111 changes: 109 additions & 2 deletions rest-api/calamari_rest/views/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from calamari_rest.serializers.v2 import PoolSerializer, CrushRuleSetSerializer, CrushRuleSerializer, CrushNodeSerializer, CrushTypeSerializer,\
ServerSerializer, SimpleServerSerializer, SaltKeySerializer, RequestSerializer, \
ClusterSerializer, EventSerializer, LogTailSerializer, OsdSerializer, ConfigSettingSerializer, MonSerializer, OsdConfigSerializer, \
CliSerializer
CliSerializer, CachePoolSerializer
from calamari_rest.views.database_view_set import DatabaseViewSet
from calamari_rest.views.exceptions import ServiceUnavailable
from calamari_rest.views.paginated_mixin import PaginatedMixin
Expand All @@ -24,7 +24,7 @@
from calamari_rest.views.crush_node import lookup_ancestry
from calamari_common.config import CalamariConfig
from calamari_common.types import CRUSH_MAP, CRUSH_RULE, CRUSH_NODE, CRUSH_TYPE, POOL, OSD, USER_REQUEST_COMPLETE, USER_REQUEST_SUBMITTED, \
OSD_IMPLEMENTED_COMMANDS, MON, OSD_MAP, SYNC_OBJECT_TYPES, ServiceId
OSD_IMPLEMENTED_COMMANDS, MON, OSD_MAP, SYNC_OBJECT_TYPES, ServiceId, CACHE_MOD, CACHE_POOL
from calamari_common.db.event import Event, severity_from_str, SEVERITIES

from django.views.decorators.csrf import csrf_exempt
Expand Down Expand Up @@ -525,6 +525,113 @@ def _check_name_unique(self, fsid, pool_id, data, errors):
errors['name'].append('Pool with name {name} already exists'.format(name=data['name']))


class CachePoolViewSet(RPCViewSet, RequestReturner):
"""
Manage Ceph storage cache pools.
"""
serializer_class = CachePoolSerializer

def list(self, request, fsid, pool_id):
errors = defaultdict(list)
self._check_pool_exit(fsid, pool_id, errors)
if errors.items():
return Response(errors, status=status.HTTP_400_BAD_REQUEST)
pool = self.client.get(fsid, POOL, int(pool_id))
tires_pools=[]
if len(pool['tiers']) is not 0:
for elem in pool['tiers']:
cache_pool = self.client.get(fsid, POOL, int(elem))
cache_pool.update({'cache_pool_name': cache_pool['pool_name'],
'cache_mode': cache_pool['cache_mode'], 'cache_pool_id': int(elem)})
tires_pools.append(cache_pool)
return Response(CachePoolSerializer([DataObject(o) for o in tires_pools], many=True).data)

def retrieve(self, request, fsid, pool_id, cache_pool_id):
response = self._check_pool_have_cachepool(fsid, pool_id, cache_pool_id)
if response is not None:
return response
pool = self.client.get(fsid, POOL, int(pool_id))
if len(pool['tiers']) is not 0:
for elem in pool['tiers']:
if int(elem) == int(cache_pool_id):
cache_pool = self.client.get(fsid, POOL, int(elem))
cache_pool.update({'cache_pool_name': cache_pool['pool_name'],
'cache_mode': cache_pool['cache_mode'], 'cache_pool_id': int(elem)})
break
return Response(CachePoolSerializer(cache_pool).data)

def create(self, request, fsid, pool_id):
serializer = self.serializer_class(data=request.DATA)
if serializer.is_valid(request.method):
data = serializer.get_data()
pool = self.client.get(fsid, POOL, int(pool_id))
response = self._validate_semantics(fsid, pool_id, data)
if response is not None:
return response
cache_pool = self.client.get(fsid, POOL, int(data['cache_pool_id']))
attributes = {'pool': pool['pool_name'], 'tier_pool': cache_pool['pool_name']}
if 'cache_mode' in data:
attributes['mode'] = data['cache_mode']
create_response = self.client.create(fsid, CACHE_POOL, attributes)
assert 'request_id' in create_response
return Response(create_response, status=status.HTTP_202_ACCEPTED)
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def destroy(self, request, fsid, pool_id, cache_pool_id):
response = self._check_pool_have_cachepool(fsid, pool_id, cache_pool_id)
if response is not None:
return response
attributes = {'pool': int(pool_id), 'tier_pool': int(cache_pool_id)}
delete_response = self.client.delete(fsid, CACHE_POOL, attributes,
status=status.HTTP_202_ACCEPTED)
return Response(delete_response, status=status.HTTP_202_ACCEPTED)

def _check_pool_exit(self, fsid, pool_id, errors):
pool = self.client.get(fsid, POOL, int(pool_id))
if not pool:
errors['pool'].append('pool with id {pool_id} is not exists'.format(pool_id=pool_id))

def _check_pool_have_cachepool(self,fsid, pool_id, cache_pool_id):
errors = defaultdict(list)
self._check_pool_exit(fsid, pool_id, errors)
pool = self.client.get(fsid, POOL, int(pool_id))
if int(cache_pool_id) not in pool['tiers']:
errors['pool'].append('pool {pool_id} do not have the cache_pool {cache_pool_id}'.format(
pool_id=pool_id, cache_pool_id=cache_pool_id))
if errors.items():
return Response(errors, status=status.HTTP_400_BAD_REQUEST)

def _check_pool_without_cachepool(self,fsid, pool_id, cache_pool_id, errors):
self._check_pool_exit(fsid, pool_id, errors)
pool = self.client.get(fsid, POOL, int(pool_id))
if int(cache_pool_id) in pool['tiers']:
errors['pool'].append('pool {pool_id} already have the cache_pool {cache_pool_id}'.format(
pool_id=pool_id, cache_pool_id=cache_pool_id))
if errors.items():
return Response(errors, status=status.HTTP_400_BAD_REQUEST)

def _check_pool_tiers_empty(self, fsid, cache_pool_id, errors):
cache_pool = self.client.get(fsid, POOL, int(cache_pool_id))
if int(cache_pool['tier_of']) != -1:
errors['pool'].append('pool {cache_pool_id} already tiers by pool {pool_id}'.format(
cache_pool_id=cache_pool['pool'],pool_id=cache_pool['tier_of']))

def _validate_semantics(self, fsid, pool_id, data):
errors = defaultdict(list)
self._check_pool_exit(fsid, pool_id, errors)
self._check_pool_exit(fsid, data['cache_pool_id'], errors)
self._check_pool_without_cachepool(fsid, pool_id, data['cache_pool_id'], errors)
self._check_pool_tiers_empty(fsid, data['cache_pool_id'], errors)
self._check_cache_mode_valid(data, errors)
if errors.items():
return Response(errors, status=status.HTTP_400_BAD_REQUEST)

def _check_cache_mode_valid(self, data, errors):
if 'cache_mode' in data and data['cache_mode'] not in CACHE_MOD:
errors['cache_mode'].append("cachemode({cache_mode}) out of range ('none', 'writeback', 'forward', 'readonly')".format(cache_mode=data['cache_mode']))

class OsdViewSet(RPCViewSet, RequestReturner):
"""
Manage Ceph OSDs.
Expand Down

0 comments on commit 87c0ba4

Please sign in to comment.