Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce LDMS Profiling Capability #1460

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ldms/python/ldmsd/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pkgpythondir=${pythondir}/ldmsd
pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py
pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py ldmsd_profiling.py
dist_bin_SCRIPTS = ldmsd_controller
21 changes: 21 additions & 0 deletions ldms/python/ldmsd/ldmsd_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
'set_sec_mod' : {'req_attr': ['regex'], 'opt_attr': ['uid', 'gid', 'perm']},
'log_status' : {'req_attr' : [], 'opt_attr' : ['name']},
'stats_reset' : {'req_attr' : [], 'opt_attr' : ['list']},
'profiling' : {'req_attr' : [], 'opt_attr' : ['enable', 'reset']},
##### Failover. #####
'failover_config': {
'req_attr': [
Expand Down Expand Up @@ -616,6 +617,7 @@ class LDMSD_Request(object):
SET_SEC_MOD = 0x600 + 19
LOG_STATUS = 0x600 + 20
STATS_RESET = 0x600 + 21
PROFILING = 0x600 + 31
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nichamon, is there some reasons we are bumping by 31 instead of 22?


FAILOVER_CONFIG = 0x700
FAILOVER_PEERCFG_START = 0x700 + 1
Expand Down Expand Up @@ -731,6 +733,7 @@ class LDMSD_Request(object):
'failover_stop' : {'id' : FAILOVER_STOP},
'set_route' : {'id': SET_ROUTE},
'xprt_stats' : {'id' : XPRT_STATS},
'profiling' : {'id' : PROFILING},
'thread_stats' : {'id' : THREAD_STATS},
'prdcr_stats' : {'id' : PRDCR_STATS},
'set_stats' : {'id' : SET_STATS},
Expand Down Expand Up @@ -3386,6 +3389,24 @@ def xprt_stats(self, reset=False, level=0):
self.close()
return errno.ENOTCONN, str(e)

def profiling(self, enable = None, reset = None):
attrs = []
if enable is not None:
attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE,
value = enable))
if reset is not None:
attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.RESET,
value = reset))
req = LDMSD_Request(
command_id=LDMSD_Request.PROFILING, attrs=attrs)
try:
req.send(self)
resp = req.receive(self)
return resp['errcode'], resp['msg']
except Exception as e:
self.close()
return errno.ENOTCONN, str(e)

def thread_stats(self, reset=False):
"""Query the daemon's I/O thread utilization data"""
if reset is None:
Expand Down
25 changes: 24 additions & 1 deletion ldms/python/ldmsd/ldmsd_controller
Original file line number Diff line number Diff line change
Expand Up @@ -2407,6 +2407,30 @@ class LdmsdCmdParser(cmd.Cmd):
def complete_xprt_stats(self, text, line, begidx, endidx):
return self.__complete_attr_list('xprt_stats', text)

def do_profiling(self, arg):
"""
Enable/disable and query the LDMS operation profiling data

The command was intended for diagnostic or study to improve ldmsd performance.

The command always reports the cached profiling data if exists.

Parameters:
[enabled=] True to enable LDMS profiling
[reset=] True to reset and free cached profiling data after the report
"""
arg = self.handle_args('profiling', arg)
if not arg:
return
rc, msg = self.comm.profiling(**arg)
if msg == "":
return
if rc != 0:
print(f"Error: {rc} {msg}")
return
stats = fmt_status(msg)
print(stats)

