From ea615faddf0683de319fcbd97ceef87659525255 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Wed, 25 Sep 2024 21:39:33 -0500 Subject: [PATCH 1/2] Export ldms_addr_ntop() --- ldms/src/core/ldms.h | 11 ++++++++++- ldms/src/core/ldms_rail.h | 5 ----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index 32fd491a6..f97962ce2 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -928,7 +928,16 @@ struct ldms_addr { int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr, struct ldms_addr *remote_addr); -const char *ldms_sockaddr_ntop(struct sockaddr *sa, char *buff, size_t sz); +/** + * \brief Get address string + * + * \param addr LDMS address + * \param buff String buffer to receive the address string + * \param sz Size of \c buff + * + * \return \c buff on success; otherwise, NULL is returned. + */ +const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz); /** * \brief Convert a CIDR IP address string to \c ldms_addr diff --git a/ldms/src/core/ldms_rail.h b/ldms/src/core/ldms_rail.h index adf34180c..63d865477 100644 --- a/ldms/src/core/ldms_rail.h +++ b/ldms/src/core/ldms_rail.h @@ -224,11 +224,6 @@ int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la); */ const char *sockaddr_ntop(struct sockaddr *sa, char *buff, size_t sz); -/** - * Wrapper of 'inet_ntop()' for ldms_addr. - */ -const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz); - void __rail_ep_quota_return(struct ldms_rail_ep_s *rep, int quota); int __rep_flush_sbuf_tq(struct ldms_rail_ep_s *rep); From a77173a37fbd456183f6095f4dd0121532986ea7 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Thu, 22 Aug 2024 14:22:33 -0500 Subject: [PATCH 2/2] Profiling LDMS Operations This commit introduces profiling capabilities to LDMS operations, including lookup, update, set_delete, and stream_publish. It collects timestamps for key events, such as API calls, request/response exchanges, and completion notifications. The feature is disabled by default but can be configured using the 'profiling' configuration command to enable, disable, or reset data collection. This enhancement will aid in performance analysis and optimization.
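An illustrative sketch (not part of the patch): applications can drive the same feature through the new C API that this commit declares in ldms.h; the error-reporting convention below follows the ldms_profiling_enable() documentation added by this patch.

    enum ldms_xprt_ops_e ops[] = { LDMS_XPRT_OP_LOOKUP, LDMS_XPRT_OP_UPDATE };
    int ops_err[2];

    /* Enable profiling for lookup and update. On failure (-1), ops_err[i]
     * holds ENOSYS for an operation without profiling support and EINVAL
     * for an unknown operation. */
    if (ldms_profiling_enable(2, ops, ops_err) == -1) {
            /* inspect ops_err[] */
    }

    /* A negative ops_cnt applies the request to every operation that
     * supports profiling. */
    ldms_profiling_disable(-1, NULL, NULL);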
--- ldms/python/ldmsd/Makefile.am | 2 +- ldms/python/ldmsd/ldmsd_communicator.py | 21 ++ ldms/python/ldmsd/ldmsd_controller | 25 +- ldms/python/ldmsd/ldmsd_profiling.py | 45 ++++ ldms/src/core/ldms.c | 99 +++++++- ldms/src/core/ldms.h | 119 +++++++++- ldms/src/core/ldms_private.h | 12 +- ldms/src/core/ldms_rail.c | 138 +++++++++-- ldms/src/core/ldms_rail.h | 10 + ldms/src/core/ldms_stream.c | 172 ++++++++++++-- ldms/src/core/ldms_stream.h | 12 +- ldms/src/core/ldms_xprt.c | 294 +++++++++++++++++++++--- ldms/src/core/ldms_xprt.h | 23 +- ldms/src/ldmsd/ldmsd_request.c | 255 +++++++++++++++++++- ldms/src/ldmsd/ldmsd_request.h | 1 + ldms/src/ldmsd/ldmsd_request_util.c | 2 + 16 files changed, 1151 insertions(+), 79 deletions(-) create mode 100644 ldms/python/ldmsd/ldmsd_profiling.py diff --git a/ldms/python/ldmsd/Makefile.am b/ldms/python/ldmsd/Makefile.am index f4dd661dd..25fc2d903 100644 --- a/ldms/python/ldmsd/Makefile.am +++ b/ldms/python/ldmsd/Makefile.am @@ -1,3 +1,3 @@ pkgpythondir=${pythondir}/ldmsd -pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py +pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py ldmsd_profiling.py dist_bin_SCRIPTS = ldmsd_controller diff --git a/ldms/python/ldmsd/ldmsd_communicator.py b/ldms/python/ldmsd/ldmsd_communicator.py index ed6ad1e64..fda45f920 100644 --- a/ldms/python/ldmsd/ldmsd_communicator.py +++ b/ldms/python/ldmsd/ldmsd_communicator.py @@ -160,6 +160,7 @@ 'set_sec_mod' : {'req_attr': ['regex'], 'opt_attr': ['uid', 'gid', 'perm']}, 'log_status' : {'req_attr' : [], 'opt_attr' : ['name']}, 'stats_reset' : {'req_attr' : [], 'opt_attr' : ['list']}, + 'profiling' : {'req_attr' : [], 'opt_attr' : ['enable', 'reset']}, ##### Failover. 
##### 'failover_config': { 'req_attr': [ @@ -616,6 +617,7 @@ class LDMSD_Request(object): SET_SEC_MOD = 0x600 + 19 LOG_STATUS = 0x600 + 20 STATS_RESET = 0x600 + 21 + PROFILING = 0x600 + 31 FAILOVER_CONFIG = 0x700 FAILOVER_PEERCFG_START = 0x700 + 1 @@ -731,6 +733,7 @@ class LDMSD_Request(object): 'failover_stop' : {'id' : FAILOVER_STOP}, 'set_route' : {'id': SET_ROUTE}, 'xprt_stats' : {'id' : XPRT_STATS}, + 'profiling' : {'id' : PROFILING}, 'thread_stats' : {'id' : THREAD_STATS}, 'prdcr_stats' : {'id' : PRDCR_STATS}, 'set_stats' : {'id' : SET_STATS}, @@ -3386,6 +3389,24 @@ def xprt_stats(self, reset=False, level=0): self.close() return errno.ENOTCONN, str(e) + def profiling(self, enable = None, reset = None): + attrs = [] + if enable is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE, + value = enable)) + if reset is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.RESET, + value = reset)) + req = LDMSD_Request( + command_id=LDMSD_Request.PROFILING, attrs=attrs) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + def thread_stats(self, reset=False): """Query the daemon's I/O thread utilization data""" if reset is None: diff --git a/ldms/python/ldmsd/ldmsd_controller b/ldms/python/ldmsd/ldmsd_controller index 8d15cf0ba..a4c854c57 100755 --- a/ldms/python/ldmsd/ldmsd_controller +++ b/ldms/python/ldmsd/ldmsd_controller @@ -2407,6 +2407,30 @@ class LdmsdCmdParser(cmd.Cmd): def complete_xprt_stats(self, text, line, begidx, endidx): return self.__complete_attr_list('xprt_stats', text) + def do_profiling(self, arg): + """ + Enable/disable and query the LDMS operation profiling data + + The command is intended for diagnostics and studies to improve ldmsd performance. + + The command always reports the cached profiling data if it exists.
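+
+        Example (illustrative):
+            profiling enable=true    # enable profiling; report cached data
+            profiling reset=true     # report, then free the cached data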
+ + Parameters: + [enable=] True to enable LDMS profiling + [reset=] True to reset and free cached profiling data after the report + """ + arg = self.handle_args('profiling', arg) + if not arg: + return + rc, msg = self.comm.profiling(**arg) + if msg == "": + return + if rc != 0: + print(f"Error: {rc} {msg}") + return + stats = fmt_status(msg) + print(stats) + def do_updtr_task(self, arg): """ Report the updater tasks @@ -3209,7 +3233,6 @@ class LdmsdCmdParser(cmd.Cmd): cmd, arg = line[:i], line[i:].strip() return cmd, arg, line - if __name__ == "__main__": is_debug = True try: diff --git a/ldms/python/ldmsd/ldmsd_profiling.py b/ldms/python/ldmsd/ldmsd_profiling.py new file mode 100644 index 000000000..73b408f2c --- /dev/null +++ b/ldms/python/ldmsd/ldmsd_profiling.py @@ -0,0 +1,45 @@ +import json +import pandas as pd + +from ldmsd.ldmsd_communicator import * + +def profiling_as_json(xprt, host, port): + comm = Communicator(xprt=xprt, host=host, port=port) + comm.connect() + o = comm.profiling() + return json.loads(o[1]) + +def get_hosts(o): + return o['xprt'].keys() + +def get_streams(o): + return o['stream'].keys() + +def lookup_df(o, host): + df = pd.DataFrame(o['xprt'][host]['LOOKUP']) + return df + +def update_df(o, host): + df = pd.DataFrame(o['xprt'][host]['UPDATE']) + return df + +def send_df(o, host): + df = pd.DataFrame(o['xprt'][host]['SEND']) + return df + +def set_delete_df(o, host): + df = pd.DataFrame(o['xprt'][host]['SET_DELETE']) + return df + +def stream_publish_df(o, host): + df = pd.DataFrame(o['xprt'][host]['STREAM_PUBLISH']) + return df + +def stream_by_stream_df(o, stream_name = None, src = None): + d = o['stream'] + if stream_name is not None: + d = d[stream_name] + if src is not None: + d = d[src] + df = pd.DataFrame(d) + return df diff --git a/ldms/src/core/ldms.c b/ldms/src/core/ldms.c index f8ccdabd2..c2cf74ed0 100644 --- a/ldms/src/core/ldms.c +++ b/ldms/src/core/ldms.c @@ -100,9 +100,83 @@ const char *ldms_xprt_op_names[] = { "DIR_REP", "SEND", "RECV", + "STREAM_PUBLISH", + "STREAM_SUBSCRIBE", + "STREAM_UNSUBSCRIBE" }; static char *type_names[]; +/* ENOSYS means that LDMS doesn't support profiling for those operations.
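+ * The unsupported operations are the entries initialized to
+ * PROFILING_CFG_UNSUPPORTED in the array below.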
*/ +int __enable_profiling[LDMS_XPRT_OP_COUNT] = { + PROFILING_CFG_DISABLED, /* lookup */ + PROFILING_CFG_DISABLED, /* update */ + PROFILING_CFG_UNSUPPORTED, /* Publish */ + PROFILING_CFG_DISABLED, /* set_delete */ + PROFILING_CFG_UNSUPPORTED, /* dir_req */ + PROFILING_CFG_UNSUPPORTED, /* dir_rep */ + PROFILING_CFG_DISABLED, /* send */ + PROFILING_CFG_UNSUPPORTED, /* receive */ + PROFILING_CFG_DISABLED, /* stream_publish */ + PROFILING_CFG_UNSUPPORTED, /* stream_subscribe */ + PROFILING_CFG_UNSUPPORTED /* stream_unsubscribe */ +}; + +int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err) +{ + int i; + int rc = 0; + + if (ops_cnt < 0) { + for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) { + if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED) + __enable_profiling[i] = PROFILING_CFG_ENABLED; + } + } else { + if (ops_err) + bzero(ops_err, sizeof(int) * ops_cnt); + for (i = 0; i < ops_cnt; i++) { + if (ops[i] >= LDMS_XPRT_OP_COUNT) { + ops_err[i] = EINVAL; + continue; + } + if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) { + rc = -1; + if (ops_err) + ops_err[i] = ENOSYS; + } else { + __enable_profiling[ops[i]] = PROFILING_CFG_ENABLED; + } + } + } + return rc; +} + +int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err) +{ + int i; + int rc = 0; + + if (ops_cnt < 0) { + for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) { + if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED) + __enable_profiling[i] = PROFILING_CFG_DISABLED; + } + } else { + if (ops_err) + bzero(ops_err, sizeof(int) * ops_cnt); + for (i = 0; i < ops_cnt; i++) { + if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) { + rc = -1; + if (ops_err) + ops_err[i] = ENOSYS; + } else { + __enable_profiling[ops[i]] = PROFILING_CFG_DISABLED; + } + } + } + return rc; +} + static struct ldms_digest_s null_digest; /* This function is useful for displaying data structures stored in @@ -739,7 +813,8 @@ static void sync_update_cb(ldms_t x, ldms_set_t s, int status, void *arg) sem_post(&x->sem); } -int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg) +int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, + void *arg, struct ldms_op_ctxt *op_ctxt) { ldms_t xprt = ldms_xprt_get(x); int rc; @@ -777,6 +852,21 @@ int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void ldms_t __ldms_xprt_to_rail(ldms_t x); int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg) { + int rc; + struct ldms_op_ctxt *op_ctxt = NULL; + + assert(set); + + if (set->curr_updt_ctxt) + return EBUSY; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_UPDATE; + (void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->update_profile.app_req_ts)); + } /* * We convert the transport handle to a rail handle using * __ldms_xprt_to_rail() and pass it to x->ops.update(). @@ -786,7 +876,12 @@ int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg) * when the update completes. 
*/ ldms_t r = __ldms_xprt_to_rail(set->xprt); - return r->ops.update(r, set, cb, arg); + rc = r->ops.update(r, set, cb, arg, op_ctxt); + if (rc) { + set->curr_updt_ctxt = NULL; + free(op_ctxt); + } + return rc; } void __ldms_set_on_xprt_term(ldms_set_t set, ldms_t xprt) diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index f97962ce2..8464a275a 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -939,6 +939,7 @@ int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr, */ const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz); + /** * \brief Convert a CIDR IP address string to \c ldms_addr * @@ -1471,6 +1472,8 @@ struct ldms_stream_close_event_s { typedef struct ldms_stream_event_s { ldms_t r; /* rail */ enum ldms_stream_event_type type; + struct timespec recv_ts; + uint32_t hop_num; union { struct ldms_stream_recv_data_s recv; struct ldms_stream_return_status_s status; @@ -1562,11 +1565,28 @@ struct ldms_stream_counters_s { *(p) = LDMS_STREAM_COUNTERS_INITIALIZER; \ } while (0) +struct ldms_stream_hop { + struct timespec recv_ts; + struct timespec send_ts; +}; + +#define STREAM_MAX_PROFILE_HOPS 8 +struct ldms_stream_profile { + uint32_t hop_cnt; + struct ldms_stream_hop hops[OVIS_FLEX]; +}; +struct ldms_stream_profile_ent { + TAILQ_ENTRY(ldms_stream_profile_ent) ent; + struct ldms_stream_profile profiles; +}; +TAILQ_HEAD(ldms_stream_profile_list, ldms_stream_profile_ent); + /* stream statistics by src */ struct ldms_stream_src_stats_s { struct rbn rbn; /* key ==> src */ struct ldms_addr src; struct ldms_stream_counters_s rx; /* total rx from src */ + struct ldms_stream_profile_list profiles; }; /* stats of stream-client pair */ @@ -2063,9 +2083,94 @@ typedef enum ldms_xprt_ops_e { LDMS_XPRT_OP_DIR_REP, LDMS_XPRT_OP_SEND, LDMS_XPRT_OP_RECV, + LDMS_XPRT_OP_STREAM_PUBLISH, + LDMS_XPRT_OP_STREAM_SUBSCRIBE, + LDMS_XPRT_OP_STREAM_UNSUBSCRIBE, LDMS_XPRT_OP_COUNT } ldms_xprt_ops_t; +struct ldms_op_ctxt { + enum ldms_xprt_ops_e op_type; + union { + struct lookup_profile_s { + struct timespec app_req_ts; + struct timespec req_send_ts; + struct timespec req_recv_ts; + struct timespec share_ts; + struct timespec rendzv_ts; + struct timespec read_ts; + struct timespec complete_ts; + struct timespec deliver_ts; + } lookup_profile; + struct update_profile { + struct timespec app_req_ts; + struct timespec read_ts; + struct timespec read_complete_ts; + struct timespec deliver_ts; + } update_profile; + struct set_delete_profile_s { + struct timespec send_ts; + struct timespec recv_ts; + struct timespec ack_ts; + } set_del_profile; + struct send_profile_s { + struct timespec app_req_ts; + struct timespec send_ts; + struct timespec complete_ts; + struct timespec deliver_ts; + } send_profile; + struct strm_publish_profile_s { + uint32_t hop_num; + struct timespec recv_ts; + struct timespec send_ts; /* to remote client */ + } stream_pub_profile; + }; + TAILQ_ENTRY(ldms_op_ctxt) ent; +}; +TAILQ_HEAD(ldms_op_ctxt_list, ldms_op_ctxt); + +#define PROFILING_CFG_DISABLED 0 +#define PROFILING_CFG_ENABLED 1 +#define PROFILING_CFG_UNSUPPORTED 2 + +/** + * Enable/disable LDMS operations' profiling + * + * If profiling is enabled, LDMS collects the following timestamps: + * for LOOKUP: when ldms_xprt_lookup() is called, + * when LDMS sends the lookup request to the peer, + * when the peer receives the lookup request, + * when the peer shares the set memory, + * when LDMS receives the shared memory, + * when LDMS reads the memory, + * when LDMS receives the read completion, + * and 
when LDMS delivers the lookup data to the application + * for UPDATE: when ldms_xprt_update() is called, + * when LDMS reads the set data, + * when LDMS receives the updated set data, + * when LDMS delivers the update completion to the application + * for SEND: when ldms_xprt_send() is called, + * when LDMS sends the data to the peer, + * when LDMS receives the send completion event, + * when LDMS delivers the send completion to the application + * for STREAM_PUBLISH: when ldms_stream_publish() is called, + * when LDMS publishes the stream data, + * when LDMS delivers the stream data to clients + * NOTE: LDMS collects the timestamps at each hop where stream data gets forwarded + * + * \param ops_cnt Number of operations in \c ops. + * -1 to enable/disable profiling of all operations + * \param ops Array of operations to enable their profiling + * \param ops_err Array to store an error of each given operation + * + * \return 0 on success; otherwise, -1 is returned. + * In this case, an error code will be assigned in \c ops_err: + * ENOSYS if the operation does not support profiling; + * EINVAL if the given operation does not exist. + */ +int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err); +int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err); + extern const char *ldms_xprt_op_names[]; struct ldms_xprt_rate_data { @@ -2173,15 +2278,25 @@ typedef struct ldms_xprt_stats { struct timespec disconnected; struct timespec last_op; struct ldms_stats_entry ops[LDMS_XPRT_OP_COUNT]; + struct ldms_op_ctxt_list op_ctxt_lists[LDMS_XPRT_OP_COUNT]; } *ldms_xprt_stats_t; +#define LDMS_PERF_M_STATS 1 +#define LDMS_PERF_M_PROFILNG 2 +#define LDMS_PERF_M_ALL (LDMS_PERF_M_STATS | LDMS_PERF_M_PROFILNG) + /** * \brief Retrieve transport request statistics * + * The function gets the statistics and then resets them if \c reset is not 0. + * To only reset the statistics, \c stats must be NULL. + * * \param x The transport handle - * \param s Pointer to an ldms_xprt_stats structure + * \param stats Pointer to an ldms_xprt_stats structure + * \param mask Bitwise-OR of LDMS_PERF_M_STATS and LDMS_PERF_M_PROFILNG, + * selecting which data to reset + * \param reset Reset the statistics after getting the statistics if not 0 + * */ -extern void ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats); +extern void ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats, int mask, int reset); /* * Metric template for: diff --git a/ldms/src/core/ldms_private.h b/ldms/src/core/ldms_private.h index 6be464a3c..3fb4cb932 100644 --- a/ldms/src/core/ldms_private.h +++ b/ldms/src/core/ldms_private.h @@ -149,6 +149,15 @@ struct ldms_set { struct ldms_context *notify_ctxt; /* Notify req context */ ldms_heap_t heap; struct ldms_heap_instance heap_inst; + + /* + * Context of the ongoing update operation on the set + * + * This field tracks the context of the current update operation. + * Subsequent updates are blocked until the current operation completes. + * The field is NULL when no update is in progress.
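+ *
+ * The update path sets this field when it issues an update and clears it
+ * when the update completes or fails synchronously.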
+ */ + struct ldms_op_ctxt *curr_updt_ctxt; }; /* Convenience macro to roundup a value to a multiple of the _s parameter */ @@ -157,7 +166,8 @@ struct ldms_set { extern int __ldms_xprt_push(ldms_set_t s, int push_flags); extern int __ldms_remote_lookup(ldms_t _x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); + ldms_lookup_cb_t cb, void *cb_arg, + struct ldms_op_ctxt *op_ctxt); extern int __ldms_remote_dir(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); extern int __ldms_remote_dir_cancel(ldms_t x); extern struct ldms_set * diff --git a/ldms/src/core/ldms_rail.c b/ldms/src/core/ldms_rail.c index 16d8ee9a3..cb84882d9 100644 --- a/ldms/src/core/ldms_rail.c +++ b/ldms/src/core/ldms_rail.c @@ -76,6 +76,9 @@ extern ovis_log_t xlog; ovis_log(xlog, OVIS_LERROR, fmt, ## __VA_ARGS__); \ } while (0); +/* The definition is in ldms.c. */ +extern int __enable_profiling[LDMS_XPRT_OP_COUNT]; + static int __rail_connect(ldms_t _r, struct sockaddr *sa, socklen_t sa_len, ldms_event_cb_t cb, void *cb_arg); static int __rail_is_connected(ldms_t _r); @@ -86,13 +89,14 @@ static int __rail_sockaddr(ldms_t _r, struct sockaddr *local_sa, struct sockaddr *remote_sa, socklen_t *sa_len); static void __rail_close(ldms_t _r); -static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len); +static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt); static size_t __rail_msg_max(ldms_t x); static int __rail_dir(ldms_t _r, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); static int __rail_dir_cancel(ldms_t _r); static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); -static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); +static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats, int mask, int is_reset); static ldms_t __rail_get(ldms_t _r); /* ref get */ static void __rail_put(ldms_t _r); /* ref put */ @@ -102,7 +106,8 @@ static uint64_t __rail_conn_id(ldms_t _r); static const char *__rail_type_name(ldms_t _r); static void __rail_priority_set(ldms_t _r, int prio); static void __rail_cred_get(ldms_t _r, ldms_cred_t lcl, ldms_cred_t rmt); -static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, void *arg); +static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); static int __rail_get_threads(ldms_t _r, pthread_t *out, int n); static ldms_set_t __rail_set_by_name(ldms_t x, const char *set_name); @@ -199,6 +204,7 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, ldms_rail_t r; zap_t zap; int i; + enum ldms_xprt_ops_e op_e; if (n <= 0) { errno = EINVAL; @@ -228,6 +234,7 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, r->recv_quota = recv_quota; r->recv_rate_limit = rate_limit; rbt_init(&r->stream_client_rbt, __str_rbn_cmp); + snprintf(r->name, sizeof(r->name), "%s", xprt_name); snprintf(r->auth_name, sizeof(r->auth_name), "%s", auth_name); if (auth_av_list) { @@ -248,6 +255,9 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, r->eps[i].remote_is_rail = -1; rbt_init(&r->eps[i].sbuf_rbt, __stream_buf_cmp); TAILQ_INIT(&r->eps[i].sbuf_tq); + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + TAILQ_INIT(&(r->eps[i].op_ctxt_lists[op_e])); + } } zap = __ldms_zap_get(xprt_name); @@ -1066,20 +1076,31 @@ int __rail_rep_send_raw(struct ldms_rail_ep_s *rep, void *data, int len) return rc; } -static int 
__rail_send(ldms_t _r, char *msg_buf, size_t msg_len) +static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt) { /* send over ep0 for now */ ldms_rail_t r = (ldms_rail_t)_r; int rc; struct ldms_rail_ep_s *rep; /* an endpoint inside the rail */ + pthread_mutex_lock(&r->mutex); if (r->eps[0].state != LDMS_RAIL_EP_CONNECTED) { rc = ENOTCONN; goto out; } rep = &r->eps[0]; - rc = ldms_xprt_send(rep->ep, msg_buf, msg_len); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_SEND]), + op_ctxt, ent); + } + rc = rep->ep->ops.send(rep->ep, msg_buf, msg_len, op_ctxt); if (rc) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_SEND]), + op_ctxt, ent); + } /* release the acquired quota if send failed */ __rep_quota_release(rep, msg_len); } @@ -1145,24 +1166,29 @@ struct ldms_rail_lookup_ctxt_s { ldms_lookup_cb_t app_cb; void *cb_arg; enum ldms_lookup_flags flags; + struct ldms_op_ctxt *op_ctxt; } *ldms_rail_lookup_ctxt_t; void __rail_lookup_cb(ldms_t x, enum ldms_lookup_status status, int more, ldms_set_t s, void *arg) { ldms_rail_lookup_ctxt_t lc = arg; + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + (void)clock_gettime(CLOCK_REALTIME, &lc->op_ctxt->lookup_profile.deliver_ts); + } lc->app_cb((void*)lc->r, status, more, s, lc->cb_arg); if (!more) free(lc); } static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg) + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt) { ldms_rail_t r = (ldms_rail_t)_r; int rc; struct ldms_rail_ep_s *rep; ldms_rail_lookup_ctxt_t lc; + pthread_mutex_lock(&r->mutex); if (r->state != LDMS_RAIL_EP_CONNECTED) { rc = ENOTCONN; @@ -1177,19 +1203,27 @@ static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags fla lc->app_cb = cb; lc->cb_arg = cb_arg; lc->flags = flags; + lc->op_ctxt = op_ctxt; rep = &r->eps[r->lookup_rr++]; r->lookup_rr %= r->n_eps; - rc = ldms_xprt_lookup(rep->ep, name, flags, __rail_lookup_cb, lc); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_LOOKUP]), op_ctxt, ent); + } + rc = rep->ep->ops.lookup(rep->ep, name, flags, __rail_lookup_cb, lc, op_ctxt); if (rc) { /* synchronous error */ free(lc); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + TAILQ_REMOVE(&rep->op_ctxt_lists[LDMS_XPRT_OP_LOOKUP], op_ctxt, ent); + } } out: pthread_mutex_unlock(&r->mutex); return rc; } -static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats) +static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats, int mask, int is_reset) { /* TODO IMPLEMENT ME */ assert(0 == "Not Implemented"); @@ -1263,17 +1297,24 @@ void __rail_update_cb(ldms_t x, ldms_set_t s, int flags, void *arg) { struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); ldms_rail_update_ctxt_t uc = arg; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + struct ldms_op_ctxt *op_ctxt = s->curr_updt_ctxt; + + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->update_profile.deliver_ts); + s->curr_updt_ctxt = NULL; + } uc->app_cb((ldms_t)rep->rail, s, flags, uc->cb_arg); if (!(flags & LDMS_UPD_F_MORE)) { free(uc); } } -static int __rail_update(ldms_t _r, struct ldms_set *set, - ldms_update_cb_t cb, void *arg) +static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, + void *arg, struct ldms_op_ctxt *op_ctxt) { ldms_rail_t r = (void*)_r; ldms_rail_update_ctxt_t uc; + struct ldms_rail_ep_s *rep; int rc; uc = calloc(1, 
sizeof(*uc)); @@ -1282,9 +1323,19 @@ static int __rail_update(ldms_t _r, struct ldms_set *set, uc->r = r; uc->app_cb = cb; uc->cb_arg = arg; - rc = set->xprt->ops.update(set->xprt, set, __rail_update_cb, uc); + + rep = ldms_xprt_ctxt_get(set->xprt); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_UPDATE]), op_ctxt, ent); + set->curr_updt_ctxt = op_ctxt; + } + rc = set->xprt->ops.update(set->xprt, set, __rail_update_cb, uc, op_ctxt); if (rc) { /* synchronously error, clean up the context */ + set->curr_updt_ctxt = NULL; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_UPDATE]), op_ctxt, ent); free(uc); } return rc; } @@ -1404,6 +1455,13 @@ void __rail_process_send_quota(ldms_t x, struct ldms_request *req) __rep_flush_sbuf_tq(rep); } +struct ldms_op_ctxt_list * +__rail_op_ctxt_list(struct ldms_xprt *x, enum ldms_xprt_ops_e op_e) +{ + struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); + return &(rep->op_ctxt_lists[op_e]); +} + int ldms_xprt_rail_send_quota_get(ldms_t _r, uint64_t *quotas, int n) { ldms_rail_t r; @@ -1463,6 +1521,18 @@ zap_ep_t __rail_get_zap_ep(ldms_t x) return r->eps[0].ep->zap_ep; } +void timespec_hton(struct timespec *ts) +{ + ts->tv_nsec = htobe64(ts->tv_nsec); + ts->tv_sec = htobe64(ts->tv_sec); +} + +void timespec_ntoh(struct timespec *ts) +{ + ts->tv_nsec = be64toh(ts->tv_nsec); + ts->tv_sec = be64toh(ts->tv_sec); +} + int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la) { union ldms_sockaddr *lsa = (void*)sa; @@ -1639,8 +1709,6 @@ size_t format_set_delete_req(struct ldms_request *req, uint64_t xid, void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, ldms_set_delete_cb_t cb_fn) { - /* - */ assert(XTYPE_IS_RAIL(_r->xtype)); ldms_rail_t r = (ldms_rail_t)_r; @@ -1651,6 +1719,8 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, struct xprt_set_coll_entry *ent; int i; ldms_t x; + struct ldms_rail_ep_s *rep; + struct ldms_op_ctxt *op_ctxt = NULL; x = NULL; @@ -1697,6 +1767,19 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, req = (struct ldms_request *)(ctxt + 1); len = format_set_delete_req(req, (uint64_t)(unsigned long)ctxt, ldms_set_instance_name_get(s)); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE)) { + rep = (struct ldms_rail_ep_s *)ldms_xprt_ctxt_get(x); + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) { + ovis_log(xlog, OVIS_LCRIT, "%s:%s:%d Memory allocation failure\n", + __FILE__, __func__, __LINE__); + /* Let the routine continue */ + } else { + ctxt->op_ctxt = op_ctxt; + TAILQ_INSERT_TAIL(&rep->op_ctxt_lists[LDMS_XPRT_OP_SET_DELETE], op_ctxt, ent); + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->set_del_profile.send_ts); + } + } zap_err_t zerr = zap_send(x->zap_ep, req, len); if (zerr) { char name[128]; @@ -1707,6 +1790,8 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, __FILE__, __func__, __LINE__, zerr, name); x->zerrno = zerr; __ldms_free_ctxt(x, ctxt); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE) && op_ctxt) + TAILQ_REMOVE(&rep->op_ctxt_lists[LDMS_XPRT_OP_SET_DELETE], op_ctxt, ent); } pthread_mutex_unlock(&x->lock); } @@ -1714,20 +1799,41 @@ int __rep_flush_sbuf_tq(struct ldms_rail_ep_s *rep) { int rc; + struct ldms_op_ctxt *op_ctxt = NULL; struct __pending_sbuf_s *p; while ((p = TAILQ_FIRST(&rep->sbuf_tq))) { rc = __rep_quota_acquire(rep, p->sbuf->msg->msg_len); if (rc) goto out; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + op_ctxt = calloc(1,
sizeof(*op_ctxt)); + if (!op_ctxt) { + __rep_quota_release(rep, p->sbuf->msg->msg_len); + rc = ENOMEM; + goto out; + } + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = p->hop_num; + op_ctxt->stream_pub_profile.recv_ts = p->recv_ts; + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + } rc = __rep_publish(rep, p->sbuf->name, p->sbuf->msg->name_hash, p->sbuf->msg->stream_type, &p->sbuf->msg->src, p->sbuf->msg->msg_gn, &p->sbuf->msg->cred, p->sbuf->msg->perm, + p->sbuf->msg->hop_cnt, + p->sbuf->msg->hops, + p->sbuf->data, p->sbuf->data_len, + op_ctxt ? &(op_ctxt->stream_pub_profile) : NULL); if (rc) { __rep_quota_release(rep, p->sbuf->msg->msg_len); + if (op_ctxt) { + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + free(op_ctxt); + } goto out; } TAILQ_REMOVE(&rep->sbuf_tq, p, entry); diff --git a/ldms/src/core/ldms_rail.h b/ldms/src/core/ldms_rail.h index 63d865477..438a6ef04 100644 --- a/ldms/src/core/ldms_rail.h +++ b/ldms/src/core/ldms_rail.h @@ -114,6 +114,8 @@ struct ldms_rail_rate_quota_s { }; struct __pending_sbuf_s { + uint32_t hop_num; + struct timespec recv_ts; TAILQ_ENTRY(__pending_sbuf_s) entry; struct __stream_buf_s *sbuf; }; @@ -133,6 +135,14 @@ struct ldms_rail_ep_s { uint64_t pending_ret_quota; /* pending return quota */ int in_eps_stq; TAILQ_HEAD(, __pending_sbuf_s) sbuf_tq; /* pending fwd stream msgs */ + /* + * Array of operation context lists, indexed by `ldms_xprt_ops_e`. + * + * Each list stores operation contexts for the corresponding operation type. + * Operation contexts track profiling data for various operations, + * such as lookups, updates, and stream operations. + */ + struct ldms_op_ctxt_list op_ctxt_lists[LDMS_XPRT_OP_COUNT]; }; typedef struct ldms_rail_dir_ctxt_s { diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 73e530631..3861f234e 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -79,6 +79,9 @@ #include "ldms_stream.h" #include "ldms_qgroup.h" +/* The definition is in ldms.c. */ +extern int __enable_profiling[LDMS_XPRT_OP_COUNT]; + static ovis_log_t __ldms_stream_log = NULL; /* see __ldms_stream_init() below */ #define __LOG(LVL, FMT, ...)
ovis_log(__ldms_stream_log, LVL, FMT, ##__VA_ARGS__ ); @@ -227,12 +230,19 @@ static int __part_send(struct ldms_rail_ep_s *rep, return rc; } +/* The implementations are in ldms_rail.c. */ +extern void timespec_ntoh(struct timespec *ts); +extern void timespec_hton(struct timespec *ts); + int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, uint32_t hash, - ldms_stream_type_t stream_type, + ldms_stream_type_t stream_type, struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, - const char *data, size_t data_len) + uint32_t hop_cnt, + struct ldms_stream_hop * hops, + const char *data, size_t data_len, + struct strm_publish_profile_s *pts) { int rc = 0; int name_len = strlen(stream_name) + 1; @@ -246,6 +256,7 @@ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, } else { bzero(&msg.src, sizeof(msg.src)); } + msg.msg_gn = htobe64(msg_gn); msg.msg_len = htobe32(name_len + data_len); msg.stream_type = htobe32(stream_type); @@ -253,6 +264,32 @@ msg.cred.gid = htobe32(cred->gid); msg.perm = htobe32(perm); msg.name_hash = hash; + msg.hop_cnt = htobe32(hop_cnt); + if (hops && hop_cnt) { + size_t sz = hop_cnt * sizeof(struct ldms_stream_hop); + memcpy(&msg.hops, hops, sz); + /* + * The timespecs in hops are already in network order, + * so don't convert them. + */ + } + + if (pts) { + (void)clock_gettime(CLOCK_REALTIME, &(pts->send_ts)); + if (hop_cnt <= STREAM_MAX_PROFILE_HOPS) { + /* + * We store the receive and send time only + * when the stream data has been forwarded + * at most STREAM_MAX_PROFILE_HOPS times. + * + * We ignore the timestamps after + * the STREAM_MAX_PROFILE_HOPS'th. + * + */ + msg.hops[hop_cnt].recv_ts = pts->recv_ts; + msg.hops[hop_cnt].send_ts = pts->send_ts; + timespec_hton(&msg.hops[hop_cnt].recv_ts); + timespec_hton(&msg.hops[hop_cnt].send_ts); + } + } rc = __part_send(rep, &msg.src, msg_gn, &msg, sizeof(msg), /* msg hdr */ stream_name, name_len, /* name */ @@ -277,11 +314,14 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) int rc; uint64_t addr_port; uint64_t hash; + struct ldms_op_ctxt *op_ctxt = NULL; + if (ev->type == LDMS_STREAM_EVENT_CLOSE) return 0; assert( ev->type == LDMS_STREAM_EVENT_RECV ); if (!XTYPE_IS_RAIL(ev->recv.client->x->xtype)) return ENOTSUP; + r = (ldms_rail_t)ev->recv.client->x; switch (ev->recv.src.sa_family) { case 0: @@ -323,6 +363,8 @@ struct __pending_sbuf_s *e; e = malloc(sizeof(*e)); if (e) { + e->hop_num = _ev->pub.hop_num; + e->recv_ts = _ev->pub.recv_ts; e->sbuf = sbuf; ref_get(&sbuf->ref, "pending"); TAILQ_INSERT_TAIL(&r->eps[ep_idx].sbuf_tq, e, entry); @@ goto out; } + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = _ev->pub.hop_num; + op_ctxt->stream_pub_profile.recv_ts = _ev->pub.recv_ts; rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.name_hash, ev->recv.type, &ev->recv.src, ev->recv.msg_gn, &ev->recv.cred, ev->recv.perm, - ev->recv.data, - ev->recv.data_len); - if (rc) + _ev->sbuf->msg->hop_cnt, + _ev->sbuf->msg->hops, + ev->recv.data, ev->recv.data_len, + &(op_ctxt->stream_pub_profile)); + if (rc || !ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + free(op_ctxt); + } else { + TAILQ_INSERT_TAIL(&(r->eps[ep_idx].op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt,
ent); + } + if (rc) { __rate_quota_release(&ev->recv.client->rate_quota, ev->recv.data_len); + } out: return rc; } @@ -486,13 +543,15 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, uint32_t hash, ldms_stream_type_t stream_type, ldms_cred_t cred, uint32_t perm, - const char *data, size_t data_len) + const char *data, size_t data_len, + uint32_t hop_cnt, struct timespec *recv_ts) { int rc = 0, gc; struct ldms_stream_s *s; struct ldms_stream_client_entry_s *sce, *next_sce; struct ldms_stream_client_s *c; struct timespec now; + size_t sz; s = __stream_get(stream_name, NULL); if (!s) { @@ -502,6 +561,8 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, struct __stream_event_s _ev = { .pub = { + .hop_num = hop_cnt, + .recv_ts = *recv_ts, .type = LDMS_STREAM_EVENT_RECV, .recv = { .src = {0}, @@ -530,10 +591,11 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, pthread_rwlock_wrlock(&s->rwlock); clock_gettime(CLOCK_REALTIME, &now); __counters_update(&s->rx, &now, data_len); - if (__stream_stats_level > 1) { + if ((__stream_stats_level > 1) || ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { /* stats by src */ struct rbn *rbn = rbt_find(&s->src_stats_rbt, &_ev.pub.recv.src); struct ldms_stream_src_stats_s *ss; + struct ldms_stream_profile_ent *prof; if (rbn) { ss = container_of(rbn, struct ldms_stream_src_stats_s, rbn); } else { @@ -548,8 +610,30 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, rbn_init(&ss->rbn, &ss->src); ss->rx = LDMS_STREAM_COUNTERS_INITIALIZER; rbt_ins(&s->src_stats_rbt, &ss->rbn); + TAILQ_INIT(&ss->profiles); } __counters_update(&ss->rx, &now, data_len); + if (sbuf && ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + /* Receive the stream data from remote server, so cache the profile */ + sz = (sbuf->msg->hop_cnt+1) * sizeof(struct ldms_stream_hop); + prof = calloc(1, sizeof(*prof) + sz); + if (!prof) { + /* error in stats shall not break the normal + * operations + */ + pthread_rwlock_unlock(&s->rwlock); + goto skip_stats; + } + prof->profiles.hop_cnt = sbuf->msg->hop_cnt; + memcpy(&(prof->profiles.hops), &(sbuf->msg->hops), sz); + int i; + for (i = 0; i < prof->profiles.hop_cnt; i++) { + timespec_ntoh(&prof->profiles.hops[i].recv_ts); + timespec_ntoh(&prof->profiles.hops[i].send_ts); + } + prof->profiles.hops[prof->profiles.hop_cnt].recv_ts = *recv_ts; + TAILQ_INSERT_TAIL(&ss->profiles, prof, ent); + } } pthread_rwlock_unlock(&s->rwlock); skip_stats: @@ -580,9 +664,7 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, ref_get(&c->ref, "callback"); pthread_rwlock_unlock(&s->rwlock); _ev.pub.recv.client = c; - /* TODO: Start: Get timing for application's stream handling time. */ rc = c->cb_fn(&_ev.pub, c->cb_arg); - /* TODO: End: Get timing for application's stream handling time. 
*/ if (__stream_stats_level > 0) { pthread_rwlock_wrlock(&c->rwlock); if (rc) { @@ -824,8 +906,7 @@ int __publish_cred_check(ldms_cred_t cred) int ldms_stream_publish(ldms_t x, const char *stream_name, ldms_stream_type_t stream_type, - ldms_cred_t cred, - uint32_t perm, + ldms_cred_t cred, uint32_t perm, const char *data, size_t data_len) { ldms_rail_t r; @@ -836,6 +917,11 @@ int rc; uint32_t hash; int ep_idx; + struct ldms_op_ctxt *op_ctxt = NULL; + struct ldms_op_ctxt_list *op_ctxt_list; + struct timespec recv_ts; + + (void)clock_gettime(CLOCK_REALTIME, &recv_ts); msg_gn = __atomic_fetch_add(&stream_gn, 1, __ATOMIC_SEQ_CST); @@ -865,14 +951,30 @@ rc = __rep_quota_acquire(&r->eps[ep_idx], q); if (rc) return rc; - return __rep_publish(&r->eps[ep_idx], stream_name, hash, - stream_type, 0, msg_gn, cred, perm, data, - data_len); + + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) { + __rep_quota_release(&r->eps[ep_idx], q); + return ENOMEM; + } + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = 0; + op_ctxt->stream_pub_profile.recv_ts = recv_ts; + rc = __rep_publish(&r->eps[ep_idx], stream_name, hash, + stream_type, 0, msg_gn, cred, perm, + 0, NULL, data, data_len, + &(op_ctxt->stream_pub_profile)); + + if (rc || !ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + free(op_ctxt); + } else { + op_ctxt_list = &(r->eps[ep_idx].op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]); + TAILQ_INSERT_TAIL(op_ctxt_list, op_ctxt, ent); + } + return rc; } /* else publish locally */ return __stream_deliver(0, msg_gn, stream_name, name_len, hash, - stream_type, cred, perm, data, data_len); + stream_type, cred, perm, data, data_len, 0, &recv_ts); } static void __client_ref_free(void *arg) @@ -1158,6 +1260,7 @@ static void __process_stream_msg(ldms_t x, struct ldms_request *req) { struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); + struct ldms_thrstat *thrstat; /* no need to take lock ; only one zap thread working on this tree */ struct rbn *rbn; struct __stream_buf_s *sbuf; @@ -1219,7 +1322,7 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) fmsg = (void*)req->stream_part.part_msg; flen = be32toh(fmsg->msg_len); - sbuf = malloc(sizeof(*sbuf) + sizeof(*fmsg) + flen); + sbuf = calloc(1, sizeof(*sbuf) + sizeof(*fmsg) + flen); if (!sbuf) return; ref_init(&sbuf->ref, "init", __stream_buf_s_ref_free, sbuf); @@ -1255,6 +1358,8 @@ sbuf->msg->cred.uid = be32toh(sbuf->msg->cred.uid); sbuf->msg->cred.gid = be32toh(sbuf->msg->cred.gid); sbuf->msg->perm = be32toh(sbuf->msg->perm); + sbuf->msg->hop_cnt = be32toh(sbuf->msg->hop_cnt); + sbuf->msg->hop_cnt++; /* sbuf->msg->name_hash does not need byte conversion */ sbuf->name = sbuf->msg->msg; @@ -1262,11 +1367,13 @@ sbuf->data = sbuf->msg->msg + sbuf->name_len; sbuf->data_len = sbuf->msg->msg_len - sbuf->name_len; + thrstat = zap_thrstat_ctxt_get(x->zap_ep); __stream_deliver(sbuf, sbuf->msg->msg_gn, sbuf->name, sbuf->name_len, sbuf->msg->name_hash, sbuf->msg->stream_type, &sbuf->msg->cred, sbuf->msg->perm, - sbuf->data, sbuf->data_len); + sbuf->data, sbuf->data_len, + sbuf->msg->hop_cnt, &(thrstat->last_op_start)); cleanup: rbt_del(&rep->sbuf_rbt, &sbuf->rbn); @@ -1401,6 +1508,7 @@ void __stream_req_recv(ldms_t x, int cmd, struct ldms_request *req) { assert(0 == XTYPE_IS_RAIL(x->xtype)); /* x is NOT a rail */ assert(x->event_cb == __rail_cb); + switch (cmd) {
case LDMS_CMD_STREAM_MSG: __process_stream_msg(x, req); break; @@ -1490,6 +1598,15 @@ int ldms_stream_stats_level_get() return __atomic_load_n(&__stream_stats_level, __ATOMIC_SEQ_CST); } +void __stream_profiling_purge(struct ldms_stream_profile_list *profiles) +{ + struct ldms_stream_profile_ent *prf; + while ((prf = TAILQ_FIRST(profiles))) { + TAILQ_REMOVE(profiles, prf, ent); + free(prf); + } +} + void __src_stats_rbt_purge(struct rbt *rbt) { struct rbn *rbn; @@ -1497,6 +1614,7 @@ void __src_stats_rbt_purge(struct rbt *rbt) while ((rbn = rbt_min(rbt))) { rbt_del(rbt, rbn); sss = container_of(rbn, struct ldms_stream_src_stats_s, rbn); + __stream_profiling_purge(&sss->profiles); free(sss); } } @@ -1506,6 +1624,8 @@ int __src_stats_rbt_copy(struct rbt *t0, struct rbt *t1, int is_reset) { struct rbn *rbn; struct ldms_stream_src_stats_s *s0, *s1; + struct ldms_stream_profile_ent *prf0, *prf1; + size_t sz; int rc; for (rbn = rbt_min(t0); rbn; rbn = rbn_succ(rbn)) { s0 = container_of(rbn, struct ldms_stream_src_stats_s, rbn); @@ goto err_0; } *s1 = *s0; - if (is_reset) + TAILQ_INIT(&s1->profiles); + + /* Copy the profiles */ + TAILQ_FOREACH(prf0, &s0->profiles, ent) { + sz = (prf0->profiles.hop_cnt + 1) * sizeof(struct ldms_stream_hop); + prf1 = calloc(1, sizeof(*prf1) + sz); + if (!prf1) { + rc = ENOMEM; + goto err_0; + } + prf1->profiles.hop_cnt = prf0->profiles.hop_cnt; + memcpy(&(prf1->profiles.hops), &(prf0->profiles.hops), sz); + TAILQ_INSERT_TAIL(&s1->profiles, prf1, ent); + } + if (is_reset) { LDMS_STREAM_COUNTERS_INIT(&s0->rx); + __stream_profiling_purge(&s0->profiles); + } rbn_init(&s1->rbn, &s1->src); rbt_ins(t1, &s1->rbn); } diff --git a/ldms/src/core/ldms_stream.h b/ldms/src/core/ldms_stream.h index 7e796b3aa..ba896a23f 100644 --- a/ldms/src/core/ldms_stream.h +++ b/ldms/src/core/ldms_stream.h @@ -135,6 +135,9 @@ struct ldms_stream_full_msg_s { struct ldms_cred cred; /* credential of the originator */ uint32_t perm; /* 0777 style permission */ uint32_t name_hash; + /* Allocate space to collect profile data for 8 hops */ + uint32_t hop_cnt; + struct ldms_stream_hop hops[STREAM_MAX_PROFILE_HOPS+1]; char msg[OVIS_FLEX]; /* `msg` format: * .----------------------. @@ -169,10 +172,11 @@ struct __stream_buf_s { /* for internal use */ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, - uint32_t hash, - ldms_stream_type_t stream_type, + uint32_t hash, ldms_stream_type_t stream_type, struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, - const char *data, size_t data_len); - + uint32_t hop_cnt, + struct ldms_stream_hop *hops, + const char *data, size_t data_len, + struct strm_publish_profile_s *pts); #endif /* __LDMS_STREAM_H__ */ diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index c3e0d3776..fa53f5c97 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -84,6 +83,9 @@ extern ovis_log_t xlog; ovis_log(xlog, level, fmt, ## __VA_ARGS__); \ } while (0); +/* The definition is in ldms.c.
*/ +extern int __enable_profiling[LDMS_XPRT_OP_COUNT]; + /** * zap callback function. */ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev); @@ -95,6 +97,10 @@ */ void ldms_zap_auto_cb(zap_ep_t zep, zap_event_t ev); +/* The implementation is in ldms_rail.c. */ +struct ldms_op_ctxt_list * +__rail_op_ctxt_list(ldms_t x, enum ldms_xprt_ops_e op_e); + #if 0 #define TF() XPRT_LOG(NULL, OVIS_LALWAYS, "%s:%d\n", __FUNCTION__, __LINE__) #else @@ -744,10 +750,15 @@ void ldms_xprt_put(ldms_t x) x->ops.put(x); } +/* The implementations are in ldms_rail.c. */ +extern void timespec_hton(struct timespec *ts); +extern void timespec_ntoh(struct timespec *ts); + static void process_set_delete_request(struct ldms_xprt *x, struct ldms_request *req) { struct ldms_reply reply; struct ldms_set *set; + size_t len; /* * Always notify the application about peer set delete. If we happened @@ -777,8 +788,11 @@ static void process_set_delete_request(struct ldms_xprt *x, struct ldms_request reply.hdr.xid = req->hdr.xid; reply.hdr.cmd = htonl(LDMS_CMD_SET_DELETE_REPLY); reply.hdr.rc = 0; - reply.hdr.len = htonl(sizeof(reply.hdr)); - zap_err_t zerr = zap_send(x->zap_ep, &reply, sizeof(reply.hdr)); + len = sizeof(reply.hdr) + sizeof(reply.set_del); + reply.hdr.len = htonl(len); + (void)clock_gettime(CLOCK_REALTIME, &reply.set_del.recv_ts); + timespec_hton(&reply.set_del.recv_ts); + zap_err_t zerr = zap_send(x->zap_ep, &reply, len); if (zerr != ZAP_ERR_OK) { x->zerrno = zerr; XPRT_LOG(x, OVIS_LERROR, "%s: zap_send synchronously error. " @@ -790,6 +804,12 @@ static void process_set_delete_reply(struct ldms_xprt *x, struct ldms_reply *reply, struct ldms_context *ctxt) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE)) { + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); + memcpy(&ctxt->op_ctxt->set_del_profile.ack_ts, &thrstat->last_op_start, sizeof(struct timespec)); + timespec_ntoh(&reply->set_del.recv_ts); + memcpy(&ctxt->op_ctxt->set_del_profile.recv_ts, &reply->set_del.recv_ts, sizeof(struct timespec)); + } ctxt->set_delete.cb(x, reply->hdr.rc, ctxt->set_delete.s, ctxt->set_delete.cb_arg); pthread_mutex_lock(&x->lock); __ldms_free_ctxt(x, ctxt); @@ -1149,7 +1169,7 @@ process_cancel_push_request(struct ldms_xprt *x, struct ldms_request *req) return; } -static void __copy_set_info_to_lookup_msg(char *buffer, ldms_name_t schema, +static void *__copy_set_info_to_lookup_msg(char *buffer, ldms_name_t schema, ldms_name_t inst_name, struct ldms_set *set) { @@ -1200,6 +1220,7 @@ static void __copy_set_info_to_lookup_msg(char *buffer, ldms_name_t schema, str = (ldms_name_t)&(str->name[str->len]); } str->len = 0; + return (void*)str + sizeof(str->len); } /* Caller should hold the set lock */ @@ -1303,6 +1324,7 @@ static int __add_lookup_peer(struct ldms_xprt *x, struct ldms_set *set) return rc; } +#define LU_PARAM_PRFL_MARKER "lu_prflng" static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, uint64_t xid, int more) { @@ -1312,6 +1334,7 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, ldms_name_t name = get_instance_name(set->meta); ldms_name_t schema = get_schema_name(set->meta); + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); /* * The lookup.set_info encodes schema name, instance name * and the set info key value pairs as follows.
@@ -1349,11 +1372,24 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, * | last value string | * S S * +---------------------------+ + * | 0 | + * +---------------------------+ + * | LU_PARAM_PRFL_MARKER | + * +---------------------------+ + * | struct timespec | + * | (request receiving ts) | + * +---------------------------+ + * | struct timespec | + * | (sharing ts) | + * +---------------------------+ */ int set_info_cnt; size_t set_info_len; size_t msg_len; struct ldms_rendezvous_msg *msg; + struct timespec *req_recv_ts, *share_ts; + char *prfl_marker; + size_t prfl_marker_len = strlen(LU_PARAM_PRFL_MARKER) + 1; pthread_mutex_lock(&set->lock); __get_set_info_sz(set, &set_info_cnt, &set_info_len); @@ -1364,13 +1400,26 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, * +1 for the terminating string of length 0 */ + sizeof(struct ldms_name) * (2 + (set_info_cnt) * 2 + 1) - + name->len + schema->len + set_info_len; + + name->len + schema->len + set_info_len + /* + * Encode the request receiving timestamp + * and the sharing timestamp + */ + + prfl_marker_len + sizeof(struct timespec) * 2; + msg = calloc(1, msg_len); if (!msg) { pthread_mutex_unlock(&set->lock); return ENOMEM; } - __copy_set_info_to_lookup_msg(msg->lookup.set_info, schema, name, set); + prfl_marker = __copy_set_info_to_lookup_msg(msg->lookup.set_info, schema, name, set); + /* Embed the profiling timestamps in the lookup reply message */ + strcpy(prfl_marker, LU_PARAM_PRFL_MARKER); + req_recv_ts = (struct timespec *)(prfl_marker + prfl_marker_len); + memcpy(req_recv_ts, &thrstat->last_op_start, sizeof(struct timespec)); + share_ts = req_recv_ts+1; + (void)clock_gettime(CLOCK_REALTIME, share_ts); + /* Fill the set details */ pthread_mutex_unlock(&set->lock); msg->hdr.xid = xid; msg->hdr.cmd = htonl(LDMS_XPRT_RENDEZVOUS_LOOKUP); @@ -1381,7 +1430,6 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, msg->lookup.meta_len = htonl(__le32_to_cpu(set->meta->meta_sz)); msg->lookup.card = htonl(__le32_to_cpu(set->meta->card)); msg->lookup.array_card = htonl(__le32_to_cpu(set->meta->array_card)); - XPRT_LOG(x, OVIS_LDEBUG, "%s(): x %p: sharing ... remote lookup ctxt %p\n", __func__, x, (void *)xid); zap_err_t zerr = zap_share(x->zap_ep, set->lmap, (const char *)msg, msg_len); @@ -1549,6 +1597,24 @@ static int do_read_all(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) goto out; } assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + * + * The read operation may involve reading the entire set at once, + * reading the meta followed by data, + * or reading multiple times to obtain the updated copy of the set. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. 
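+					 * Keeping the first read timestamp, together with the
+					 * last read completion timestamp, brackets the whole
+					 * multi-read update.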
+ */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), s->lmap, zap_map_addr(s->lmap), len, ctxt); if (rc) { @@ -1575,6 +1641,20 @@ static int do_read_meta(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) goto out; } assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. + */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), s->lmap, zap_map_addr(s->lmap), meta_sz, ctxt); if (rc) { @@ -1598,7 +1678,6 @@ static int do_read_data(ldms_t x, ldms_set_t s, int idx_from, int idx_to, ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE, s, cb, arg, idx_from, idx_to); - if (!ctxt) { rc = ENOMEM; goto out; } @@ dlen = (idx_to - idx_from + 1) * data_sz; assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. + */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap) + doff, s->lmap, zap_map_addr(s->lmap) + doff, dlen, ctxt); if (rc) { @@ -1758,6 +1851,12 @@ static void process_lookup_reply(struct ldms_xprt *x, struct ldms_reply *reply, struct ldms_context *ctxt) { + struct ldms_thrstat *thrstat; + + thrstat = zap_thrstat_ctxt_get(x->zap_ep); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP) && ctxt->op_ctxt) + memcpy(&ctxt->op_ctxt->lookup_profile.complete_ts, + &thrstat->last_op_start, sizeof(struct timespec)); + int rc = ntohl(reply->hdr.rc); if (!rc) { /* A peer should only receive error in lookup_reply. @@ -2538,14 +2637,40 @@ static void handle_zap_read_complete(zap_ep_t zep, zap_event_t ev) switch (ctxt->type) { case LDMS_CONTEXT_UPDATE: thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + /* + * If read complete timestamp is already set, + * we replace it with a new timestamp. + * + * We collect the timestamp of the beginning of the first read and + * the timestamp of the completion of the last read. + */ + memcpy(&ctxt->op_ctxt->update_profile.read_complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } __handle_update_data(x, ctxt, ev); break; case LDMS_CONTEXT_UPDATE_META: thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + /* + * For the same reason as in the LDMS_CONTEXT_UPDATE case, + * we set or reset the read complete timestamp.
+ */ + memcpy(&ctxt->op_ctxt->update_profile.read_complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } __handle_update_meta(x, ctxt, ev); break; case LDMS_CONTEXT_LOOKUP_READ: thrstat->last_op = LDMS_THRSTAT_OP_LOOKUP_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + memcpy(&ctxt->op_ctxt->lookup_profile.complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } + __handle_lookup(x, ctxt, ev); break; default: @@ -2602,7 +2727,7 @@ static const ldms_name_t __lookup_set_info_find(const char *set_info, return NULL; } -static int __process_lookup_set_info(struct ldms_set *lset, char *set_info) +static int __process_lookup_set_info(struct ldms_set *lset, char *set_info, char **_buf) { int rc = 0; ldms_name_t key, value; @@ -2628,6 +2753,7 @@ static int __process_lookup_set_info(struct ldms_set *lset, char *set_info) key = (ldms_name_t)(&value->name[value->len]); value = (ldms_name_t)(&key->name[key->len]); } + *_buf = (char *)value; if (!dir_upd) { /* Check if a key-value pair is removed from the set info or not */ pair = LIST_FIRST(&lset->remote_info); @@ -2659,6 +2785,12 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, struct ldms_set *lset; int rc; ldms_name_t schema_name, inst_name; + char *prfl_maker = 0; + struct timespec *req_recv_ts; + struct timespec *share_ts; + + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(zep); + struct ldms_op_ctxt *op_ctxt = ctxt->op_ctxt; #ifdef DEBUG if (!__is_lookup_name_good(x, lu, ctxt)) { @@ -2695,13 +2827,28 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, rc = errno; goto callback; } - /* Drop when deleting the set TODO: don't forget to drop this */ lset->xprt = ldms_xprt_get(x); lset->rmap = ev->map; /* lset now owns ev->map */ lset->remote_set_id = lm->lookup.set_id; pthread_mutex_lock(&lset->lock); - (void)__process_lookup_set_info(lset, &inst_name->name[inst_name->len]); + (void)__process_lookup_set_info(lset, &inst_name->name[inst_name->len], &prfl_maker); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + if (prfl_maker < (char *)lm + lm->hdr.len) { + /* The message is from v4.5.1+ version, + * which includes the lookup profiling timestamps. 
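+				 * Peers older than that do not send the marker; in that
+				 * case the peer-side lookup profile timestamps remain zero.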
+ */ + if (0 == strcmp(prfl_maker, LU_PARAM_PRFL_MARKER)) { + req_recv_ts = (struct timespec *)(prfl_maker + strlen(LU_PARAM_PRFL_MARKER) + 1); + share_ts = req_recv_ts+1; + memcpy(&op_ctxt->lookup_profile.rendzv_ts, &thrstat->last_op_start, sizeof(struct timespec)); + memcpy(&op_ctxt->lookup_profile.req_recv_ts, req_recv_ts, sizeof(struct timespec)); + memcpy(&op_ctxt->lookup_profile.share_ts, share_ts, sizeof(struct timespec)); + } + } + } + pthread_mutex_unlock(&lset->lock); pthread_mutex_lock(&x->lock); @@ -2719,8 +2866,12 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, rd_ctxt->sem = ctxt->sem; rd_ctxt->sem_p = ctxt->sem_p; rd_ctxt->rc = ctxt->rc; + rd_ctxt->op_ctxt = ctxt->op_ctxt; pthread_mutex_unlock(&x->lock); assert((zep == x->zap_ep) && (x == rd_ctxt->x)); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.read_ts); + } rc = zap_read(zep, lset->rmap, zap_map_addr(lset->rmap), lset->lmap, zap_map_addr(lset->lmap), @@ -2950,6 +3101,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) struct ldms_xprt *x = zap_get_ucontext(zep); struct ldms_thrstat *thrstat; struct ldms_thrstat_entry *thrstat_e = NULL; + struct ldms_op_ctxt *op_ctxt = NULL; if (x == NULL) return; @@ -3112,7 +3264,13 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) * Applications know only the connection is connecting. */ } else { - if (x->event_cb && (uint64_t)ev->context == LDMS_CMD_SEND_MSG) { + if (x->event_cb && ev->context) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + op_ctxt = (struct ldms_op_ctxt *)ev->context; + memcpy(&op_ctxt->send_profile.complete_ts, &thrstat->last_op_start, + sizeof(struct timespec)); + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->send_profile.deliver_ts); + } event.type = LDMS_XPRT_EVENT_SEND_COMPLETE; x->event_cb(x, &event, x->event_cb_arg); } @@ -3244,12 +3402,13 @@ static int __ldms_xprt_sockaddr(ldms_t x, struct sockaddr *local_sa, struct sockaddr *remote_sa, socklen_t *sa_len); static void __ldms_xprt_close(ldms_t x); -static int __ldms_xprt_send(ldms_t x, char *msg_buf, size_t msg_len); +static int __ldms_xprt_send(ldms_t x, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt); static size_t __ldms_xprt_msg_max(ldms_t x); static int __ldms_xprt_dir(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); static int __ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); -static void __ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); +static void __ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats, int mask, int is_reset); static int __ldms_xprt_dir_cancel(ldms_t x); static ldms_t __ldms_xprt_get(ldms_t x); /* ref get */ @@ -3261,7 +3420,8 @@ static const char *__ldms_xprt_type_name(ldms_t x); static void __ldms_xprt_priority_set(ldms_t x, int prio); static void __ldms_xprt_cred_get(ldms_t x, ldms_cred_t lcl, ldms_cred_t rmt); static void __ldms_xprt_event_cb_set(ldms_t x, ldms_event_cb_t cb, void *cb_arg); -int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg); +int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); int __ldms_xprt_get_threads(ldms_t x, pthread_t *out, int n); zap_ep_t __ldms_xprt_get_zap_ep(ldms_t x); static ldms_set_t __ldms_xprt_set_by_name(ldms_t x, const char *set_name); @@ -3511,7 +3671,8 @@ size_t 
 	return len;
 }
 
-static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len)
+static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len,
+			    struct ldms_op_ctxt *op_ctxt)
 {
 	struct ldms_xprt *x = _x;
 	struct ldms_request *req;
@@ -3536,6 +3697,7 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len)
 		rc = ENOMEM;
 		goto err_0;
 	}
+	ctxt->op_ctxt = op_ctxt;
 	req = (struct ldms_request *)(ctxt + 1);
 	req->hdr.xid = 0;
 	req->hdr.cmd = htonl(LDMS_CMD_SEND_MSG);
@@ -3545,7 +3707,10 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len)
 		sizeof(struct ldms_send_cmd_param) + msg_len;
 	req->hdr.len = htonl(len);
 
-	rc = zap_send2(x->zap_ep, req, len, (void*)(uint64_t)LDMS_CMD_SEND_MSG);
+	if (op_ctxt && ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) {
+		(void)clock_gettime(CLOCK_REALTIME, &op_ctxt->send_profile.send_ts);
+	}
+	rc = zap_send2(x->zap_ep, req, len, (void*)op_ctxt);
 #ifdef DEBUG
 	if (rc) {
 		XPRT_LOG(x, OVIS_LDEBUG, "send: error. put ref %p.\n", x->zap_ep);
@@ -3560,7 +3725,19 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len)
 
 int ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len)
 {
-	return _x->ops.send(_x, msg_buf, msg_len);
+	int rc;
+	struct ldms_op_ctxt *op_ctxt = NULL;
+
+	if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) {
+		op_ctxt = calloc(1, sizeof(*op_ctxt));
+		if (!op_ctxt)
+			return ENOMEM;
+		op_ctxt->op_type = LDMS_XPRT_OP_SEND;
+		(void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->send_profile.app_req_ts));
+	}
+	rc = _x->ops.send(_x, msg_buf, msg_len, op_ctxt);
+	if (rc)
+		free(op_ctxt);
+	return rc;
 }
 
 static size_t __ldms_xprt_msg_max(ldms_t x)
@@ -3699,7 +3876,8 @@ int ldms_xprt_dir_cancel(ldms_t x)
 
 int __ldms_remote_lookup(ldms_t _x, const char *path,
 			 enum ldms_lookup_flags flags,
-			 ldms_lookup_cb_t cb, void *arg)
+			 ldms_lookup_cb_t cb, void *arg,
+			 struct ldms_op_ctxt *op_ctxt)
 {
 	struct ldms_xprt *x = _x;
 	struct ldms_request *req;
@@ -3749,6 +3927,10 @@ int __ldms_remote_lookup(ldms_t _x, const char *path,
 	XPRT_LOG(x, OVIS_LDEBUG, "remote_lookup: get ref %p: active_lookup = %d\n",
 		 x->zap_ep, x->active_lookup);
 #endif /* DEBUG */
+	if (op_ctxt && ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) {
+		ctxt->op_ctxt = op_ctxt;
+		(void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.req_send_ts);
+	}
 	zap_err_t zerr = zap_send(x->zap_ep, req, len);
 	if (zerr) {
 		pthread_mutex_lock(&x->lock);
@@ -3777,37 +3959,91 @@ static void sync_lookup_cb(ldms_t x, enum ldms_lookup_status status, int more,
 }
 
 static int __ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags,
-			      ldms_lookup_cb_t cb, void *cb_arg)
+			      ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt)
 {
 	int rc;
 	if ((flags & !cb) || strlen(path) > LDMS_LOOKUP_PATH_MAX)
 		return EINVAL;
 	if (!cb) {
-		rc = __ldms_remote_lookup(x, path, flags, sync_lookup_cb, cb_arg);
+		rc = __ldms_remote_lookup(x, path, flags, sync_lookup_cb, cb_arg, op_ctxt);
 		if (rc)
 			return rc;
 		sem_wait(&x->sem);
 		rc = x->sem_rc;
 	} else
-		rc = __ldms_remote_lookup(x, path, flags, cb, cb_arg);
+		rc = __ldms_remote_lookup(x, path, flags, cb, cb_arg, op_ctxt);
 	return rc;
 }
 
 int ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags,
 		     ldms_lookup_cb_t cb, void *cb_arg)
 {
-	return x->ops.lookup(x, path, flags, cb, cb_arg);
+	int rc;
+	struct ldms_op_ctxt *op_ctxt = NULL;
+
+	if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) {
+		op_ctxt = calloc(1, sizeof(*op_ctxt));
+		if (!op_ctxt)
+			return ENOMEM;
+		op_ctxt->op_type = LDMS_XPRT_OP_LOOKUP;
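+		/* Stamp the application request time here; the rest of
+		 * the lookup_profile timestamps (req_send, req_recv,
+		 * share, rendzv, read, complete, deliver) are filled in
+		 * as the request makes its way through the transport. */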
+		(void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.app_req_ts);
+	}
+	rc = x->ops.lookup(x, path, flags, cb, cb_arg, op_ctxt);
+	if (rc)
+		free(op_ctxt);
+	return rc;
 }
 
-static void __ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats)
+static void __ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats, int mask, int is_reset)
 {
+	struct ldms_op_ctxt_list *src_list, *dst_list;
+	struct ldms_op_ctxt *src, *dst;
+	enum ldms_xprt_ops_e op_e;
+
+	if (!stats)
+		goto reset;
 	*stats = _x->stats;
+	for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) {
+		TAILQ_INIT(&stats->op_ctxt_lists[op_e]);
+		dst_list = &stats->op_ctxt_lists[op_e];
+		src_list = __rail_op_ctxt_list(_x, op_e);
+
+		TAILQ_FOREACH(src, src_list, ent) {
+			dst = malloc(sizeof(*dst));
+			if (!dst) {
+				ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n");
+				return;
+			}
+			memcpy(dst, src, sizeof(*dst));
+			dst->ent.tqe_next = NULL;
+			dst->ent.tqe_prev = NULL;
+			TAILQ_INSERT_TAIL(dst_list, dst, ent);
+		}
+	}
+ reset:
+	if (!is_reset)
+		return;
+	if (mask & LDMS_PERF_M_STATS) {
+		/* last_op and ops could also be reset by ldms_xprt_rate_data(). */
+		/* don't reset the connect/disconnect time */
+		memset(&_x->stats.last_op, 0, sizeof(_x->stats.last_op));
+		memset(&_x->stats.ops, 0, sizeof(_x->stats.ops));
+	}
+	if (mask & LDMS_PERF_M_PROFILNG) {
+		for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) {
+			src_list = __rail_op_ctxt_list(_x, op_e);
+			while ((src = TAILQ_FIRST(src_list))) {
+				TAILQ_REMOVE(src_list, src, ent);
+				free(src);
+			}
+		}
+	}
 }
 
-void ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats)
+void ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats, int mask, int is_reset)
 {
-	_x->ops.stats(_x, stats);
+	_x->ops.stats(_x, stats, mask, is_reset);
 }
 
 static int send_req_notify(ldms_t _x, ldms_set_t s, uint32_t flags,
diff --git a/ldms/src/core/ldms_xprt.h b/ldms/src/core/ldms_xprt.h
index 7a98300f5..9a61330da 100644
--- a/ldms/src/core/ldms_xprt.h
+++ b/ldms/src/core/ldms_xprt.h
@@ -281,6 +281,8 @@ struct ldms_rendezvous_lookup_param {
 	uint32_t card; /* card of dict */
 	uint32_t schema_len;
 	uint32_t array_card; /* card of array */
+	// struct timespec req_recv; /* Timestamp when the server received the lookup request. */
+	// struct timespec share;    /* Timestamp when the server called zap_share(). */
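+	// (These timestamps are not carried in this wire structure; the
+	//  server appends them to 'set_info' after the LU_PARAM_PRFL_MARKER
+	//  string, so the message layout stays compatible with older peers.)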
 	/* schema name, then instance name, and then set_info key value pairs */
 	char set_info[OVIS_FLEX];
 };
@@ -346,6 +348,10 @@ struct ldms_stream_sub_reply {
 	char msg[0];
 };
 
+struct ldms_set_delete_reply {
+	struct timespec recv_ts;
+};
+
 struct ldms_reply {
 	struct ldms_reply_hdr hdr;
 	union {
@@ -354,6 +360,7 @@ struct ldms_reply {
 		struct ldms_auth_challenge_reply auth_challenge;
 		struct ldms_push_reply push;
 		struct ldms_stream_sub_reply sub;
+		struct ldms_set_delete_reply set_del;
 	};
 };
 #pragma pack()
@@ -377,6 +384,7 @@ struct ldms_context {
 	int rc;
 	struct ldms_xprt *x;
 	ldms_context_type_t type;
+	struct ldms_op_ctxt *op_ctxt;
 	union {
 		struct {
 			ldms_dir_cb_t cb;
@@ -449,13 +457,13 @@ struct ldms_xprt_ops_s {
 			struct sockaddr *remote_sa,
 			socklen_t *sa_len);
 	void (*close)(ldms_t x);
-	int (*send)(ldms_t x, char *msg_buf, size_t msg_len);
+	int (*send)(ldms_t x, char *msg_buf, size_t msg_len, struct ldms_op_ctxt *op_ctxt);
 	size_t (*msg_max)(ldms_t x);
 	int (*dir)(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags);
 	int (*dir_cancel)(ldms_t x);
 	int (*lookup)(ldms_t t, const char *name, enum ldms_lookup_flags flags,
-		      ldms_lookup_cb_t cb, void *cb_arg);
-	void (*stats)(ldms_t x, ldms_xprt_stats_t stats);
+		      ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt);
+	void (*stats)(ldms_t x, ldms_xprt_stats_t stats, int mask, int is_reset);
 
 	ldms_t (*get)(ldms_t x); /* ref get */
 	void (*put)(ldms_t x); /* ref put */
@@ -465,7 +473,8 @@ struct ldms_xprt_ops_s {
 	const char *(*type_name)(ldms_t x);
 	void (*priority_set)(ldms_t x, int prio);
 	void (*cred_get)(ldms_t x, ldms_cred_t lcl, ldms_cred_t rmt);
-	int (*update)(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg);
+	int (*update)(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg,
+		      struct ldms_op_ctxt *op_ctxt);
 
 	int (*get_threads)(ldms_t x, pthread_t *out, int n);
 
@@ -559,4 +568,10 @@ void ldms_xprt_auth_begin(ldms_t xprt);
 int ldms_xprt_auth_send(ldms_t _x, const char *msg_buf, size_t msg_len);
 void ldms_xprt_auth_end(ldms_t xprt, int result);
 
+/* ========================
+ * LDMS operation profiling
+ * ========================
+ */
+#define ENABLED_PROFILING(_OP_) (__enable_profiling[_OP_] == 1)
+
 #endif
diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c
index 29d9e2f65..4b749381c 100644
--- a/ldms/src/ldmsd/ldmsd_request.c
+++ b/ldms/src/ldmsd/ldmsd_request.c
@@ -264,6 +264,7 @@ static int update_time_stats_handler(ldmsd_req_ctxt_t reqc);
 static int set_sec_mod_handler(ldmsd_req_ctxt_t reqc);
 static int log_status_handler(ldmsd_req_ctxt_t reqc);
 static int stats_reset_handler(ldmsd_req_ctxt_t reqc);
+static int profiling_handler(ldmsd_req_ctxt_t req);
 
 /* these are implemented in ldmsd_failover.c */
 int failover_config_handler(ldmsd_req_ctxt_t req_ctxt);
@@ -565,6 +566,10 @@ static struct request_handler_entry request_handler[] = {
 		LDMSD_SET_DEFAULT_AUTHZ_REQ, set_default_authz_handler, XUG | MOD
 	},
+	[LDMSD_PROFILING_REQ] = {
+		LDMSD_PROFILING_REQ, profiling_handler, XALL
+	},
+
 	/* FAILOVER user commands */
 	[LDMSD_FAILOVER_CONFIG_REQ] = {
 		LDMSD_FAILOVER_CONFIG_REQ, failover_config_handler, XUG | MOD,
@@ -7056,7 +7061,7 @@ static char *__xprt_stats_as_json(size_t *json_sz, int reset, int level)
 			first = 0;
 		}
 
-		ldms_xprt_stats(x, &xs);
+		ldms_xprt_stats(x, &xs, LDMS_PERF_M_STATS, reset);
 		xprt_count += 1;
 
 		switch (ep_state) {
@@ -7228,6 +7233,254 @@ static int xprt_stats_handler(ldmsd_req_ctxt_t req)
 	return ENOMEM;
 }
 
+double __ts2double(struct timespec ts)
+{
+	return ts.tv_sec + ((double)ts.tv_nsec)/1000000000.0;
+}
+
+json_t *__ldms_op_profiling_as_json(struct ldms_op_ctxt *xc, enum ldms_xprt_ops_e op_e)
+{
+	json_t *stat;
+	stat = json_object();
+	switch (op_e) {
+	case LDMS_XPRT_OP_LOOKUP:
+		json_object_set_new(stat, "app_req",
+				json_real(__ts2double(xc->lookup_profile.app_req_ts)));
+		json_object_set_new(stat, "req_send",
+				json_real(__ts2double(xc->lookup_profile.req_send_ts)));
+		json_object_set_new(stat, "req_recv",
+				json_real(__ts2double(xc->lookup_profile.req_recv_ts)));
+		json_object_set_new(stat, "share",
+				json_real(__ts2double(xc->lookup_profile.share_ts)));
+		json_object_set_new(stat, "rendzv",
+				json_real(__ts2double(xc->lookup_profile.rendzv_ts)));
+		json_object_set_new(stat, "read",
+				json_real(__ts2double(xc->lookup_profile.read_ts)));
+		json_object_set_new(stat, "complete",
+				json_real(__ts2double(xc->lookup_profile.complete_ts)));
+		json_object_set_new(stat, "deliver",
+				json_real(__ts2double(xc->lookup_profile.deliver_ts)));
+		break;
+	case LDMS_XPRT_OP_UPDATE:
+		json_object_set_new(stat, "app_req",
+				json_real(__ts2double(xc->update_profile.app_req_ts)));
+		json_object_set_new(stat, "read_start",
+				json_real(__ts2double(xc->update_profile.read_ts)));
+		json_object_set_new(stat, "read_complete",
+				json_real(__ts2double(xc->update_profile.read_complete_ts)));
+		json_object_set_new(stat, "deliver",
+				json_real(__ts2double(xc->update_profile.deliver_ts)));
+		break;
+	case LDMS_XPRT_OP_SEND:
+		json_object_set_new(stat, "app_req",
+				json_real(__ts2double(xc->send_profile.app_req_ts)));
+		json_object_set_new(stat, "send",
+				json_real(__ts2double(xc->send_profile.send_ts)));
+		json_object_set_new(stat, "complete",
+				json_real(__ts2double(xc->send_profile.complete_ts)));
+		json_object_set_new(stat, "deliver",
+				json_real(__ts2double(xc->send_profile.deliver_ts)));
+		break;
+	case LDMS_XPRT_OP_SET_DELETE:
+		json_object_set_new(stat, "send",
+				json_real(__ts2double(xc->set_del_profile.send_ts)));
+		json_object_set_new(stat, "recv",
+				json_real(__ts2double(xc->set_del_profile.recv_ts)));
+		json_object_set_new(stat, "acknowledge",
+				json_real(__ts2double(xc->set_del_profile.ack_ts)));
+		break;
+	case LDMS_XPRT_OP_STREAM_PUBLISH:
+		json_object_set_new(stat, "hop_cnt",
+				json_integer(xc->stream_pub_profile.hop_num));
+		json_object_set_new(stat, "recv",
+				json_real(__ts2double(xc->stream_pub_profile.recv_ts)));
+		json_object_set_new(stat, "send",
+				json_real(__ts2double(xc->stream_pub_profile.send_ts)));
+		break;
+	default:
+		break;
+	}
+	return stat;
+}
+
+int __stream_profiling_as_json(json_t **_jobj, int is_reset)
+{
+	json_t *jobj, *strm_jobj, *src_jobj, *hop_jobj, *prf_array, *prf_jobj;
+	struct ldms_stream_stats_tq_s *tq;
+	struct ldms_stream_stats_s *ss;
+	struct ldms_stream_src_stats_s *strm_src;
+	struct ldms_stream_profile_ent *prf;
+	struct ldms_addr addr;
+	char addr_buf[128] = "";
+	struct rbn *rbn;
+	int i, rc = 0;
+
+	jobj = json_object();
+	tq = ldms_stream_stats_tq_get(NULL, 0, is_reset);
+	if (!tq) {
+		/* no stream ... nothing to do here. */
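+		/* An empty "stream" object is returned in this case. */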
+		goto out;
+	}
+	TAILQ_FOREACH(ss, tq, entry) {
+		strm_jobj = json_object();
+
+		RBT_FOREACH(rbn, &ss->src_stats_rbt) {
+			src_jobj = json_array();
+
+			strm_src = container_of(rbn, struct ldms_stream_src_stats_s, rbn);
+			addr = strm_src->src;
+			ldms_addr_ntop(&addr, addr_buf, sizeof(addr_buf));
+			TAILQ_FOREACH(prf, &strm_src->profiles, ent) {
+				hop_jobj = json_object();
+				json_object_set_new(hop_jobj, "hop_count",
+						json_integer(prf->profiles.hop_cnt));
+				prf_array = json_array();
+				json_object_set_new(hop_jobj, "profile", prf_array);
+				for (i = 0; i < prf->profiles.hop_cnt; i++) {
+					prf_jobj = json_object();
+					json_object_set_new(prf_jobj, "recv",
+						json_real(__ts2double(prf->profiles.hops[i].recv_ts)));
+					json_object_set_new(prf_jobj, "deliver",
+						json_real(__ts2double(prf->profiles.hops[i].send_ts)));
+					json_array_append_new(prf_array, prf_jobj);
+				}
+				json_array_append_new(src_jobj, hop_jobj);
+			}
+			json_object_set_new(strm_jobj, addr_buf, src_jobj);
+		}
+		json_object_set_new(jobj, ss->name, strm_jobj);
+	}
+ out:
+	*_jobj = jobj;
+	return rc;
+}
+
+int __xprt_profiling_as_json(json_t **_obj, int is_reset)
+{
+	json_t *obj, *ep_prf, *op_prf;
+	ldms_t x;
+	struct ldms_xprt_stats stats;
+	struct ldms_op_ctxt *xc;
+	int rc;
+	enum ldms_xprt_ops_e op_e;
+	char lhostname[128], lport_no[32], rhostname[128], rport_no[32], name[161];
+
+	obj = json_object();
+	if (!obj) {
+		ovis_log(config_log, OVIS_LCRIT, "Memory allocation failure\n");
+		return ENOMEM;
+	}
+	for (x = ldms_xprt_first(); x; x = ldms_xprt_next(x)) {
+		rc = ldms_xprt_names(x, lhostname, sizeof(lhostname),
+				     lport_no, sizeof(lport_no),
+				     rhostname, sizeof(rhostname),
+				     rport_no, sizeof(rport_no),
+				     NI_NAMEREQD | NI_NUMERICSERV);
+		if (rc == ENOTCONN)
+			continue;
+
+		ldms_xprt_stats(x, &stats, LDMS_PERF_M_PROFILNG, is_reset);
+		snprintf(name, sizeof(name), "%s:%s", rhostname, rport_no);
+		ep_prf = json_object();
+		for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) {
+			op_prf = json_array();
+			TAILQ_FOREACH(xc, &stats.op_ctxt_lists[op_e], ent) {
+				json_array_append_new(op_prf,
+					__ldms_op_profiling_as_json(xc, op_e));
+			}
+			json_object_set_new(ep_prf, ldms_xprt_op_names[op_e], op_prf);
+		}
+		json_object_set_new(obj, name, ep_prf);
+	}
+	*_obj = obj;
+	return 0;
+}
+
+static int profiling_handler(ldmsd_req_ctxt_t req)
+{
+	json_t *obj, *xprt_prf, *strm_prf;
+	char *json_as_str = NULL;
+	int rc = 0;
+	struct ldmsd_req_attr_s attr;
+	size_t str_len;
+	char *enable_str, *reset_str;
+	int is_enable = -1; /* -1 means only get the profile data; don't enable/disable */
+	int is_reset = 0;
+
+	enable_str = ldmsd_req_attr_str_value_get_by_id(req, LDMSD_ATTR_TYPE);
+	if (enable_str) {
+		is_enable = 1;
+		if (0 == strcasecmp(enable_str, "false"))
+			is_enable = 0; /* disable */
+	}
+	reset_str = ldmsd_req_attr_str_value_get_by_id(req, LDMSD_ATTR_RESET);
+	if (reset_str) {
+		is_reset = 1;
+		if (0 == strcasecmp(reset_str, "false"))
+			is_reset = 0;
+	}
+	free(enable_str);
+	free(reset_str);
+
+	if (is_enable == 1) {
+		ldms_profiling_enable(-1, NULL, NULL);
+	} else if (is_enable == 0) {
+		ldms_profiling_disable(-1, NULL, NULL);
+	}
+
+	/*
+	 * The output JSON object looks like this:
+	 *
+	 * {
+	 *    "xprt": {
+	 *        "<host>:<port>": {
+	 *            "lookup": <list of lookup profiles>,
+	 *            "update": <list of update profiles>,
+	 *            "send": <list of send profiles>
+	 *        },
+	 *        ...
+	 *    },
+	 *    "stream": {
+	 *        "<stream_name>": <per-source stream profiles>,
+	 *        ...
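+	 *
+	 * (Each stream entry maps a source address to a list of per-hop
+	 *  profiles, e.g. "<addr>:<port>": [ { "hop_count": <n>,
+	 *  "profile": [ { "recv": <ts>, "deliver": <ts> } ] } ].)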
+	 *    }
+	 * }
+	 */
+	obj = json_object();
+	rc = __xprt_profiling_as_json(&xprt_prf, is_reset);
+	if (rc)
+		goto err;
+	json_object_set_new(obj, "xprt", xprt_prf);
+
+	rc = __stream_profiling_as_json(&strm_prf, is_reset);
+	if (rc)
+		goto err;
+	json_object_set_new(obj, "stream", strm_prf);
+
+	json_as_str = json_dumps(obj, JSON_INDENT(0));
+	if (!json_as_str)
+		goto err;
+	str_len = strlen(json_as_str) + 1; /* +1 for \0 */
+
+	attr.discrim = 1;
+	attr.attr_id = LDMSD_ATTR_JSON;
+	attr.attr_len = str_len;
+	ldmsd_hton_req_attr(&attr);
+
+	if (ldmsd_append_reply(req, (const char *)&attr, sizeof(attr), LDMSD_REQ_SOM_F))
+		goto err;
+
+	if (ldmsd_append_reply(req, json_as_str, str_len, 0))
+		goto err;
+
+	attr.discrim = 0;
+	if (ldmsd_append_reply(req, (const char *)&attr.discrim, sizeof(attr.discrim), LDMSD_REQ_EOM_F))
+		goto err;
+
+	json_decref(obj); /* 'obj' is a jansson object; release it with json_decref(), not free() */
+	free(json_as_str);
+	return 0;
+err:
+	json_decref(obj);
+	free(json_as_str); /* free(NULL) is a no-op */
+	if (!rc)
+		rc = ENOMEM;
+	req->errcode = rc;
+	ldmsd_send_req_response(req, "Failed to get the profiling data");
+	return rc;
+}
+
 struct store_time_thread {
 	pid_t tid;
 	uint64_t store_time;
diff --git a/ldms/src/ldmsd/ldmsd_request.h b/ldms/src/ldmsd/ldmsd_request.h
index 5dab3a501..39fdfd803 100644
--- a/ldms/src/ldmsd/ldmsd_request.h
+++ b/ldms/src/ldmsd/ldmsd_request.h
@@ -165,6 +165,7 @@ enum ldmsd_request {
 	LDMSD_DEFAULT_QUOTA_REQ,
 	LDMSD_PID_FILE_REQ,
 	LDMSD_BANNER_MODE_REQ,
+	LDMSD_PROFILING_REQ,
 
 	/* failover requests by user */
 	LDMSD_FAILOVER_CONFIG_REQ = 0x700, /* "failover_config" user command */
diff --git a/ldms/src/ldmsd/ldmsd_request_util.c b/ldms/src/ldmsd/ldmsd_request_util.c
index 40b623e9c..710232a9d 100644
--- a/ldms/src/ldmsd/ldmsd_request_util.c
+++ b/ldms/src/ldmsd/ldmsd_request_util.c
@@ -123,6 +123,7 @@ struct req_str_id req_str_id_table[] = {
 	{ "prdcr_stream_status",LDMSD_PRDCR_STREAM_STATUS_REQ },
 	{ "prdcr_subscribe", LDMSD_PRDCR_SUBSCRIBE_REQ },
 	{ "prdcr_unsubscribe", LDMSD_PRDCR_UNSUBSCRIBE_REQ },
+	{ "profiling", LDMSD_PROFILING_REQ },
 	{ "publish_kernel", LDMSD_PUBLISH_KERNEL_REQ },
 	{ "qgroup_config", LDMSD_QGROUP_CONFIG_REQ },
 	{ "qgroup_info", LDMSD_QGROUP_INFO_REQ },
@@ -329,6 +330,7 @@ const char *ldmsd_req_id2str(enum ldmsd_request req_id)
 	case LDMSD_CMDLINE_OPTIONS_SET_REQ : return "CMDLINE_OPTION_SET_REQ";
 	case LDMSD_SET_SEC_MOD_REQ : return "SET_SEC_REQ";
 	case LDMSD_LOG_STATUS_REQ : return "LOG_STATUS_REQ";
+	case LDMSD_PROFILING_REQ : return "PROFILING_REQ";
 
 	/* failover requests by user */
 	case LDMSD_FAILOVER_CONFIG_REQ : return "FAILOVER_CONFIG_REQ";