From c6fb393b2f4dc6aa19adea14136e6d68aa8811dd Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Thu, 22 Aug 2024 14:22:33 -0500 Subject: [PATCH] Profiling LDMS Operations This commit introduces profiling capabilities to LDMS operations, including lookup, update, set_delete, and stream_publish. It collects timestamps for key events, such as API calls, request/response exchanges, and completion notifications. The feature is disabled by default but can be configured using the 'profiling' configuration command to enable, disable, or reset data collection. This enhancement will aid in performance analysis and optimization. --- ldms/python/ldmsd/Makefile.am | 2 +- ldms/python/ldmsd/ldmsd_communicator.py | 21 ++ ldms/python/ldmsd/ldmsd_controller | 25 ++- ldms/python/ldmsd/ldmsd_profiling.py | 45 +++++ ldms/src/core/ldms.c | 95 ++++++++- ldms/src/core/ldms.h | 99 ++++++++- ldms/src/core/ldms_private.h | 11 +- ldms/src/core/ldms_rail.c | 138 +++++++++++-- ldms/src/core/ldms_rail.h | 10 + ldms/src/core/ldms_stream.c | 170 ++++++++++++++-- ldms/src/core/ldms_stream.h | 12 +- ldms/src/core/ldms_xprt.c | 241 ++++++++++++++++++++-- ldms/src/core/ldms_xprt.h | 23 ++- ldms/src/ldmsd/ldmsd_request.c | 255 +++++++++++++++++++++++- ldms/src/ldmsd/ldmsd_request.h | 1 + ldms/src/ldmsd/ldmsd_request_util.c | 2 + 16 files changed, 1080 insertions(+), 70 deletions(-) create mode 100644 ldms/python/ldmsd/ldmsd_profiling.py diff --git a/ldms/python/ldmsd/Makefile.am b/ldms/python/ldmsd/Makefile.am index f4dd661dd4..25fc2d9031 100644 --- a/ldms/python/ldmsd/Makefile.am +++ b/ldms/python/ldmsd/Makefile.am @@ -1,3 +1,3 @@ pkgpythondir=${pythondir}/ldmsd -pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py +pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py ldmsd_profiling.py dist_bin_SCRIPTS = ldmsd_controller diff --git a/ldms/python/ldmsd/ldmsd_communicator.py b/ldms/python/ldmsd/ldmsd_communicator.py index ed6ad1e64f..fda45f9206 100644 --- a/ldms/python/ldmsd/ldmsd_communicator.py +++ b/ldms/python/ldmsd/ldmsd_communicator.py @@ -160,6 +160,7 @@ 'set_sec_mod' : {'req_attr': ['regex'], 'opt_attr': ['uid', 'gid', 'perm']}, 'log_status' : {'req_attr' : [], 'opt_attr' : ['name']}, 'stats_reset' : {'req_attr' : [], 'opt_attr' : ['list']}, + 'profiling' : {'req_attr' : [], 'opt_attr' : ['enable', 'reset']}, ##### Failover.
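##### Reviewer note (not part of the patch): with the 'profiling' keyword
##### registered above, the new Communicator.profiling() method added below
##### can be driven from a script, e.g.:
#####     comm = Communicator(xprt='sock', host='node-1', port=10444)
#####     comm.connect()
#####     errcode, msg = comm.profiling(enable='true')
##### where 'node-1' and 10444 are placeholder values.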
##### 'failover_config': { 'req_attr': [ @@ -616,6 +617,7 @@ class LDMSD_Request(object): SET_SEC_MOD = 0x600 + 19 LOG_STATUS = 0x600 + 20 STATS_RESET = 0x600 + 21 + PROFILING = 0x600 + 31 FAILOVER_CONFIG = 0x700 FAILOVER_PEERCFG_START = 0x700 + 1 @@ -731,6 +733,7 @@ class LDMSD_Request(object): 'failover_stop' : {'id' : FAILOVER_STOP}, 'set_route' : {'id': SET_ROUTE}, 'xprt_stats' : {'id' : XPRT_STATS}, + 'profiling' : {'id' : PROFILING}, 'thread_stats' : {'id' : THREAD_STATS}, 'prdcr_stats' : {'id' : PRDCR_STATS}, 'set_stats' : {'id' : SET_STATS}, @@ -3386,6 +3389,24 @@ def xprt_stats(self, reset=False, level=0): self.close() return errno.ENOTCONN, str(e) + def profiling(self, enable = None, reset = None): + attrs = [] + if enable is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE, + value = enable)) + if reset is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.RESET, + value = reset)) + req = LDMSD_Request( + command_id=LDMSD_Request.PROFILING, attrs=attrs) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + def thread_stats(self, reset=False): """Query the daemon's I/O thread utilization data""" if reset is None: diff --git a/ldms/python/ldmsd/ldmsd_controller b/ldms/python/ldmsd/ldmsd_controller index 8d15cf0ba0..a4c854c57e 100755 --- a/ldms/python/ldmsd/ldmsd_controller +++ b/ldms/python/ldmsd/ldmsd_controller @@ -2407,6 +2407,30 @@ class LdmsdCmdParser(cmd.Cmd): def complete_xprt_stats(self, text, line, begidx, endidx): return self.__complete_attr_list('xprt_stats', text) + def do_profiling(self, arg): + """ + Enable/disable and query the LDMS operation profiling data + + The command is intended for diagnostics and studies to improve ldmsd performance. + + The command always reports the cached profiling data if it exists.
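+
+ Example (illustrative):
+ profiling # report the cached profiling data
+ profiling enable=true # enable profiling, then report
+ profiling enable=false reset=true # disable, report, then reset the cache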
+ + Parameters: + [enable=] True to enable LDMS profiling + [reset=] True to reset and free cached profiling data after the report + """ + arg = self.handle_args('profiling', arg) + if not arg: + return + rc, msg = self.comm.profiling(**arg) + if msg == "": + return + if rc != 0: + print(f"Error: {rc} {msg}") + return + stats = fmt_status(msg) + print(stats) + def do_updtr_task(self, arg): """ Report the updater tasks @@ -3209,7 +3233,6 @@ class LdmsdCmdParser(cmd.Cmd): cmd, arg = line[:i], line[i:].strip() return cmd, arg, line - if __name__ == "__main__": is_debug = True try: diff --git a/ldms/python/ldmsd/ldmsd_profiling.py b/ldms/python/ldmsd/ldmsd_profiling.py new file mode 100644 index 0000000000..73b408f2c1 --- /dev/null +++ b/ldms/python/ldmsd/ldmsd_profiling.py @@ -0,0 +1,45 @@ +import json +import pandas as pd + +from ldmsd.ldmsd_communicator import * + +def profiling_as_json(xprt, host, port): + comm = Communicator(xprt=xprt, host=host, port=port) + comm.connect() + o = comm.profiling() + return json.loads(o[1]) + +def get_hosts(o): + return o['xprt'].keys() + +def get_streams(o): + return o['stream'].keys() + +def lookup_df(o, host): + df = pd.DataFrame(o['xprt'][host]['LOOKUP']) + return df + +def update_df(o, host): + df = pd.DataFrame(o['xprt'][host]['UPDATE']) + return df + +def send_df(o, host): + df = pd.DataFrame(o['xprt'][host]['SEND']) + return df + +def set_delete_df(o, host): + df = pd.DataFrame(o['xprt'][host]['SET_DELETE']) + return df + +def stream_publish_df(o, host): + df = pd.DataFrame(o['xprt'][host]['STREAM_PUBLISH']) + return df + +def stream_by_stream_df(o, stream_name = None, src = None): + d = o['stream'] + if stream_name is not None: + d = d[stream_name] + if src is not None: + d = d[src] + df = pd.DataFrame(d) + return df diff --git a/ldms/src/core/ldms.c b/ldms/src/core/ldms.c index f8ccdabd2d..1036d5d49e 100644 --- a/ldms/src/core/ldms.c +++ b/ldms/src/core/ldms.c @@ -100,9 +100,79 @@ const char *ldms_xprt_op_names[] = { "DIR_REP", "SEND", "RECV", + "STREAM_PUBLISH", + "STREAM_SUBSCRIBE", + "STREAM_UNSUBSCRIBE" }; static char *type_names[]; +/* -ENOSYS means that LDMS doesn't support profiling for those operations.
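+ * Per the initializer below, profiling is supported for LOOKUP, UPDATE,
+ * SET_DELETE, SEND, and STREAM_PUBLISH; the remaining entries are fixed
+ * at -ENOSYS (reviewer note).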
*/ +int enable_profiling[LDMS_XPRT_OP_COUNT] = { + 0, /* lookup */ + 0, /* update */ + -ENOSYS, /* publish */ + 0, /* set_delete */ + -ENOSYS, /* dir_req */ + -ENOSYS, /* dir_rep */ + 0, /* send */ + -ENOSYS, /* receive */ + 0, /* stream_publish */ + -ENOSYS, /* stream_subscribe */ + -ENOSYS /* stream_unsubscribe */ +}; + +int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err) +{ + int i; + int rc = 0; + + if (ops_cnt < 0) { + for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) { + if (enable_profiling[i] != -ENOSYS) + enable_profiling[i] = 1; + } + } else { + if (ops_err) + bzero(ops_err, sizeof(int) * ops_cnt); + for (i = 0; i < ops_cnt; i++) { + if (enable_profiling[ops[i]] == -ENOSYS) { + rc = -1; + if (ops_err) + ops_err[i] = ENOSYS; + } else { + enable_profiling[ops[i]] = 1; + } + } + } + return rc; +} + +int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err) +{ + int i; + int rc = 0; + + if (ops_cnt < 0) { + for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) { + if (enable_profiling[i] != -ENOSYS) + enable_profiling[i] = 0; + } + } else { + if (ops_err) + bzero(ops_err, sizeof(int) * ops_cnt); + for (i = 0; i < ops_cnt; i++) { + if (enable_profiling[ops[i]] == -ENOSYS) { + rc = -1; + if (ops_err) + ops_err[i] = ENOSYS; + } else { + enable_profiling[ops[i]] = 0; + } + } + } + return rc; +} + static struct ldms_digest_s null_digest; /* This function is useful for displaying data structures stored in @@ -739,7 +809,8 @@ static void sync_update_cb(ldms_t x, ldms_set_t s, int status, void *arg) sem_post(&x->sem); } -int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg) +int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, + void *arg, struct ldms_op_ctxt *op_ctxt) { ldms_t xprt = ldms_xprt_get(x); int rc; @@ -777,6 +848,21 @@ int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void ldms_t __ldms_xprt_to_rail(ldms_t x); int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg) { + int rc; + struct ldms_op_ctxt *op_ctxt = NULL; + + assert(set); + + if (set->curr_updt_ctxt) + return EBUSY; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_UPDATE; + (void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->update_profile.app_req_ts)); + } /* * We convert the transport handle to a rail handle using * __ldms_xprt_to_rail() and pass it to x->ops.update(). * @@ -786,7 +872,12 @@ int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg) * when the update completes.
*/ ldms_t r = __ldms_xprt_to_rail(set->xprt); - return r->ops.update(r, set, cb, arg); + rc = r->ops.update(r, set, cb, arg, op_ctxt); + if (rc) { + set->curr_updt_ctxt = NULL; + free(op_ctxt); + } + return rc; } void __ldms_set_on_xprt_term(ldms_set_t set, ldms_t xprt) diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index d9dc89902f..d1f7cfd851 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -944,6 +944,7 @@ int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr, const char *ldms_sockaddr_ntop(struct sockaddr *sa, char *buff, size_t sz); const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz); + /** * \brief Convert a CIDR IP address string to \c ldms_addr * @@ -1476,6 +1477,8 @@ struct ldms_stream_close_event_s { typedef struct ldms_stream_event_s { ldms_t r; /* rail */ enum ldms_stream_event_type type; + struct timespec recv_ts; + uint32_t hop_num; union { struct ldms_stream_recv_data_s recv; struct ldms_stream_return_status_s status; @@ -1567,11 +1570,28 @@ struct ldms_stream_counters_s { *(p) = LDMS_STREAM_COUNTERS_INITIALIZER; \ } while (0) +struct ldms_stream_hop { + struct timespec recv_ts; + struct timespec send_ts; +}; + +#define STREAM_MAX_PROFILE_HOPS 8 +struct ldms_stream_profile { + uint32_t hop_cnt; + struct ldms_stream_hop hops[OVIS_FLEX]; +}; +struct ldms_stream_profile_ent { + TAILQ_ENTRY(ldms_stream_profile_ent) ent; + struct ldms_stream_profile profiles; +}; +TAILQ_HEAD(ldms_stream_profile_list, ldms_stream_profile_ent); + /* stream statistics by src */ struct ldms_stream_src_stats_s { struct rbn rbn; /* key ==> src */ struct ldms_addr src; struct ldms_stream_counters_s rx; /* total rx from src */ + struct ldms_stream_profile_list profiles; }; /* stats of stream-client pair */ @@ -2068,9 +2088,74 @@ typedef enum ldms_xprt_ops_e { LDMS_XPRT_OP_DIR_REP, LDMS_XPRT_OP_SEND, LDMS_XPRT_OP_RECV, + LDMS_XPRT_OP_STREAM_PUBLISH, + LDMS_XPRT_OP_STREAM_SUBSCRIBE, + LDMS_XPRT_OP_STREAM_UNSUBSCRIBE, LDMS_XPRT_OP_COUNT } ldms_xprt_ops_t; +struct ldms_op_ctxt { + enum ldms_xprt_ops_e op_type; + union { + struct lookup_profile_s { + struct timespec app_req_ts; + struct timespec req_send_ts; + struct timespec req_recv_ts; + struct timespec share_ts; + struct timespec rendzv_ts; + struct timespec read_ts; + struct timespec complete_ts; + struct timespec deliver_ts; + } lookup_profile; + struct update_profile { + struct timespec app_req_ts; + struct timespec read_ts; + struct timespec read_complete_ts; + struct timespec deliver_ts; + } update_profile; + struct set_delete_profile_s { + struct timespec send_ts; + struct timespec recv_ts; + struct timespec ack_ts; + } set_del_profile; + struct send_profile_s { + struct timespec app_req_ts; + struct timespec send_ts; + struct timespec complete_ts; + struct timespec deliver_ts; + } send_profile; + struct strm_publish_profile_s { + uint32_t hop_num; + struct timespec recv_ts; + struct timespec send_ts; /* to remote client */ + } stream_pub_profile; + }; + TAILQ_ENTRY(ldms_op_ctxt) ent; +}; +TAILQ_HEAD(ldms_op_ctxt_list, ldms_op_ctxt); + +/** + * TODO: Finish the document. + * Enable/disable LDMS operations' profiling + * + * If profiling is enabled, LDMS collects the following timestamps: + * for LOOKUP: when ldms_xprt_lookup() is called, + * when LDMS sends the lookup request to the peer, + * when the peer receives ... + * + * \param ops_cnt Number of operations in \c ops.
-1 to enable/disable profiling of all operations + * \param ops Array of operations whose profiling is to be enabled + * \param ops_err Array to store an error code for each given operation + * + * \return 0 on success; otherwise, -1 is returned. + * In this case, an error code is assigned to each failed operation in \c ops_err. + * + * TODO: List the possible error codes in the returned array and their meanings + */ +int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err); +int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err); + extern const char *ldms_xprt_op_names[]; struct ldms_xprt_rate_data { @@ -2178,15 +2263,25 @@ typedef struct ldms_xprt_stats { struct timespec disconnected; struct timespec last_op; struct ldms_stats_entry ops[LDMS_XPRT_OP_COUNT]; + struct ldms_op_ctxt_list op_ctxt_lists[LDMS_XPRT_OP_COUNT]; } *ldms_xprt_stats_t; +#define LDMS_PERF_M_STATS 1 +#define LDMS_PERF_M_PROFILING 2 +#define LDMS_PERF_M_ALL (LDMS_PERF_M_STATS | LDMS_PERF_M_PROFILING) + /** * \brief Retrieve transport request statistics * + * The function gets the statistics and then resets them if \c reset is not 0. + * To reset the statistics without retrieving them, pass NULL for \c stats. + * * \param x The transport handle - * \param s Pointer to an ldms_xprt_stats structure + * \param stats Pointer to an ldms_xprt_stats structure + * \param mask Bitwise OR of LDMS_PERF_M_STATS and LDMS_PERF_M_PROFILING, + * selecting the data to retrieve or reset + * \param reset If not 0, reset the statistics after retrieving them + * */ -extern void ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats); +extern void ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats, int mask, int reset); /* * Metric template for: diff --git a/ldms/src/core/ldms_private.h b/ldms/src/core/ldms_private.h index 6be464a3ca..4e1523e7d1 100644 --- a/ldms/src/core/ldms_private.h +++ b/ldms/src/core/ldms_private.h @@ -149,6 +149,14 @@ struct ldms_set { struct ldms_context *notify_ctxt; /* Notify req context */ ldms_heap_t heap; struct ldms_heap_instance heap_inst; + + /* + * A handle on the context of the current update operation on the set. + * + * curr_updt_ctxt is NULL when there is no outstanding update on the set. + * LDMS does not start a new update on the set while one is outstanding. + */ + struct ldms_op_ctxt *curr_updt_ctxt; }; /* Convenience macro to roundup a value to a multiple of the _s parameter */ @@ -157,7 +165,8 @@ struct ldms_set { extern int __ldms_xprt_push(ldms_set_t s, int push_flags); extern int __ldms_remote_lookup(ldms_t _x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); + ldms_lookup_cb_t cb, void *cb_arg, + struct ldms_op_ctxt *op_ctxt); extern int __ldms_remote_dir(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); extern int __ldms_remote_dir_cancel(ldms_t x); extern struct ldms_set * diff --git a/ldms/src/core/ldms_rail.c b/ldms/src/core/ldms_rail.c index 16d8ee9a32..2e16f455af 100644 --- a/ldms/src/core/ldms_rail.c +++ b/ldms/src/core/ldms_rail.c @@ -76,6 +76,9 @@ extern ovis_log_t xlog; ovis_log(xlog, OVIS_LERROR, fmt, ## __VA_ARGS__); \ } while (0); +/* The definition is in ldms.c.
*/ +extern int enable_profiling[LDMS_XPRT_OP_COUNT]; + static int __rail_connect(ldms_t _r, struct sockaddr *sa, socklen_t sa_len, ldms_event_cb_t cb, void *cb_arg); static int __rail_is_connected(ldms_t _r); @@ -86,13 +89,14 @@ static int __rail_sockaddr(ldms_t _r, struct sockaddr *local_sa, struct sockaddr *remote_sa, socklen_t *sa_len); static void __rail_close(ldms_t _r); -static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len); +static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt); static size_t __rail_msg_max(ldms_t x); static int __rail_dir(ldms_t _r, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); static int __rail_dir_cancel(ldms_t _r); static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); -static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); +static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats, int mask, int is_reset); static ldms_t __rail_get(ldms_t _r); /* ref get */ static void __rail_put(ldms_t _r); /* ref put */ @@ -102,7 +106,8 @@ static uint64_t __rail_conn_id(ldms_t _r); static const char *__rail_type_name(ldms_t _r); static void __rail_priority_set(ldms_t _r, int prio); static void __rail_cred_get(ldms_t _r, ldms_cred_t lcl, ldms_cred_t rmt); -static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, void *arg); +static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); static int __rail_get_threads(ldms_t _r, pthread_t *out, int n); static ldms_set_t __rail_set_by_name(ldms_t x, const char *set_name); @@ -199,6 +204,7 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, ldms_rail_t r; zap_t zap; int i; + enum ldms_xprt_ops_e op_e; if (n <= 0) { errno = EINVAL; @@ -228,6 +234,7 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, r->recv_quota = recv_quota; r->recv_rate_limit = rate_limit; rbt_init(&r->stream_client_rbt, __str_rbn_cmp); + snprintf(r->name, sizeof(r->name), "%s", xprt_name); snprintf(r->auth_name, sizeof(r->auth_name), "%s", auth_name); if (auth_av_list) { @@ -248,6 +255,9 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, r->eps[i].remote_is_rail = -1; rbt_init(&r->eps[i].sbuf_rbt, __stream_buf_cmp); TAILQ_INIT(&r->eps[i].sbuf_tq); + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + TAILQ_INIT(&(r->eps[i].op_ctxt_lists[op_e])); + } } zap = __ldms_zap_get(xprt_name); @@ -1066,20 +1076,31 @@ int __rail_rep_send_raw(struct ldms_rail_ep_s *rep, void *data, int len) return rc; } -static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len) +static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt) { /* send over ep0 for now */ ldms_rail_t r = (ldms_rail_t)_r; int rc; struct ldms_rail_ep_s *rep; /* an endpoint inside the rail */ + pthread_mutex_lock(&r->mutex); if (r->eps[0].state != LDMS_RAIL_EP_CONNECTED) { rc = ENOTCONN; goto out; } rep = &r->eps[0]; - rc = ldms_xprt_send(rep->ep, msg_buf, msg_len); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_SEND]), + op_ctxt, ent); + } + rc = rep->ep->ops.send(rep->ep, msg_buf, msg_len, op_ctxt); if (rc) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_SEND]), + op_ctxt, ent); + } /* release the acquired quota if send failed */ __rep_quota_release(rep, msg_len); } @@ -1145,24 +1166,29 
@@ struct ldms_rail_lookup_ctxt_s { ldms_lookup_cb_t app_cb; void *cb_arg; enum ldms_lookup_flags flags; + struct ldms_op_ctxt *op_ctxt; } *ldms_rail_lookup_ctxt_t; void __rail_lookup_cb(ldms_t x, enum ldms_lookup_status status, int more, ldms_set_t s, void *arg) { ldms_rail_lookup_ctxt_t lc = arg; + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + (void)clock_gettime(CLOCK_REALTIME, &lc->op_ctxt->lookup_profile.deliver_ts); + } lc->app_cb((void*)lc->r, status, more, s, lc->cb_arg); if (!more) free(lc); } static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg) + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt) { ldms_rail_t r = (ldms_rail_t)_r; int rc; struct ldms_rail_ep_s *rep; ldms_rail_lookup_ctxt_t lc; + pthread_mutex_lock(&r->mutex); if (r->state != LDMS_RAIL_EP_CONNECTED) { rc = ENOTCONN; @@ -1177,19 +1203,27 @@ static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags fla lc->app_cb = cb; lc->cb_arg = cb_arg; lc->flags = flags; + lc->op_ctxt = op_ctxt; rep = &r->eps[r->lookup_rr++]; r->lookup_rr %= r->n_eps; - rc = ldms_xprt_lookup(rep->ep, name, flags, __rail_lookup_cb, lc); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_LOOKUP]), op_ctxt, ent); + } + rc = rep->ep->ops.lookup(rep->ep, name, flags, __rail_lookup_cb, lc, op_ctxt); if (rc) { /* synchronous error */ free(lc); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + TAILQ_REMOVE(&rep->op_ctxt_lists[LDMS_XPRT_OP_LOOKUP], op_ctxt, ent); + } } out: pthread_mutex_unlock(&r->mutex); return rc; } -static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats) +static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats, int mask, int is_reset) { /* TODO IMPLEMENT ME */ assert(0 == "Not Implemented"); @@ -1263,17 +1297,24 @@ void __rail_update_cb(ldms_t x, ldms_set_t s, int flags, void *arg) { struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); ldms_rail_update_ctxt_t uc = arg; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + struct ldms_op_ctxt *op_ctxt = s->curr_updt_ctxt; + + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->update_profile.deliver_ts); + s->curr_updt_ctxt = NULL; + } uc->app_cb((ldms_t)rep->rail, s, flags, uc->cb_arg); if (!(flags & LDMS_UPD_F_MORE)) { free(uc); } } -static int __rail_update(ldms_t _r, struct ldms_set *set, - ldms_update_cb_t cb, void *arg) +static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, + void *arg, struct ldms_op_ctxt *op_ctxt) { ldms_rail_t r = (void*)_r; ldms_rail_update_ctxt_t uc; + struct ldms_rail_ep_s *rep; int rc; uc = calloc(1, sizeof(*uc)); @@ -1282,9 +1323,19 @@ static int __rail_update(ldms_t _r, struct ldms_set *set, uc->r = r; uc->app_cb = cb; uc->cb_arg = arg; - rc = set->xprt->ops.update(set->xprt, set, __rail_update_cb, uc); + + rep = ldms_xprt_ctxt_get(set->xprt); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_UPDATE]), op_ctxt, ent); + set->curr_updt_ctxt = op_ctxt; + } + rc = set->xprt->ops.update(set->xprt, set, __rail_update_cb, uc, op_ctxt); if (rc) { /* synchronous error; clean up the context */ + set->curr_updt_ctxt = NULL; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_UPDATE]), op_ctxt, ent); free(uc); } return rc; } @@ -1404,6 +1455,13 @@ void __rail_process_send_quota(ldms_t x, struct ldms_request *req) __rep_flush_sbuf_tq(rep); } +struct ldms_op_ctxt_list *
+__rail_op_ctxt_list(struct ldms_xprt *x, enum ldms_xprt_ops_e op_e) +{ + struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); + return &(rep->op_ctxt_lists[op_e]); +} + int ldms_xprt_rail_send_quota_get(ldms_t _r, uint64_t *quotas, int n) { ldms_rail_t r; @@ -1463,6 +1521,18 @@ zap_ep_t __rail_get_zap_ep(ldms_t x) return r->eps[0].ep->zap_ep; } +void timespec_hton(struct timespec *ts) +{ + ts->tv_nsec = htobe64(ts->tv_nsec); + ts->tv_sec = htobe64(ts->tv_sec); +} + +void timespec_ntoh(struct timespec *ts) +{ + ts->tv_nsec = be64toh(ts->tv_nsec); + ts->tv_sec = be64toh(ts->tv_sec); +} + int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la) { union ldms_sockaddr *lsa = (void*)sa; @@ -1639,8 +1709,6 @@ size_t format_set_delete_req(struct ldms_request *req, uint64_t xid, void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, ldms_set_delete_cb_t cb_fn) { assert(XTYPE_IS_RAIL(_r->xtype)); ldms_rail_t r = (ldms_rail_t)_r; @@ -1651,6 +1719,8 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, struct xprt_set_coll_entry *ent; int i; ldms_t x; + struct ldms_rail_ep_s *rep; + struct ldms_op_ctxt *op_ctxt = NULL; x = NULL; @@ -1697,6 +1767,19 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, req = (struct ldms_request *)(ctxt + 1); len = format_set_delete_req(req, (uint64_t)(unsigned long)ctxt, ldms_set_instance_name_get(s)); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE)) { + rep = (struct ldms_rail_ep_s *)ldms_xprt_ctxt_get(x); + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) { + ovis_log(xlog, OVIS_LCRIT, "%s:%s:%d Memory allocation failure\n", + __FILE__, __func__, __LINE__); + /* Let the routine continue */ + } else { + ctxt->op_ctxt = op_ctxt; + TAILQ_INSERT_TAIL(&rep->op_ctxt_lists[LDMS_XPRT_OP_SET_DELETE], op_ctxt, ent); + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->set_del_profile.send_ts); + } + } zap_err_t zerr = zap_send(x->zap_ep, req, len); if (zerr) { char name[128]; @@ -1707,6 +1790,8 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, __FILE__, __func__, __LINE__, zerr, name); x->zerrno = zerr; __ldms_free_ctxt(x, ctxt); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE) && op_ctxt) { + TAILQ_REMOVE(&rep->op_ctxt_lists[LDMS_XPRT_OP_SET_DELETE], op_ctxt, ent); + free(op_ctxt); + } } pthread_mutex_unlock(&x->lock); } @@ -1714,20 +1799,41 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, int __rep_flush_sbuf_tq(struct ldms_rail_ep_s *rep) { int rc; + struct ldms_op_ctxt *op_ctxt = NULL; struct __pending_sbuf_s *p; while ((p = TAILQ_FIRST(&rep->sbuf_tq))) { rc = __rep_quota_acquire(rep, p->sbuf->msg->msg_len); if (rc) goto out; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) { + rc = ENOMEM; + goto out; + } + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = p->hop_num; + op_ctxt->stream_pub_profile.recv_ts = p->recv_ts; + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + } rc = __rep_publish(rep, p->sbuf->name, p->sbuf->msg->name_hash, p->sbuf->msg->stream_type, &p->sbuf->msg->src, p->sbuf->msg->msg_gn, &p->sbuf->msg->cred, p->sbuf->msg->perm, + p->sbuf->msg->hop_cnt, + p->sbuf->msg->hops, + p->sbuf->data, p->sbuf->data_len, + (op_ctxt ? &op_ctxt->stream_pub_profile : NULL)); if (rc) { __rep_quota_release(rep, p->sbuf->msg->msg_len); + if (op_ctxt) { + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + free(op_ctxt); + }
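+ /* Reviewer note: on failure the context is unlinked and freed above;
+ * on success it stays queued on the endpoint's STREAM_PUBLISH list
+ * until it is reported or reset via ldms_xprt_stats(). */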
goto out; } TAILQ_REMOVE(&rep->sbuf_tq, p, entry); diff --git a/ldms/src/core/ldms_rail.h b/ldms/src/core/ldms_rail.h index adf34180cb..1c19eb09d0 100644 --- a/ldms/src/core/ldms_rail.h +++ b/ldms/src/core/ldms_rail.h @@ -114,6 +114,8 @@ struct ldms_rail_rate_quota_s { }; struct __pending_sbuf_s { + uint32_t hop_num; + struct timespec recv_ts; TAILQ_ENTRY(__pending_sbuf_s) entry; struct __stream_buf_s *sbuf; }; @@ -133,6 +135,14 @@ struct ldms_rail_ep_s { uint64_t pending_ret_quota; /* pending return quota */ int in_eps_stq; TAILQ_HEAD(, __pending_sbuf_s) sbuf_tq; /* pending fwd stream msgs */ + /* + * Array of operation context lists, indexed by `ldms_xprt_ops_e`. + * + * Each list stores operation contexts for the corresponding operation type. + * Operation contexts track profiling data for various operations, + * such as lookups, updates, and stream operations. + */ + struct ldms_op_ctxt_list op_ctxt_lists[LDMS_XPRT_OP_COUNT]; }; typedef struct ldms_rail_dir_ctxt_s { diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 5318ae7dc1..89abd39ce0 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -79,6 +79,9 @@ #include "ldms_stream.h" #include "ldms_qgroup.h" +/* The definition is in ldms.c. */ +extern int enable_profiling[LDMS_XPRT_OP_COUNT]; + static ovis_log_t __ldms_stream_log = NULL; /* see __ldms_stream_init() below */ #define __LOG(LVL, FMT, ...) ovis_log(__ldms_stream_log, LVL, FMT, ##__VA_ARGS__ ); @@ -227,12 +230,19 @@ static int __part_send(struct ldms_rail_ep_s *rep, return rc; } +/* The implementations are in ldms_rail.c. */ +extern void timespec_ntoh(struct timespec *ts); +extern void timespec_hton(struct timespec *ts); + int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, uint32_t hash, - ldms_stream_type_t stream_type, + ldms_stream_type_t stream_type, struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, + uint32_t hop_cnt, + struct ldms_stream_hop *hops, + const char *data, size_t data_len, + struct strm_publish_profile_s *pts) { int rc = 0; int name_len = strlen(stream_name) + 1; @@ -246,6 +256,7 @@ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, } else { bzero(&msg.src, sizeof(msg.src)); } + msg.msg_gn = htobe64(msg_gn); msg.msg_len = htobe32(name_len + data_len); msg.stream_type = htobe32(stream_type); @@ -253,6 +264,32 @@ msg.cred.gid = htobe32(cred->gid); msg.perm = htobe32(perm); msg.name_hash = hash; + msg.hop_cnt = htobe32(hop_cnt); + if (hops && hop_cnt) { + size_t sz = hop_cnt * sizeof(struct ldms_stream_hop); + memcpy(&msg.hops, hops, sz); + /* + * The timespecs in hops are already in network order, + * so do not convert them. + */ + } + + if (pts) /* pts is NULL when STREAM_PUBLISH profiling is disabled */ + (void)clock_gettime(CLOCK_REALTIME, &(pts->send_ts)); + if (pts && (hop_cnt <= STREAM_MAX_PROFILE_HOPS)) { + /* + * We store the receive and send times only + * while the stream data has been forwarded + * at most STREAM_MAX_PROFILE_HOPS times. + * + * We ignore the timestamps after + * the STREAM_MAX_PROFILE_HOPS'th.
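+ * For example, a message that has already been forwarded twice
+ * arrives with hop_cnt == 2, so this hop's recv/send pair lands
+ * in hops[2]; hops beyond the limit are simply not recorded.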
+ * + */ + msg.hops[hop_cnt].recv_ts = pts->recv_ts; + msg.hops[hop_cnt].send_ts = pts->send_ts; + timespec_hton(&msg.hops[hop_cnt].recv_ts); + timespec_hton(&msg.hops[hop_cnt].send_ts); + } rc = __part_send(rep, &msg.src, msg_gn, &msg, sizeof(msg), /* msg hdr */ stream_name, name_len, /* name */ @@ -277,11 +314,14 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) int rc; uint64_t addr_port; uint64_t hash; + struct ldms_op_ctxt *op_ctxt = NULL; + if (ev->type == LDMS_STREAM_EVENT_CLOSE) return 0; assert( ev->type == LDMS_STREAM_EVENT_RECV ); if (!XTYPE_IS_RAIL(ev->recv.client->x->xtype)) return ENOTSUP; + r = (ldms_rail_t)ev->recv.client->x; switch (ev->recv.src.sa_family) { case 0: @@ -320,6 +360,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) struct __pending_sbuf_s *e; e = malloc(sizeof(*e)); if (e) { + e->hop_num = _ev->pub.hop_num; + e->recv_ts = _ev->pub.recv_ts; e->sbuf = sbuf; ref_get(&sbuf->ref, "pending"); TAILQ_INSERT_TAIL(&r->eps[ep_idx].sbuf_tq, e, entry); @@ -328,14 +370,29 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) goto out; } + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = _ev->pub.hop_num; + op_ctxt->stream_pub_profile.recv_ts = _ev->pub.recv_ts; rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.name_hash, ev->recv.type, &ev->recv.src, ev->recv.msg_gn, &ev->recv.cred, ev->recv.perm, - ev->recv.data, - ev->recv.data_len); - if (rc) + _ev->sbuf->msg->hop_cnt, + _ev->sbuf->msg->hops, + ev->recv.data, ev->recv.data_len, + &(op_ctxt->stream_pub_profile)); + if (rc || !ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + free(op_ctxt); + } else { + TAILQ_INSERT_TAIL(&(r->eps[ep_idx].op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + } + if (rc) { __rate_quota_release(&ev->recv.client->rate_quota, ev->recv.data_len); + } out: return rc; } @@ -484,13 +541,15 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, uint32_t hash, ldms_stream_type_t stream_type, ldms_cred_t cred, uint32_t perm, - const char *data, size_t data_len) + const char *data, size_t data_len, + uint32_t hop_cnt, struct timespec *recv_ts) { int rc = 0, gc; struct ldms_stream_s *s; struct ldms_stream_client_entry_s *sce, *next_sce; struct ldms_stream_client_s *c; struct timespec now; + size_t sz; s = __stream_get(stream_name, NULL); if (!s) { @@ -500,6 +559,8 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, struct __stream_event_s _ev = { .pub = { + .hop_num = hop_cnt, + .recv_ts = *recv_ts, .type = LDMS_STREAM_EVENT_RECV, .recv = { .src = {0}, @@ -528,10 +589,11 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, pthread_rwlock_wrlock(&s->rwlock); clock_gettime(CLOCK_REALTIME, &now); __counters_update(&s->rx, &now, data_len); - if (__stream_stats_level > 1) { + if ((__stream_stats_level > 1) || ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { /* stats by src */ struct rbn *rbn = rbt_find(&s->src_stats_rbt, &_ev.pub.recv.src); struct ldms_stream_src_stats_s *ss; + struct ldms_stream_profile_ent *prof; if (rbn) { ss = container_of(rbn, struct ldms_stream_src_stats_s, rbn); } else { @@ -546,8 +608,30 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, rbn_init(&ss->rbn, &ss->src); ss->rx = LDMS_STREAM_COUNTERS_INITIALIZER; rbt_ins(&s->src_stats_rbt, &ss->rbn); + TAILQ_INIT(&ss->profiles); } __counters_update(&ss->rx, &now, data_len); + if (sbuf && 
ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + /* Received the stream data from a remote server, so cache the profile */ + sz = (sbuf->msg->hop_cnt+1) * sizeof(struct ldms_stream_hop); + prof = calloc(1, sizeof(*prof) + sz); + if (!prof) { + /* an error in stats collection shall not break + * normal operation + */ + pthread_rwlock_unlock(&s->rwlock); + goto skip_stats; + } + prof->profiles.hop_cnt = sbuf->msg->hop_cnt; + memcpy(&(prof->profiles.hops), &(sbuf->msg->hops), sz); + int i; + for (i = 0; i < prof->profiles.hop_cnt; i++) { + timespec_ntoh(&prof->profiles.hops[i].recv_ts); + timespec_ntoh(&prof->profiles.hops[i].send_ts); + } + prof->profiles.hops[prof->profiles.hop_cnt].recv_ts = *recv_ts; + TAILQ_INSERT_TAIL(&ss->profiles, prof, ent); + } } pthread_rwlock_unlock(&s->rwlock); skip_stats: @@ -822,8 +906,7 @@ int __publish_cred_check(ldms_cred_t cred) int ldms_stream_publish(ldms_t x, const char *stream_name, ldms_stream_type_t stream_type, - ldms_cred_t cred, - uint32_t perm, + ldms_cred_t cred, uint32_t perm, const char *data, size_t data_len) { ldms_rail_t r; @@ -834,6 +917,11 @@ int ldms_stream_publish(ldms_t x, const char *stream_name, int rc; uint32_t hash; int ep_idx; + struct ldms_op_ctxt *op_ctxt = NULL; + struct ldms_op_ctxt_list *op_ctxt_list; + struct timespec recv_ts; + + (void)clock_gettime(CLOCK_REALTIME, &recv_ts); msg_gn = __atomic_fetch_add(&stream_gn, 1, __ATOMIC_SEQ_CST); @@ -863,14 +951,30 @@ int ldms_stream_publish(ldms_t x, const char *stream_name, rc = __rep_quota_acquire(&r->eps[ep_idx], q); if (rc) return rc; - return __rep_publish(&r->eps[ep_idx], stream_name, hash, - stream_type, 0, msg_gn, cred, perm, data, - data_len); + + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = 0; + op_ctxt->stream_pub_profile.recv_ts = recv_ts; + rc = __rep_publish(&r->eps[ep_idx], stream_name, hash, + stream_type, 0, msg_gn, cred, perm, + 0, NULL, data, data_len, + &(op_ctxt->stream_pub_profile)); + + if (rc || !ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + free(op_ctxt); + } else { + op_ctxt_list = &(r->eps[ep_idx].op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]); + TAILQ_INSERT_TAIL(op_ctxt_list, op_ctxt, ent); + } + return rc; } /* else publish locally */ return __stream_deliver(0, msg_gn, stream_name, name_len, hash, - stream_type, cred, perm, data, data_len); + stream_type, cred, perm, data, data_len, 0, &recv_ts); } static void __client_ref_free(void *arg) @@ -1156,6 +1260,7 @@ static void __process_stream_msg(ldms_t x, struct ldms_request *req) { struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); + struct ldms_thrstat *thrstat; /* no need to take lock ; only one zap thread working on this tree */ struct rbn *rbn; struct __stream_buf_s *sbuf; @@ -1217,7 +1322,7 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) fmsg = (void*)req->stream_part.part_msg; flen = be32toh(fmsg->msg_len); - sbuf = malloc(sizeof(*sbuf) + sizeof(*fmsg) + flen); + sbuf = calloc(1, sizeof(*sbuf) + sizeof(*fmsg) + flen); if (!sbuf) return; ref_init(&sbuf->ref, "init", __stream_buf_s_ref_free, sbuf); @@ -1253,6 +1358,8 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) sbuf->msg->cred.uid = be32toh(sbuf->msg->cred.uid); sbuf->msg->cred.gid = be32toh(sbuf->msg->cred.gid); sbuf->msg->perm = be32toh(sbuf->msg->perm); + sbuf->msg->hop_cnt = be32toh(sbuf->msg->hop_cnt); + sbuf->msg->hop_cnt++; /* sbuf->msg->name_hash does not need byte conversion */ sbuf->name = sbuf->msg->msg; @@ -1260,11
+1367,13 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) sbuf->data = sbuf->msg->msg + sbuf->name_len; sbuf->data_len = sbuf->msg->msg_len - sbuf->name_len; + thrstat = zap_thrstat_ctxt_get(x->zap_ep); __stream_deliver(sbuf, sbuf->msg->msg_gn, sbuf->name, sbuf->name_len, sbuf->msg->name_hash, sbuf->msg->stream_type, &sbuf->msg->cred, sbuf->msg->perm, - sbuf->data, sbuf->data_len); + sbuf->data, sbuf->data_len, + sbuf->msg->hop_cnt, &(thrstat->last_op_start)); cleanup: rbt_del(&rep->sbuf_rbt, &sbuf->rbn); @@ -1399,6 +1508,7 @@ void __stream_req_recv(ldms_t x, int cmd, struct ldms_request *req) { assert(0 == XTYPE_IS_RAIL(x->xtype)); /* x is NOT a rail */ assert(x->event_cb == __rail_cb); + switch (cmd) { case LDMS_CMD_STREAM_MSG: __process_stream_msg(x, req); @@ -1488,6 +1598,15 @@ int ldms_stream_stats_level_get() return __atomic_load_n(&__stream_stats_level, __ATOMIC_SEQ_CST); } +void __stream_profiling_purge(struct ldms_stream_profile_list *profiles) +{ + struct ldms_stream_profile_ent *prf; + while ((prf = TAILQ_FIRST(profiles))) { + TAILQ_REMOVE(profiles, prf, ent); + free(prf); + } +} + void __src_stats_rbt_purge(struct rbt *rbt) { struct rbn *rbn; @@ -1495,6 +1614,7 @@ void __src_stats_rbt_purge(struct rbt *rbt) while ((rbn = rbt_min(rbt))) { rbt_del(rbt, rbn); sss = container_of(rbn, struct ldms_stream_src_stats_s, rbn); + __stream_profiling_purge(&sss->profiles); free(sss); } } @@ -1504,6 +1624,8 @@ int __src_stats_rbt_copy(struct rbt *t0, struct rbt *t1, int is_reset) { struct rbn *rbn; struct ldms_stream_src_stats_s *s0, *s1; + struct ldms_stream_profile_ent *prf0, *prf1; + size_t sz; int rc; for (rbn = rbt_min(t0); rbn; rbn = rbn_succ(rbn)) { s0 = container_of(rbn, struct ldms_stream_src_stats_s, rbn); @@ -1513,8 +1635,24 @@ int __src_stats_rbt_copy(struct rbt *t0, struct rbt *t1, int is_reset) goto err_0; } *s1 = *s0; - if (is_reset) + TAILQ_INIT(&s1->profiles); + + /* Copy the profiles */ + TAILQ_FOREACH(prf0, &s0->profiles, ent) { + /* +1 keeps the local receive entry stored at hops[hop_cnt] */ + sz = (prf0->profiles.hop_cnt + 1) * sizeof(struct ldms_stream_hop); + prf1 = calloc(1, sizeof(*prf1) + sz); + if (!prf1) { + rc = ENOMEM; + goto err_0; + } + prf1->profiles.hop_cnt = prf0->profiles.hop_cnt; + memcpy(&(prf1->profiles.hops), &(prf0->profiles.hops), sz); + TAILQ_INSERT_TAIL(&s1->profiles, prf1, ent); + } + if (is_reset) { LDMS_STREAM_COUNTERS_INIT(&s0->rx); + __stream_profiling_purge(&s0->profiles); + } rbn_init(&s1->rbn, &s1->src); rbt_ins(t1, &s1->rbn); } diff --git a/ldms/src/core/ldms_stream.h b/ldms/src/core/ldms_stream.h index 7e796b3aa3..ba896a23f3 100644 --- a/ldms/src/core/ldms_stream.h +++ b/ldms/src/core/ldms_stream.h @@ -135,6 +135,9 @@ struct ldms_stream_full_msg_s { struct ldms_cred cred; /* credential of the originator */ uint32_t perm; /* 0777 style permission */ uint32_t name_hash; + /* Space to collect profile data for up to STREAM_MAX_PROFILE_HOPS + * forwarding hops, plus one slot for the local hop */ + uint32_t hop_cnt; + struct ldms_stream_hop hops[STREAM_MAX_PROFILE_HOPS+1]; char msg[OVIS_FLEX]; /* `msg` format: * .----------------------.
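[Reviewer note, not part of the patch] A minimal standalone sketch of the wire-format cost added by the hunk above: every full stream message now carries a hop_cnt word plus a fixed hops[STREAM_MAX_PROFILE_HOPS+1] array, the extra slot holding the local recv/send pair written by __rep_publish(). The struct below mirrors the header; the printed size assumes an LP64 glibc layout.

    #include <stdint.h>
    #include <stdio.h>
    #include <time.h>

    #define STREAM_MAX_PROFILE_HOPS 8

    struct ldms_stream_hop {
            struct timespec recv_ts;
            struct timespec send_ts;
    };

    int main(void)
    {
            /* hop_cnt word + (STREAM_MAX_PROFILE_HOPS + 1) hop records */
            size_t overhead = sizeof(uint32_t)
                    + (STREAM_MAX_PROFILE_HOPS + 1) * sizeof(struct ldms_stream_hop);
            printf("per-message profiling overhead: %zu bytes\n", overhead);
            return 0;
    }

On LP64 glibc this prints 292 (4 + 9 * 32) bytes, paid per message whether or not profiling is enabled.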
@@ -169,10 +172,11 @@ struct __stream_buf_s { /* for internal use */ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, - uint32_t hash, - ldms_stream_type_t stream_type, + uint32_t hash, ldms_stream_type_t stream_type, struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, - const char *data, size_t data_len); - + uint32_t hop_cnt, + struct ldms_stream_hop *hops, + const char *data, size_t data_len, + struct strm_publish_profile_s *pts); #endif /* __LDMS_STREAM_H__ */ diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index c3e0d3776e..15ef266b58 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -6,8 +6,7 @@ * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU - * General Public License (GPL) Version 2, available from the file - * COPYING in the main directory of this source tree, or the BSD-type +` * COPYING in the main directory of this source tree, or the BSD-type * license below: * * Redistribution and use in source and binary forms, with or without @@ -84,6 +83,9 @@ extern ovis_log_t xlog; ovis_log(xlog, level, fmt, ## __VA_ARGS__); \ } while (0); +/* The definition is in ldms_xprt.c. */ +extern int enable_profiling[LDMS_XPRT_OP_COUNT]; + /** * zap callback function. */ @@ -95,6 +97,10 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev); */ void ldms_zap_auto_cb(zap_ep_t zep, zap_event_t ev); +/* The implementation is is ldms_rail.c. */ +struct ldms_op_ctxt_list * +__rail_op_ctxt_list(ldms_t x, enum ldms_xprt_ops_e op_e); + #if 0 #define TF() XPRT_LOG(NULL, OVIS_LALWAYS, "%s:%d\n", __FUNCTION__, __LINE__) #else @@ -744,10 +750,15 @@ void ldms_xprt_put(ldms_t x) x->ops.put(x); } +/* The implementations are in ldms_rail.c. */ +extern void timespec_hton(struct timespec *ts); +extern void timespec_ntoh(struct timespec *ts); + static void process_set_delete_request(struct ldms_xprt *x, struct ldms_request *req) { struct ldms_reply reply; struct ldms_set *set; + size_t len; /* * Always notify the application about peer set delete. If we happened @@ -777,8 +788,11 @@ static void process_set_delete_request(struct ldms_xprt *x, struct ldms_request reply.hdr.xid = req->hdr.xid; reply.hdr.cmd = htonl(LDMS_CMD_SET_DELETE_REPLY); reply.hdr.rc = 0; - reply.hdr.len = htonl(sizeof(reply.hdr)); - zap_err_t zerr = zap_send(x->zap_ep, &reply, sizeof(reply.hdr)); + len = sizeof(reply.hdr) + sizeof(reply.set_del); + reply.hdr.len = htonl(len); + (void)clock_gettime(CLOCK_REALTIME, &reply.set_del.recv_ts); + timespec_hton(&reply.set_del.recv_ts); + zap_err_t zerr = zap_send(x->zap_ep, &reply, len); if (zerr != ZAP_ERR_OK) { x->zerrno = zerr; XPRT_LOG(x, OVIS_LERROR, "%s: zap_send synchronously error. 
" @@ -790,6 +804,12 @@ static void process_set_delete_reply(struct ldms_xprt *x, struct ldms_reply *reply, struct ldms_context *ctxt) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE)) { + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); + memcpy(&ctxt->op_ctxt->set_del_profile.ack_ts, &thrstat->last_op_start, sizeof(struct timespec)); + timespec_ntoh(&reply->set_del.recv_ts); + memcpy(&ctxt->op_ctxt->set_del_profile.recv_ts, &reply->set_del.recv_ts, sizeof(struct timespec)); + } ctxt->set_delete.cb(x, reply->hdr.rc, ctxt->set_delete.s, ctxt->set_delete.cb_arg); pthread_mutex_lock(&x->lock); __ldms_free_ctxt(x, ctxt); @@ -1312,6 +1332,7 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, ldms_name_t name = get_instance_name(set->meta); ldms_name_t schema = get_schema_name(set->meta); + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); /* * The lookup.set_info encodes schema name, instance name * and the set info key value pairs as follows. @@ -1381,6 +1402,8 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, msg->lookup.meta_len = htonl(__le32_to_cpu(set->meta->meta_sz)); msg->lookup.card = htonl(__le32_to_cpu(set->meta->card)); msg->lookup.array_card = htonl(__le32_to_cpu(set->meta->array_card)); + memcpy(&msg->lookup.req_recv, &thrstat->last_op_start, sizeof(struct timespec)); + (void)clock_gettime(CLOCK_REALTIME, &msg->lookup.share); XPRT_LOG(x, OVIS_LDEBUG, "%s(): x %p: sharing ... remote lookup ctxt %p\n", __func__, x, (void *)xid); @@ -1549,6 +1572,24 @@ static int do_read_all(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) goto out; } assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + * + * The read operation may involve reading the entire set at once, + * reading the meta followed by data, + * or reading multiple times to obtain the updated copy of the set. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. + */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), s->lmap, zap_map_addr(s->lmap), len, ctxt); if (rc) { @@ -1575,6 +1616,20 @@ static int do_read_meta(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) goto out; } assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. 
+ */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), s->lmap, zap_map_addr(s->lmap), meta_sz, ctxt); if (rc) { @@ -1598,7 +1653,6 @@ static int do_read_data(ldms_t x, ldms_set_t s, int idx_from, int idx_to, ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE, s, cb, arg, idx_from, idx_to); - if (!ctxt) { rc = ENOMEM; goto out; } @@ -1609,6 +1663,20 @@ static int do_read_data(ldms_t x, ldms_set_t s, int idx_from, int idx_to, dlen = (idx_to - idx_from + 1) * data_sz; assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. + */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap) + doff, s->lmap, zap_map_addr(s->lmap) + doff, dlen, ctxt); if (rc) { @@ -1758,6 +1826,12 @@ static void process_lookup_reply(struct ldms_xprt *x, struct ldms_reply *reply, struct ldms_context *ctxt) { + struct ldms_thrstat *thrstat; + + thrstat = zap_thrstat_ctxt_get(x->zap_ep); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + memcpy(&ctxt->op_ctxt->lookup_profile.complete_ts, &thrstat->last_op_start, + sizeof(struct timespec)); + } + int rc = ntohl(reply->hdr.rc); if (!rc) { /* A peer should only receive error in lookup_reply. @@ -2538,14 +2612,40 @@ static void handle_zap_read_complete(zap_ep_t zep, zap_event_t ev) switch (ctxt->type) { case LDMS_CONTEXT_UPDATE: thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + /* + * If the read-complete timestamp is already set, + * we replace it with the new timestamp. + * + * We collect the timestamp of the beginning of the first read and + * the timestamp of the completion of the last read. + */ + memcpy(&ctxt->op_ctxt->update_profile.read_complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } __handle_update_data(x, ctxt, ev); break; case LDMS_CONTEXT_UPDATE_META: thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + /* + * For the same reason as in the LDMS_CONTEXT_UPDATE case, + * we set or reset the read complete timestamp.
+ */ + memcpy(&ctxt->op_ctxt->update_profile.read_complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } __handle_update_meta(x, ctxt, ev); break; case LDMS_CONTEXT_LOOKUP_READ: thrstat->last_op = LDMS_THRSTAT_OP_LOOKUP_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + memcpy(&ctxt->op_ctxt->lookup_profile.complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } + __handle_lookup(x, ctxt, ev); break; default: @@ -2660,6 +2760,14 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, int rc; ldms_name_t schema_name, inst_name; + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(zep); + struct ldms_op_ctxt *op_ctxt = ctxt->op_ctxt; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + memcpy(&op_ctxt->lookup_profile.rendzv_ts, &thrstat->last_op_start, sizeof(struct timespec)); + memcpy(&op_ctxt->lookup_profile.req_recv_ts, &lu->req_recv, sizeof(struct timespec)); + memcpy(&op_ctxt->lookup_profile.share_ts, &lu->share, sizeof(struct timespec)); + } #ifdef DEBUG if (!__is_lookup_name_good(x, lu, ctxt)) { XPRT_LOG(x, OVIS_LALWAYS, @@ -2719,8 +2827,12 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, rd_ctxt->sem = ctxt->sem; rd_ctxt->sem_p = ctxt->sem_p; rd_ctxt->rc = ctxt->rc; + rd_ctxt->op_ctxt = ctxt->op_ctxt; pthread_mutex_unlock(&x->lock); assert((zep == x->zap_ep) && (x == rd_ctxt->x)); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.read_ts); + } rc = zap_read(zep, lset->rmap, zap_map_addr(lset->rmap), lset->lmap, zap_map_addr(lset->lmap), @@ -2950,6 +3062,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) struct ldms_xprt *x = zap_get_ucontext(zep); struct ldms_thrstat *thrstat; struct ldms_thrstat_entry *thrstat_e = NULL; + struct ldms_op_ctxt *op_ctxt = NULL; if (x == NULL) return; @@ -3112,7 +3225,13 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) * Applications know only the connection is connecting. 
*/ } else { - if (x->event_cb && (uint64_t)ev->context == LDMS_CMD_SEND_MSG) { + if (x->event_cb && ev->context) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + op_ctxt = (struct ldms_op_ctxt *)ev->context; + memcpy(&op_ctxt->send_profile.complete_ts, &thrstat->last_op_start, + sizeof(struct timespec)); + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->send_profile.deliver_ts); + } event.type = LDMS_XPRT_EVENT_SEND_COMPLETE; x->event_cb(x, &event, x->event_cb_arg); } @@ -3244,12 +3363,13 @@ static int __ldms_xprt_sockaddr(ldms_t x, struct sockaddr *local_sa, struct sockaddr *remote_sa, socklen_t *sa_len); static void __ldms_xprt_close(ldms_t x); -static int __ldms_xprt_send(ldms_t x, char *msg_buf, size_t msg_len); +static int __ldms_xprt_send(ldms_t x, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt); static size_t __ldms_xprt_msg_max(ldms_t x); static int __ldms_xprt_dir(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); static int __ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); -static void __ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); +static void __ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats, int mask, int is_reset); static int __ldms_xprt_dir_cancel(ldms_t x); static ldms_t __ldms_xprt_get(ldms_t x); /* ref get */ @@ -3261,7 +3381,8 @@ static const char *__ldms_xprt_type_name(ldms_t x); static void __ldms_xprt_priority_set(ldms_t x, int prio); static void __ldms_xprt_cred_get(ldms_t x, ldms_cred_t lcl, ldms_cred_t rmt); static void __ldms_xprt_event_cb_set(ldms_t x, ldms_event_cb_t cb, void *cb_arg); -int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg); +int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); int __ldms_xprt_get_threads(ldms_t x, pthread_t *out, int n); zap_ep_t __ldms_xprt_get_zap_ep(ldms_t x); static ldms_set_t __ldms_xprt_set_by_name(ldms_t x, const char *set_name); @@ -3511,7 +3632,8 @@ size_t format_cancel_notify_req(struct ldms_request *req, uint64_t xid, return len; } -static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) +static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt) { struct ldms_xprt *x = _x; struct ldms_request *req; @@ -3536,6 +3658,7 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) rc = ENOMEM; goto err_0; } + ctxt->op_ctxt = op_ctxt; req = (struct ldms_request *)(ctxt + 1); req->hdr.xid = 0; req->hdr.cmd = htonl(LDMS_CMD_SEND_MSG); @@ -3545,7 +3668,10 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) sizeof(struct ldms_send_cmd_param) + msg_len; req->hdr.len = htonl(len); - rc = zap_send2(x->zap_ep, req, len, (void*)(uint64_t)LDMS_CMD_SEND_MSG); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->send_profile.send_ts); + } + rc = zap_send2(x->zap_ep, req, len, (void*)op_ctxt); #ifdef DEBUG if (rc) { XPRT_LOG(x, OVIS_LDEBUG, "send: error. 
put ref %p.\n", x->zap_ep); @@ -3560,7 +3686,19 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) int ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) { - return _x->ops.send(_x, msg_buf, msg_len); + int rc; + struct ldms_op_ctxt *op_ctxt = NULL; + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_SEND; + (void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->send_profile.app_req_ts)); + } + rc = _x->ops.send(_x, msg_buf, msg_len, op_ctxt); + if (rc) + free(op_ctxt); + return rc; } static size_t __ldms_xprt_msg_max(ldms_t x) @@ -3699,7 +3837,8 @@ int ldms_xprt_dir_cancel(ldms_t x) int __ldms_remote_lookup(ldms_t _x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *arg) + ldms_lookup_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt) { struct ldms_xprt *x = _x; struct ldms_request *req; @@ -3749,6 +3888,10 @@ int __ldms_remote_lookup(ldms_t _x, const char *path, XPRT_LOG(x, OVIS_LDEBUG, "remote_lookup: get ref %p: active_lookup = %d\n", x->zap_ep, x->active_lookup); #endif /* DEBUG */ + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + ctxt->op_ctxt = op_ctxt; + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.req_send_ts); + } zap_err_t zerr = zap_send(x->zap_ep, req, len); if (zerr) { pthread_mutex_lock(&x->lock); @@ -3777,37 +3920,91 @@ static void sync_lookup_cb(ldms_t x, enum ldms_lookup_status status, int more, } static int __ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg) + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt) { int rc; if ((flags & !cb) || strlen(path) > LDMS_LOOKUP_PATH_MAX) return EINVAL; if (!cb) { - rc = __ldms_remote_lookup(x, path, flags, sync_lookup_cb, cb_arg); + rc = __ldms_remote_lookup(x, path, flags, sync_lookup_cb, cb_arg, op_ctxt); if (rc) return rc; sem_wait(&x->sem); rc = x->sem_rc; } else - rc = __ldms_remote_lookup(x, path, flags, cb, cb_arg); + rc = __ldms_remote_lookup(x, path, flags, cb, cb_arg, op_ctxt); return rc; } int ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags, ldms_lookup_cb_t cb, void *cb_arg) { - return x->ops.lookup(x, path, flags, cb, cb_arg); + int rc; + struct ldms_op_ctxt *op_ctxt = NULL; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_LOOKUP; + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.app_req_ts); + } + rc = x->ops.lookup(x, path, flags, cb, cb_arg, op_ctxt); + if (rc) + free(op_ctxt); + return rc; } -static void __ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats) +static void __ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats, int mask, int is_reset) { + struct ldms_op_ctxt_list *src_list, *dst_list; + struct ldms_op_ctxt *src, *dst; + enum ldms_xprt_ops_e op_e; + + if (!stats) + goto reset; *stats = _x->stats; + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + TAILQ_INIT(&stats->op_ctxt_lists[op_e]); + dst_list = &stats->op_ctxt_lists[op_e]; + src_list = __rail_op_ctxt_list(_x, op_e); + + TAILQ_FOREACH(src, src_list, ent) { + dst = malloc(sizeof(*dst)); + if (!dst) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + return; + } + memcpy(dst, src, sizeof(*dst)); + dst->ent.tqe_next = NULL; + dst->ent.tqe_prev = NULL; + TAILQ_INSERT_TAIL(dst_list, dst, ent); + } + } + reset: + if (!is_reset) + return; + if (mask & 
LDMS_PERF_M_STATS) { + /* last_op and ops could also be reset by ldms_xprt_rate_data(). */ + /* don't reset the connect/disconnect time */ + memset(&_x->stats.last_op, 0, sizeof(_x->stats.last_op)); + memset(&_x->stats.ops, 0, sizeof(_x->stats.ops)); + } + if (mask & LDMS_PERF_M_PROFILING) { + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + src_list = __rail_op_ctxt_list(_x, op_e); + while ((src = TAILQ_FIRST(src_list))) { + TAILQ_REMOVE(src_list, src, ent); + free(src); + } + } + } } -void ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats) +void ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats, int mask, int is_reset) { - _x->ops.stats(_x, stats); + _x->ops.stats(_x, stats, mask, is_reset); } static int send_req_notify(ldms_t _x, ldms_set_t s, uint32_t flags, diff --git a/ldms/src/core/ldms_xprt.h b/ldms/src/core/ldms_xprt.h index 7a98300f51..46de135f8c 100644 --- a/ldms/src/core/ldms_xprt.h +++ b/ldms/src/core/ldms_xprt.h @@ -281,6 +281,8 @@ struct ldms_rendezvous_lookup_param { uint32_t card; /* card of dict */ uint32_t schema_len; uint32_t array_card; /* card of array */ + struct timespec req_recv; /* Timestamp when server has received the lookup request. */ + struct timespec share; /* Timestamp when server has called zap_share(). */ /* schema name, then instance name, and then set_info key value pairs */ char set_info[OVIS_FLEX]; }; @@ -346,6 +348,10 @@ struct ldms_stream_sub_reply { char msg[0]; }; +struct ldms_set_delete_reply { + struct timespec recv_ts; +}; + struct ldms_reply { struct ldms_reply_hdr hdr; union { @@ -354,6 +360,7 @@ struct ldms_reply { struct ldms_auth_challenge_reply auth_challenge; struct ldms_push_reply push; struct ldms_stream_sub_reply sub; + struct ldms_set_delete_reply set_del; }; }; #pragma pack() @@ -377,6 +384,7 @@ struct ldms_context { int rc; struct ldms_xprt *x; ldms_context_type_t type; + struct ldms_op_ctxt *op_ctxt; union { struct { ldms_dir_cb_t cb; @@ -449,13 +457,13 @@ struct ldms_xprt_ops_s { struct sockaddr *remote_sa, socklen_t *sa_len); void (*close)(ldms_t x); - int (*send)(ldms_t x, char *msg_buf, size_t msg_len); + int (*send)(ldms_t x, char *msg_buf, size_t msg_len, struct ldms_op_ctxt *op_ctxt); size_t (*msg_max)(ldms_t x); int (*dir)(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); int (*dir_cancel)(ldms_t x); int (*lookup)(ldms_t t, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); - void (*stats)(ldms_t x, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); + void (*stats)(ldms_t x, ldms_xprt_stats_t stats, int mask, int is_reset); ldms_t (*get)(ldms_t x); /* ref get */ void (*put)(ldms_t x); /* ref put */ @@ -465,7 +473,8 @@ struct ldms_xprt_ops_s { const char *(*type_name)(ldms_t x); void (*priority_set)(ldms_t x, int prio); void (*cred_get)(ldms_t x, ldms_cred_t lcl, ldms_cred_t rmt); - int (*update)(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg); + int (*update)(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); int (*get_threads)(ldms_t x, pthread_t *out, int n); @@ -559,4 +568,10 @@ void ldms_xprt_auth_begin(ldms_t xprt); int ldms_xprt_auth_send(ldms_t _x, const char *msg_buf, size_t msg_len); void ldms_xprt_auth_end(ldms_t xprt, int result); +/* ======================== + * LDMS operation profiling + * ======================== + */ +#define ENABLED_PROFILING(_OP_) (enable_profiling[_OP_] == 1) + #endif diff --git a/ldms/src/ldmsd/ldmsd_request.c
b/ldms/src/ldmsd/ldmsd_request.c index 29d9e2f653..4b749381c1 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -264,6 +264,7 @@ static int update_time_stats_handler(ldmsd_req_ctxt_t reqc); static int set_sec_mod_handler(ldmsd_req_ctxt_t reqc); static int log_status_handler(ldmsd_req_ctxt_t reqc); static int stats_reset_handler(ldmsd_req_ctxt_t reqc); +static int profiling_handler(ldmsd_req_ctxt_t req); /* these are implemented in ldmsd_failover.c */ int failover_config_handler(ldmsd_req_ctxt_t req_ctxt); @@ -565,6 +566,10 @@ static struct request_handler_entry request_handler[] = { LDMSD_SET_DEFAULT_AUTHZ_REQ, set_default_authz_handler, XUG | MOD }, + [LDMSD_PROFILING_REQ] = { + LDMSD_PROFILING_REQ, profiling_handler, XALL + }, + /* FAILOVER user commands */ [LDMSD_FAILOVER_CONFIG_REQ] = { LDMSD_FAILOVER_CONFIG_REQ, failover_config_handler, XUG | MOD, @@ -7056,7 +7061,7 @@ static char *__xprt_stats_as_json(size_t *json_sz, int reset, int level) first = 0; } - ldms_xprt_stats(x, &xs); + ldms_xprt_stats(x, &xs, LDMS_PERF_M_STATS, reset); xprt_count += 1; switch (ep_state) { @@ -7228,6 +7233,254 @@ static int xprt_stats_handler(ldmsd_req_ctxt_t req) return ENOMEM; } +double __ts2double(struct timespec ts) +{ + return ts.tv_sec + ((double)ts.tv_nsec)/1000000000.0; +} + +json_t *__ldms_op_profiling_as_json(struct ldms_op_ctxt *xc, enum ldms_xprt_ops_e op_e) +{ + json_t *stat; + stat = json_object(); + switch (op_e) { + case LDMS_XPRT_OP_LOOKUP: + json_object_set_new(stat, "app_req", + json_real(__ts2double(xc->lookup_profile.app_req_ts))); + json_object_set_new(stat, "req_send", + json_real(__ts2double(xc->lookup_profile.req_send_ts))); + json_object_set_new(stat, "req_recv", + json_real(__ts2double(xc->lookup_profile.req_recv_ts))); + json_object_set_new(stat, "share", + json_real(__ts2double(xc->lookup_profile.share_ts))); + json_object_set_new(stat, "rendzv", + json_real(__ts2double(xc->lookup_profile.rendzv_ts))); + json_object_set_new(stat, "read", + json_real(__ts2double(xc->lookup_profile.read_ts))); + json_object_set_new(stat, "complete", + json_real(__ts2double(xc->lookup_profile.complete_ts))); + json_object_set_new(stat, "deliver", + json_real(__ts2double(xc->lookup_profile.deliver_ts))); + break; + case LDMS_XPRT_OP_UPDATE: + json_object_set_new(stat, "app_req", + json_real(__ts2double(xc->update_profile.app_req_ts))); + json_object_set_new(stat, "read_start", + json_real(__ts2double(xc->update_profile.read_ts))); + json_object_set_new(stat, "read_complete", + json_real(__ts2double(xc->update_profile.read_complete_ts))); + json_object_set_new(stat, "deliver", + json_real(__ts2double(xc->update_profile.deliver_ts))); + break; + case LDMS_XPRT_OP_SEND: + json_object_set_new(stat, "app_req", + json_real(__ts2double(xc->send_profile.app_req_ts))); + json_object_set_new(stat, "send", + json_real(__ts2double(xc->send_profile.send_ts))); + json_object_set_new(stat, "complete", + json_real(__ts2double(xc->send_profile.complete_ts))); + json_object_set_new(stat, "deliver", + json_real(__ts2double(xc->send_profile.deliver_ts))); + break; + case LDMS_XPRT_OP_SET_DELETE: + json_object_set_new(stat, "send", + json_real(__ts2double(xc->set_del_profile.send_ts))); + json_object_set_new(stat, "recv", + json_real(__ts2double(xc->set_del_profile.recv_ts))); + json_object_set_new(stat, "acknowledge", + json_real(__ts2double(xc->set_del_profile.ack_ts))); + break; + case LDMS_XPRT_OP_STREAM_PUBLISH: + json_object_set_new(stat, "hop_cnt", + 
json_integer(xc->stream_pub_profile.hop_num));
+		json_object_set_new(stat, "recv",
+			json_real(__ts2double(xc->stream_pub_profile.recv_ts)));
+		json_object_set_new(stat, "send",
+			json_real(__ts2double(xc->stream_pub_profile.send_ts)));
+		break;
+	default:
+		break;
+	}
+	return stat;
+}
+
+int __stream_profiling_as_json(json_t **_jobj, int is_reset)
+{
+	json_t *jobj, *strm_jobj, *src_jobj, *hop_jobj, *prf_array, *prf_jobj;
+	struct ldms_stream_stats_tq_s *tq;
+	struct ldms_stream_stats_s *ss;
+	struct ldms_stream_src_stats_s *strm_src;
+	struct ldms_stream_profile_ent *prf;
+	struct ldms_addr addr;
+	char addr_buf[128] = "";
+	struct rbn *rbn;
+	int i, rc = 0;
+
+	jobj = json_object();
+	tq = ldms_stream_stats_tq_get(NULL, 0, is_reset);
+	if (!tq) {
+		/* no stream ... nothing to do here. */
+		goto out;
+	}
+	TAILQ_FOREACH(ss, tq, entry) {
+		strm_jobj = json_object();
+
+		RBT_FOREACH(rbn, &ss->src_stats_rbt) {
+			src_jobj = json_array();
+
+			strm_src = container_of(rbn, struct ldms_stream_src_stats_s, rbn);
+			addr = strm_src->src;
+			ldms_addr_ntop(&addr, addr_buf, sizeof(addr_buf));
+			TAILQ_FOREACH(prf, &strm_src->profiles, ent) {
+				hop_jobj = json_object();
+				json_object_set_new(hop_jobj, "hop_count", json_integer(prf->profiles.hop_cnt));
+				prf_array = json_array();
+				json_object_set_new(hop_jobj, "profile", prf_array);
+				for (i = 0; i < prf->profiles.hop_cnt; i++) {
+					prf_jobj = json_object();
+					json_object_set_new(prf_jobj, "recv",
+						json_real(__ts2double(prf->profiles.hops[i].recv_ts)));
+					json_object_set_new(prf_jobj, "deliver",
+						json_real(__ts2double(prf->profiles.hops[i].send_ts)));
+					json_array_append_new(prf_array, prf_jobj);
+				}
+				json_array_append_new(src_jobj, hop_jobj);
+			}
+			json_object_set_new(strm_jobj, addr_buf, src_jobj);
+		}
+		json_object_set_new(jobj, ss->name, strm_jobj);
+	}
+	ldms_stream_stats_tq_free(tq); /* release the snapshot from ldms_stream_stats_tq_get() */
+ out:
+	*_jobj = jobj;
+	return rc;
+}
+
+int __xprt_profiling_as_json(json_t **_obj, int is_reset)
+{
+	json_t *obj, *ep_prf, *op_prf;
+	ldms_t x;
+	struct ldms_xprt_stats stats;
+	struct ldms_op_ctxt *xc;
+	int rc;
+	enum ldms_xprt_ops_e op_e;
+	char lhostname[128], lport_no[32], rhostname[128], rport_no[32], name[161];
+
+	obj = json_object();
+	if (!obj) {
+		ovis_log(config_log, OVIS_LCRIT, "Memory allocation failure\n");
+		return ENOMEM;
+	}
+	for (x = ldms_xprt_first(); x; x = ldms_xprt_next(x)) {
+		rc = ldms_xprt_names(x, lhostname, sizeof(lhostname),
+				lport_no, sizeof(lport_no),
+				rhostname, sizeof(rhostname),
+				rport_no, sizeof(rport_no),
+				NI_NAMEREQD | NI_NUMERICSERV);
+		if (rc)
+			continue; /* skip endpoints whose names cannot be resolved */
+
+		ldms_xprt_stats(x, &stats, LDMS_PERF_M_PROFILNG, is_reset);
+		snprintf(name, sizeof(name), "%s:%s", rhostname, rport_no);
+		ep_prf = json_object();
+		for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) {
+			op_prf = json_array();
+			TAILQ_FOREACH(xc, &stats.op_ctxt_lists[op_e], ent) {
+				json_array_append_new(op_prf, __ldms_op_profiling_as_json(xc, op_e));
+			}
+			json_object_set_new(ep_prf, ldms_xprt_op_names[op_e], op_prf);
+		}
+		json_object_set_new(obj, name, ep_prf);
+		/* ldms_xprt_stats() duplicated each op_ctxt; release the copies */
+		for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) {
+			while ((xc = TAILQ_FIRST(&stats.op_ctxt_lists[op_e]))) {
+				TAILQ_REMOVE(&stats.op_ctxt_lists[op_e], xc, ent);
+				free(xc);
+			}
+		}
+	}
+	*_obj = obj;
+	return 0;
+}
+
+static int profiling_handler(ldmsd_req_ctxt_t req)
+{
+	json_t *obj, *xprt_prf, *strm_prf;
+	char *json_as_str = NULL;
+	int rc = 0;
+	struct ldmsd_req_attr_s attr;
+	size_t str_len;
+	char *enable_str, *reset_str;
+	int is_enable = -1; /* -1: report the cached data without changing the enable state */
+	int is_reset = 0;
+
+	enable_str = ldmsd_req_attr_str_value_get_by_id(req, LDMSD_ATTR_TYPE);
+	if (enable_str) {
+		is_enable = 1;
+		if (0 == strcasecmp(enable_str, "false"))
+			is_enable = 0; /* disable */
+	}
+	reset_str = ldmsd_req_attr_str_value_get_by_id(req, LDMSD_ATTR_RESET);
+	if (reset_str) {
+		is_reset = 1;
+		if (0 == strcasecmp(reset_str, "false"))
+			is_reset = 0;
+	}
+	free(enable_str);
+	free(reset_str);
+
+	if (is_enable == 1) {
+		ldms_profiling_enable(-1, NULL, NULL);
+	} else if (is_enable == 0) {
+		ldms_profiling_disable(-1, NULL, NULL);
+	}
+
+	/*
+	 * The output JSON object looks like this:
+	 *
+	 * {
+	 *   "xprt": {
+	 *     "<HOST:PORT>": {
+	 *        "lookup": <array of lookup profiles>,
+	 *        "update": <array of update profiles>,
+	 *        "send": <array of send profiles>
+	 *     },
+	 *     ...
+	 *   },
+	 *   "stream" : {
+	 *     "<STREAM_NAME>": <profiles keyed by source address>,
+	 *     ...
+	 *   }
+	 * }
+	 */
+	obj = json_object();
+	(void)__xprt_profiling_as_json(&xprt_prf, is_reset);
+	json_object_set_new(obj, "xprt", xprt_prf);
+
+	(void)__stream_profiling_as_json(&strm_prf, is_reset);
+	json_object_set_new(obj, "stream", strm_prf);
+
+	json_as_str = json_dumps(obj, JSON_INDENT(0));
+	if (!json_as_str)
+		goto err;
+	str_len = strlen(json_as_str) + 1; /* +1 for '\0' */
+
+	attr.discrim = 1;
+	attr.attr_id = LDMSD_ATTR_JSON;
+	attr.attr_len = str_len;
+	ldmsd_hton_req_attr(&attr);
+
+	if (ldmsd_append_reply(req, (const char *)&attr, sizeof(attr), LDMSD_REQ_SOM_F))
+		goto err;
+
+	if (ldmsd_append_reply(req, json_as_str, str_len, 0))
+		goto err;
+
+	attr.discrim = 0;
+	if (ldmsd_append_reply(req, (const char *)&attr.discrim, sizeof(attr.discrim), LDMSD_REQ_EOM_F))
+		goto err;
+
+	json_decref(obj); /* json_decref() releases obj and its children; free() would leak them */
+	free(json_as_str);
+	return 0;
+err:
+	json_decref(obj);
+	free(json_as_str);
+	rc = ENOMEM;
+	req->errcode = rc;
+	ldmsd_send_req_response(req, "Failed to report the profiling data");
+	return rc;
+}
+
 struct store_time_thread {
 	pid_t tid;
 	uint64_t store_time;
diff --git a/ldms/src/ldmsd/ldmsd_request.h b/ldms/src/ldmsd/ldmsd_request.h
index 5dab3a501b..39fdfd8032 100644
--- a/ldms/src/ldmsd/ldmsd_request.h
+++ b/ldms/src/ldmsd/ldmsd_request.h
@@ -165,6 +165,7 @@ enum ldmsd_request {
 	LDMSD_DEFAULT_QUOTA_REQ,
 	LDMSD_PID_FILE_REQ,
 	LDMSD_BANNER_MODE_REQ,
+	LDMSD_PROFILING_REQ,
 
 	/* failover requests by user */
 	LDMSD_FAILOVER_CONFIG_REQ = 0x700, /* "failover_config" user command */
diff --git a/ldms/src/ldmsd/ldmsd_request_util.c b/ldms/src/ldmsd/ldmsd_request_util.c
index 40b623e9c4..710232a9dc 100644
--- a/ldms/src/ldmsd/ldmsd_request_util.c
+++ b/ldms/src/ldmsd/ldmsd_request_util.c
@@ -123,6 +123,7 @@ struct req_str_id req_str_id_table[] = {
 	{ "prdcr_stream_status",LDMSD_PRDCR_STREAM_STATUS_REQ },
 	{ "prdcr_subscribe", LDMSD_PRDCR_SUBSCRIBE_REQ },
 	{ "prdcr_unsubscribe", LDMSD_PRDCR_UNSUBSCRIBE_REQ },
+	{ "profiling", LDMSD_PROFILING_REQ },
 	{ "publish_kernel", LDMSD_PUBLISH_KERNEL_REQ },
 	{ "qgroup_config", LDMSD_QGROUP_CONFIG_REQ },
 	{ "qgroup_info", LDMSD_QGROUP_INFO_REQ },
@@ -329,6 +330,7 @@ const char *ldmsd_req_id2str(enum ldmsd_request req_id)
 	case LDMSD_CMDLINE_OPTIONS_SET_REQ : return "CMDLINE_OPTION_SET_REQ";
 	case LDMSD_SET_SEC_MOD_REQ : return "SET_SEC_REQ";
 	case LDMSD_LOG_STATUS_REQ : return "LOG_STATUS_REQ";
+	case LDMSD_PROFILING_REQ : return "PROFILING_REQ";
 
 	/* failover requests by user */
 	case LDMSD_FAILOVER_CONFIG_REQ : return "FAILOVER_CONFIG_REQ";
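
A note on op_ctxt ownership in the new ops signatures: ldms_xprt_send() and
ldms_xprt_lookup() free the context only when ops.send()/ops.lookup() fails, so
a successful call transfers ownership of the op_ctxt to the transport. Below is
a minimal sketch (not part of this patch) of a .send op that honors that
contract; __my_wire_send() and __my_queue_send_ctxt() are hypothetical
stand-ins for the transport's wire call and for appending the context to its
LDMS_XPRT_OP_SEND list.

	/* Sketch only -- illustrates the intended op_ctxt hand-off. */
	static int my_xprt_send(ldms_t x, char *msg_buf, size_t msg_len,
				struct ldms_op_ctxt *op_ctxt)
	{
		int rc;
		if (op_ctxt) /* NULL when profiling is disabled */
			(void)clock_gettime(CLOCK_REALTIME,
					    &op_ctxt->send_profile.send_ts);
		rc = __my_wire_send(x, msg_buf, msg_len); /* hypothetical */
		if (rc)
			return rc; /* caller (ldms_xprt_send) frees op_ctxt */
		if (op_ctxt) {
			(void)clock_gettime(CLOCK_REALTIME,
					    &op_ctxt->send_profile.complete_ts);
			__my_queue_send_ctxt(x, op_ctxt); /* hypothetical; transport owns it now */
		}
		return 0;
	}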
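
On the consumer side, the stats op duplicates each cached op_ctxt into
stats->op_ctxt_lists, so the snapshot belongs to the caller and must be freed
entry by entry, as __xprt_profiling_as_json() does above. A minimal sketch,
assuming a connected ldms_t handle and the names introduced by this patch:

	#include <stdio.h>
	#include <stdlib.h>
	#include "ldms.h"

	void dump_send_profiles(ldms_t x)
	{
		struct ldms_xprt_stats stats;
		struct ldms_op_ctxt *oc;
		enum ldms_xprt_ops_e op_e;

		/* snapshot without resetting the cached profile data */
		ldms_xprt_stats(x, &stats, LDMS_PERF_M_PROFILNG, 0);
		TAILQ_FOREACH(oc, &stats.op_ctxt_lists[LDMS_XPRT_OP_SEND], ent)
			printf("send: app_req %ld.%09ld\n",
			       (long)oc->send_profile.app_req_ts.tv_sec,
			       oc->send_profile.app_req_ts.tv_nsec);
		/* the snapshot entries are malloc'd copies; release them */
		for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) {
			while ((oc = TAILQ_FIRST(&stats.op_ctxt_lists[op_e]))) {
				TAILQ_REMOVE(&stats.op_ctxt_lists[op_e], oc, ent);
				free(oc);
			}
		}
	}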
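
Usage, for reference: profiling_handler() treats any LDMSD_ATTR_TYPE value
other than "false" as enable and any LDMSD_ATTR_RESET value other than "false"
as reset, and it always returns the cached data as the JSON object sketched in
the handler's comment. From an ldmsd_controller session (host, port, and
transport values are placeholders):

	profiling enable=true    # start collecting profile data
	profiling                # report the cached data, leave state unchanged
	profiling reset=true     # report, then drop the cached data
	profiling enable=false   # stop collecting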