Commit

asdf
liewegas committed Jul 28, 2017
1 parent 62b47ed commit 82e379d
Showing 1 changed file with 210 additions and 18 deletions.
src/pybind/mgr/balancer/module.py (228 changes: 210 additions & 18 deletions)
@@ -15,32 +15,47 @@
default_sleep_interval = 60 # seconds
default_max_misplaced = .03 # max ratio of pgs replaced at a time

class Plan:
def __init__(self, name, osdmap):
self.mode = 'unknown'
self.name = name
TIME_FORMAT = '%Y-%m-%d %H:%M:%S %Z'


class MappingState:
def __init__(self, osdmap, pg_dump, desc=''):
self.desc = desc
self.osdmap = osdmap
self.osdmap_dump = self.osdmap.dump()
self.crush = osdmap.get_crush()
self.crush_dump = self.crush.dump()
self.pg_dump = pg_dump
self.pg_stat = {
i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
}
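A note on how this snapshot is used: MappingState bundles the osdmap, its CRUSH map, and the pg dump taken at the same moment, so a distribution can be scored without touching the live cluster again, and pg_stat indexes each PG's stat_sum by pgid. A minimal sketch of building and consuming a snapshot from inside this module (the pgid '1.0' is an invented example):

# Built from live mgr data, as handle_command does further down.
ms = MappingState(self.get_osdmap(), self.get("pg_dump"), 'current cluster')
num_bytes = ms.pg_stat['1.0']['num_bytes']  # per-PG stat_sum lookup by pgid
roots = ms.crush.find_takes()               # CRUSH roots, as walked in calc_eval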


class Plan:
def __init__(self, name, ms):
self.mode = 'unknown'
self.name = name
self.initial = ms

self.osd_weights = {}
self.compat_ws = {}
self.inc = osdmap.new_incremental()
self.inc = ms.osdmap.new_incremental()

def get_osdmap(self):
return self.osdmap

def apply(self):
def final_state(self):
inc.set_osd_reweights(self.osd_weights)
inc.set_crush_compat_weight_set_weights(self.compat_ws)
return osdmap.apply_incremental(inc)
return MappingState(self.initial.osdmap.apply_incremental(inc),
self.pg_dump,
'plan %s final' % self.name)

def dump(self):
return json.dumps(self.inc.dump(), indent=4)

def show(self):
ls = []
ls.append('# starting osdmap epoch %d' % self.osdmap.get_epoch())
ls.append('# starting crush version %d' % self.osdmap.get_crush_version())
ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
ls.append('# starting crush version %d' %
self.initial.osdmap.get_crush_version())
ls.append('# mode %s' % self.mode)
if len(self.compat_ws) and \
'-1' in self.crush_dump.get('choose_args', {}):
@@ -62,6 +77,37 @@ def show(self):
return '\n'.join(ls)
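One thing to note about the reworked Plan: __init__ stores the incremental as self.inc and keeps the pg dump only on self.initial, yet final_state above references a bare inc and a self.pg_dump that Plan never defines (show() similarly still reads self.crush_dump). A sketch of what final_state appears to intend, using the attributes __init__ actually sets; the fix is an assumption about intent, not part of this commit:

    def final_state(self):
        # Fold the queued OSD reweights and compat weight-set weights into
        # the plan's own incremental.
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        # Project that incremental onto the snapshot taken at plan creation;
        # the pg dump is reused from the initial MappingState.
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)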


class Eval:
pool_name = {} # pool id -> pool name
pool_id = {} # pool name -> id
pool_roots = {} # pool name -> root name
target_by_root = {} # root name -> target weight map

actual_by_pool = {} # pool -> by_foo -> actual weight map
actual_by_root = {} # pool -> by_* -> actual weight map
total_by_pool = {} # pool -> by_* -> total
total_by_root = {} # root -> by_* -> total

score_by_pool = {}
score_by_root = {}

score = 0.0

def __init__(self, ms):
self.ms = ms

def show(self):
r = self.ms.desc + '\n'
r += 'target_by_root %s\n' % self.target_by_root
r += 'actual_by_pool %s\n' % self.actual_by_pool
r += 'actual_by_root %s\n' % self.actual_by_root
r += 'total_by_pool %s\n' % self.total_by_pool
r += 'total_by_root %s\n' % self.total_by_root
r += 'score_by_pool %s\n' % self.score_by_pool
r += 'score_by_root %s\n' % self.score_by_root
r += 'score %f (lower is better)\n' % self.score
return r
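The dictionaries on Eval are declared at class level, so in Python every Eval instance shares the same underlying dicts across evaluations. If calc_eval expects per-instance state, a sketch of the same class with the maps created in __init__ would look like this (the restructuring is an assumption about intent, not part of this commit):

class Eval:
    def __init__(self, ms):
        self.ms = ms
        self.pool_name = {}        # pool id -> pool name
        self.pool_id = {}          # pool name -> id
        self.pool_roots = {}       # pool name -> list of CRUSH root names
        self.target_by_root = {}   # root name -> target weight map
        self.actual_by_pool = {}   # pool -> by_* -> actual weight map
        self.actual_by_root = {}   # root -> by_* -> actual weight map
        self.total_by_pool = {}    # pool -> by_* -> total
        self.total_by_root = {}    # root -> by_* -> total
        self.score_by_pool = {}
        self.score_by_root = {}
        self.score = 0.0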


class Module(MgrModule):
COMMANDS = [
@@ -85,6 +131,11 @@ class Module(MgrModule):
"desc": "Disable automatic balancing",
"perm": "rw",
},
{
"cmd": "balancer eval name=plan,type=CephString,req=false",
"desc": "Evaluate data distribution for the current cluster or specific plan",
"perm": "r",
},
{
"cmd": "balancer optimize name=plan,type=CephString",
"desc": "Run optimizer to create a new plan",
@@ -149,6 +200,18 @@ def handle_command(self, command):
self.active = False
self.event.set()
return (0, '', '')
elif command['prefix'] == 'balancer eval':
if 'plan' in command:
plan = self.plans.get(command['plan'])
if not plan:
return (-errno.ENOENT, '', 'plan %s not found' %
command['plan'])
ms = plan.final_state()
else:
ms = MappingState(self.get_osdmap(),
self.get("pg_dump"),
'current cluster')
return (0, self.evaluate(ms), '')
elif command['prefix'] == 'balancer optimize':
plan = self.plan_create(command['plan'])
self.optimize(plan)
@@ -202,14 +265,143 @@ def serve(self):
self.event.clear()

def plan_create(self, name):
plan = Plan(name, self.get_osdmap())
plan = Plan(name, MappingState(self.get_osdmap(),
self.get("pg_dump"),
'plan %s initial' % name))
self.plans[name] = plan
return plan

def plan_rm(self, name):
if name in self.plans:
del self.plans[name]
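Taken together, these helpers define the plan lifecycle: snapshot the cluster into a plan, let the optimizer fill plan.inc, score the projected result, then discard the plan. A condensed sketch of that flow from inside the module ('myplan' is just an example name):

plan = self.plan_create('myplan')           # snapshots osdmap + pg dump
self.optimize(plan)                         # proposes changes into plan.inc
report = self.evaluate(plan.final_state())  # scores the projected distribution
self.log.info(report)
self.plan_rm('myplan')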

def calc_eval(self, ms):
pe = Eval(ms)
pool_rule = {}
for p in ms.osdmap_dump.get('pools',[]):
pe.pool_name[p['pool']] = p['pool_name']
pe.pool_id[p['pool_name']] = p['pool']
pool_rule[p['pool_name']] = p['crush_rule']
pe.pool_roots[p['pool_name']] = []
pools = pe.pool_id.keys()
if len(pools) == 0:
return pe
self.log.debug('pool_name %s' % pe.pool_name)
self.log.debug('pool_id %s' % pe.pool_id)
self.log.debug('pools %s' % pools)
self.log.debug('pool_rule %s' % pool_rule)

# get expected distributions by root
actual_by_root = {}
roots = ms.crush.find_takes()
for root in roots:
rname = ms.crush.get_item_name(root)
ls = ms.osdmap.get_pools_by_take(root)
for poolid in ls:
pe.pool_roots[pe.pool_name[poolid]].append(rname)
pe.target_by_root[rname] = ms.crush.get_take_weight_osd_map(root)
actual_by_root[rname] = {
'pgs': {},
'objects': {},
'bytes': {},
}
for osd in pe.target_by_root[rname].iterkeys():
actual_by_root[rname]['pgs'][osd] = 0
actual_by_root[rname]['objects'][osd] = 0
actual_by_root[rname]['bytes'][osd] = 0
pe.total_by_root[rname] = {
'pgs': 0,
'objects': 0,
'bytes': 0,
}
self.log.debug('pool_roots %s' % pe.pool_roots)
self.log.debug('target_by_root %s' % pe.target_by_root)

# pool and root actual
for pool in pools:
pi = [p for p in ms.osdmap_dump.get('pools',[])
if p['pool_name'] == pool][0]
poolid = pi['pool']
pm = ms.osdmap.map_pool_pgs_up(poolid)
pgs = 0
objects = 0
bytes = 0
pgs_by_osd = {}
objects_by_osd = {}
bytes_by_osd = {}
for root in pe.pool_roots[pool]:
for osd in pe.target_by_root[root].iterkeys():
pgs_by_osd[osd] = 0
objects_by_osd[osd] = 0
bytes_by_osd[osd] = 0
for pgid, up in pm.iteritems():
for osd in [int(osd) for osd in up]:
pgs_by_osd[osd] += 1
objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
# pick a root to associate this pg instance with.
# note that this is imprecise if the roots have
# overlapping children.
# FIXME: divide bytes by k for EC pools.
for root in pe.pool_roots[pool]:
if osd in pe.target_by_root[root]:
actual_by_root[root]['pgs'][osd] += 1
actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
pgs += 1
objects += ms.pg_stat[pgid]['num_objects']
bytes += ms.pg_stat[pgid]['num_bytes']
pe.total_by_root[root]['pgs'] += 1
pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
break
pe.actual_by_pool[pool] = {
'pgs': {
k: float(v) / float(max(pgs, 1))
for k, v in pgs_by_osd.iteritems()
},
'objects': {
k: float(v) / float(max(objects, 1))
for k, v in objects_by_osd.iteritems()
},
'bytes': {
k: float(v) / float(max(bytes, 1))
for k, v in bytes_by_osd.iteritems()
},
}
pe.total_by_pool[pool] = {
'pgs': pgs,
'objects': objects,
'bytes': bytes,
}
for root, m in pe.total_by_root.iteritems():
pe.actual_by_root[root] = {
'pgs': {
k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
for k, v in actual_by_root[root]['pgs'].iteritems()
},
'objects': {
k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
for k, v in actual_by_root[root]['objects'].iteritems()
},
'bytes': {
k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
for k, v in actual_by_root[root]['bytes'].iteritems()
},
}
self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
self.log.debug('actual_by_root %s' % pe.actual_by_root)

# average and stddev

# aggregate score

return pe
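The "average and stddev" and "aggregate score" steps are still TODO comments above. Purely as an illustration of what could go there (not necessarily how the balancer will eventually score distributions), each root could be scored by the standard deviation of its per-OSD fractions, with zero meaning a perfectly even spread:

import math

def spread_score(fraction_by_osd):
    # fraction_by_osd: {osd_id: fraction of the root's pgs/objects/bytes}
    # Population standard deviation of the fractions; 0.0 means perfectly even.
    values = list(fraction_by_osd.values())
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))

# For example (names follow calc_eval above):
#   pe.score_by_root[root] = spread_score(pe.actual_by_root[root]['pgs'])
#   pe.score = sum(pe.score_by_root.values()) / max(len(pe.score_by_root), 1)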

def evaluate(self, ms):
pe = self.calc_eval(ms)
return pe.show()

def optimize(self, plan):
self.log.info('Optimize plan %s' % plan.name)
plan.mode = self.get_config('mode', default_mode)
@@ -248,14 +440,15 @@ def optimize(self, plan):
else:
self.log.info('Unrecognized mode %s' % plan.mode)

##

def do_upmap(self, plan):
self.log.info('do_upmap')
max_iterations = self.get_config('upmap_max_iterations', 10)
max_deviation = self.get_config('upmap_max_deviation', .01)

osdmap = plan.get_osdmap()
osdmap_dump = osdmap.dump()
pools = [str(i['pool_name']) for i in osdmap_dump.get('pools',[])]
ms = plan.initial
pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools',[])]
if len(pools) == 0:
self.log.info('no pools, nothing to do')
return
@@ -267,14 +460,13 @@ def do_upmap(self, plan):
total_did = 0
left = max_iterations
for pool in pools:
did = osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
total_did += did
left -= did
if left <= 0:
break
self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
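Two small notes on do_upmap: the max_iterations budget is shared across pools (each calc_pg_upmaps call gets whatever is left), and config values an operator has set typically come back from get_config as strings rather than numbers. A defensive sketch of the two lookups with explicit casts; assuming string-typed config values is this sketch's guess, not something the commit relies on:

max_iterations = int(self.get_config('upmap_max_iterations', 10))
max_deviation = float(self.get_config('upmap_max_deviation', .01))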


def do_crush_compat(self):
self.log.info('do_crush_compat')
osdmap = self.get_osdmap()