def do_updtr_task(self, arg):
"""
Report the updater tasks
Expand Down Expand Up @@ -3209,7 +3233,6 @@ class LdmsdCmdParser(cmd.Cmd):
cmd, arg = line[:i], line[i:].strip()
return cmd, arg, line


if __name__ == "__main__":
is_debug = True
try:
Expand Down
45 changes: 45 additions & 0 deletions ldms/python/ldmsd/ldmsd_profiling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import json
import pandas as pd

from ldmsd.ldmsd_communicator import *

def profiling_as_json(xprt, host, port):
comm = Communicator(xprt=xprt, host=host, port=port)
comm.connect()
o = comm.profiling()
return json.loads(o[1])

def get_hosts(o):
return o['xprt'].keys()

def get_streams(o):
return o['stream'].keys()

def lookup_df(o, host):
df = pd.DataFrame(o['xprt'][host]['LOOKUP'])
return df

def update_df(o, host):
df = pd.DataFrame(o['xprt'][host]['UPDATE'])
return df

def send_df(o, host):
df = pd.DataFrame(o['xprt'][host]['SEND'])
return df

def set_delete_df(o, host):
df = pd.DataFrame(o['xprt'][host]['SET_DELETE'])
return df

def stream_publish_df(o, host):
df = pd.DataFrame(o['xprt'][host]['STREAM_PUBLISH'])
return df

def stream_by_stream_df(o, stream_name = None, src = None):
d = o['stream']
if stream_name is not None:
d = d[stream_name]
if src is not None:
d = d[src]
df = pd.DataFrame(d)
return df
99 changes: 97 additions & 2 deletions ldms/src/core/ldms.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,83 @@ const char *ldms_xprt_op_names[] = {
"DIR_REP",
"SEND",
"RECV",
"STREAM_PUBLISH",
"STREAM_SUBSCRIBE",
"STREAM_UNSUBSCRIBE"
};
static char *type_names[];

/* -ENOSYS means that LDMS doesn't support profiling for those operation. */
int __enable_profiling[LDMS_XPRT_OP_COUNT] = {
PROFILING_CFG_DISABLED, /* lookup */
PROFILING_CFG_DISABLED, /* update */
PROFILING_CFG_UNSUPPORTED, /* Publish */
PROFILING_CFG_DISABLED, /* set_delete */
PROFILING_CFG_UNSUPPORTED, /* dir_req */
PROFILING_CFG_UNSUPPORTED, /* dir_rep */
PROFILING_CFG_DISABLED, /* send */
PROFILING_CFG_UNSUPPORTED, /* receive */
PROFILING_CFG_DISABLED, /* stream_publish */
PROFILING_CFG_UNSUPPORTED, /* stream_subscribe */
PROFILING_CFG_UNSUPPORTED /* stream_unsubscribe */
};

int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err)
{
int i;
int rc = 0;

if (ops_cnt < 0) {
for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) {
if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED)
__enable_profiling[i] = PROFILING_CFG_ENABLED;
}
} else {
if (ops_err)
bzero(ops_err, sizeof(int) * ops_cnt);
for (i = 0; i < ops_cnt; i++) {
if (ops[i] >= LDMS_XPRT_OP_COUNT) {
ops_err[i] = EINVAL;
continue;
}
if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) {
rc = -1;
if (ops_err)
ops_err[i] = ENOSYS;
} else {
__enable_profiling[ops[i]] = PROFILING_CFG_ENABLED;
}
}
}
return rc;
}

int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err)
{
int i;
int rc = 0;

if (ops_cnt < 0) {
for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) {
if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED)
__enable_profiling[i] = PROFILING_CFG_DISABLED;
}
} else {
if (ops_err)
bzero(ops_err, sizeof(int) * ops_cnt);
for (i = 0; i < ops_cnt; i++) {
if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) {
rc = -1;
if (ops_err)
ops_err[i] = ENOSYS;
} else {
__enable_profiling[ops[i]] = PROFILING_CFG_DISABLED;
}
}
}
return rc;
}

static struct ldms_digest_s null_digest;

/* This function is useful for displaying data structures stored in
Expand Down Expand Up @@ -739,7 +813,8 @@ static void sync_update_cb(ldms_t x, ldms_set_t s, int status, void *arg)
sem_post(&x->sem);
}

int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg)
int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb,
void *arg, struct ldms_op_ctxt *op_ctxt)
{
ldms_t xprt = ldms_xprt_get(x);
int rc;
Expand Down Expand Up @@ -777,6 +852,21 @@ int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void
ldms_t __ldms_xprt_to_rail(ldms_t x);
int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg)
{
int rc;
struct ldms_op_ctxt *op_ctxt = NULL;

assert(set);

if (set->curr_updt_ctxt)
return EBUSY;

if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) {
op_ctxt = calloc(1, sizeof(*op_ctxt));
if (!op_ctxt)
return ENOMEM;
op_ctxt->op_type = LDMS_XPRT_OP_UPDATE;
(void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->update_profile.app_req_ts));
}
/*
* We convert the transport handle to a rail handle using
* __ldms_xprt_to_rail() and pass it to x->ops.update().
Expand All @@ -786,7 +876,12 @@ int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg)
* when the update completes.
*/
ldms_t r = __ldms_xprt_to_rail(set->xprt);
return r->ops.update(r, set, cb, arg);
rc = r->ops.update(r, set, cb, arg, op_ctxt);
if (rc) {
set->curr_updt_ctxt = NULL;
free(op_ctxt);
}
return rc;
}

void __ldms_set_on_xprt_term(ldms_set_t set, ldms_t xprt)
Expand Down
Loading
Loading