From 6654ffd8422858bc5a3b646f69aa056a1113be05 Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Wed, 17 Jun 2020 10:12:25 +0100 Subject: [PATCH 1/4] Initial commit of assignment functionality addition --- kfk.c | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ kfk.q | 10 ++++- 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/kfk.c b/kfk.c index faa2725..90265dd 100644 --- a/kfk.c +++ b/kfk.c @@ -644,6 +644,126 @@ EXP K3(kfkPoll){ return kj(n); } + +/* The following set of functions define interactions with the assign functionality with Kafka. + * This provides more control to the user over where data can be consumed from. + * Note the differences between Kafka Assign vs Subscribe functionality, summarised in part + * https://github.com/confluentinc/confluent-kafka-dotnet/issues/278#issuecomment-318858243 +*/ + +/** Define functionality needed for the addition and deletion of topic-partition + ** pairs to the current assignment + * @param dict topic!associated-partitions --> S!J +*/ + +static V ptlistadd(K dict, rd_kafka_topic_partition_list_t *t_partition){ + K dk=kK(dict)[0],dv=kK(dict)[1]; + S*p;J*o,i; + p=kS(dk);o=kJ(dv); + for(i = 0; i < dk->n; i++) + rd_kafka_topic_partition_list_add(t_partition,p[i],o[i]); +} + +static V ptlistdel(K dict,rd_kafka_topic_partition_list_t *t_partition){ + K dk=kK(dict)[0],dv=kK(dict)[1]; + S*p;J*o,i; + p=kS(dk);o=kJ(dv); + for(i = 0; i < dk->n; i++) + rd_kafka_topic_partition_list_del(t_partition,p[i],o[i]); +} + +/** Assign the partitions from which to consume data for specified topics + * @param x Client Index (previously created) + * @param y Dictionary mapping individual topics to associated partitions. 
S!J + * @returns Null value on correct execution +*/ + +EXP K2(kfkAssign){ + rd_kafka_t *rk; + rd_kafka_topic_partition_list_t *t_partition; + rd_kafka_resp_err_t err; + if(!checkType("i!", x, y)) + return KNL; + if(!(rk= clientIndex(x))) + return KNL; + t_partition = rd_kafka_topic_partition_list_new(y->n); + // topic-partition assignment + ptlistadd(y,t_partition); + if(KFK_OK != (err=rd_kafka_assign(rk,t_partition))) + return krr((S) rd_kafka_err2str(err)); + rd_kafka_topic_partition_list_destroy(t_partition); + return KNL; +} + +/** Return the current consumption assignment for a specified client + * @param x Client Index (previously created) + * @returns List of dictionaries defining information about the current assignment +*/ +EXP K1(kfkAssignment){ + K r; + rd_kafka_topic_partition_list_t *t; + rd_kafka_t *rk; + rd_kafka_resp_err_t err; + if(!checkType("i", x)) + return KNL; + if(!(rk= clientIndex(x))) + return KNL; + if(KFK_OK != (err=rd_kafka_assignment(rk, &t))) + return krr((S)rd_kafka_err2str(err)); + r = decodeParList(t); + rd_kafka_topic_partition_list_destroy(t); + return r; +} + +/** Add to the current assignment for a client with new topic-partition pair + * @param x Client Index (previously created) + * @param y Dictionary mapping individual topics to associated partitions. 
S!J + * @returns Null value on correct execution +*/ + +EXP K2(kfkAssignmentAdd){ + rd_kafka_t *rk; + rd_kafka_topic_partition_list_t *t; + rd_kafka_resp_err_t err; + if(!checkType("i", x)) + return KNL; + if(!(rk= clientIndex(x))) + return KNL; + // retrieve the current assignment + if(KFK_OK != (err=rd_kafka_assignment(rk, &t))) + return krr((S)rd_kafka_err2str(err)); + ptlistadd(y,t); + if(KFK_OK != (err=rd_kafka_assign(rk,t))) + return krr((S) rd_kafka_err2str(err)); + rd_kafka_topic_partition_list_destroy(t); + return 0; +} + +/** Delete a topic-partition mapped dictionary from the current assignment for a client + * @param x Client Index (previously created) + * @param y Dictionary mapping individual topics to associated partitions. S!J + * @returns Null value on correct execution +*/ + +EXP K2(kfkAssignmentDel){ + rd_kafka_t *rk; + rd_kafka_topic_partition_list_t *t; + rd_kafka_resp_err_t err; + if(!checkType("i", x)) + return KNL; + if(!(rk= clientIndex(x))) + return KNL; + // retrieve the current assignment + if(KFK_OK != (err=rd_kafka_assignment(rk, &t))) + return krr((S)rd_kafka_err2str(err)); + ptlistdel(y,t); + if(KFK_OK != (err=rd_kafka_assign(rk,t))) + return krr((S) rd_kafka_err2str(err)); + rd_kafka_topic_partition_list_destroy(t); + return 0; +} + + // other EXP K1(kfkOutQLen){ rd_kafka_t *rk; diff --git a/kfk.q b/kfk.q index 5af0784..b2e9534 100644 --- a/kfk.q +++ b/kfk.q @@ -54,7 +54,15 @@ funcs:( // .kfk.VersionSym[]:s (`kfkVersionSym;1); // .kfk.SetLoggerLevel[client_id:i;int_level:i]:() - (`kfkSetLoggerLevel;2) + (`kfkSetLoggerLevel;2); + // .kfk.Assignment[client_id:i]:T + (`kfkAssignment;1); + // .kfk.Assign[client_id:i;topic_partition:S!J]:() + (`kfkAssign;2); + // .kfk.AssignmentAdd[client_id:i;topic_partition:S!J]:() + (`kfkAssignmentAdd;2); + // .kfk.AssignmentDel[client_id:i;topic_partition:S!J]:() + (`kfkAssignDel;2); ); // binding functions from dictionary funcs using rule From b39097be2c31d9a3fd5113c57af96df38ac0ef77 Mon Sep 17 
00:00:00 2001 From: Conor McCarthy Date: Thu, 18 Jun 2020 20:56:10 +0100 Subject: [PATCH 2/4] Addition of logic to handle assignment deletion/addition without assigning/deleting something that does/doesn't exist --- kfk.q | 51 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/kfk.q b/kfk.q index b2e9534..fb30af8 100644 --- a/kfk.q +++ b/kfk.q @@ -62,7 +62,7 @@ funcs:( // .kfk.AssignmentAdd[client_id:i;topic_partition:S!J]:() (`kfkAssignmentAdd;2); // .kfk.AssignmentDel[client_id:i;topic_partition:S!J]:() - (`kfkAssignDel;2); + (`kfkAssignmentDel;2) ); // binding functions from dictionary funcs using rule @@ -142,17 +142,52 @@ Subscribe:{[cid;top;part;cb] } +// Assignment API logic + +// Assign additional topic-partition pairs which could be consumed from +/* cid = Integer denoting client ID +/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition +AssignAdd:{[cid;toppar] + // Generate the unique topic partition lists used to compare to current assignment + tplist:(,'/)(key::;value::)@\:toppar; + // Mark locations where user is attempting to add an already existing assignment + loc:i.compAssign[cid;tplist]; + $[any loc; + [show tplist where loc;'"The above topic-partition pairs already exist, please modify dictionary"]; + AssignmentAdd[cid;toppar]]; + } + +// Remove assigned topic-parition pairs from the current assignment from which data can be consumed +/* cid = Integer denoting client ID +/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition +AssignDel:{[cid;toppar] + // Generate the unique topic partition lists used to compare to current assignment + tplist:(,'/)(key::;value::)@\:toppar; + // Mark locations where user is attempting to delete from an non existent assignment + loc:not i.compAssign[cid;tplist]; + $[any loc; + [show tplist where loc;'"The above topic-partition pairs cannot be deleted as they are not currently 
assigned"]; + AssignmentDel[cid;toppar]]; + } + +// dictionary defining the current assignment for used in comparisons +i.compAssign:{[cid;tplist] + assignment:Assignment[cid]; + tplist in(assignment@'`topic),'"j"$assignment@'`partition + } + + // Addition of error callback (rd_kafka_conf_set_error_cb) -/* cid is an integer -/* err_int is an integer code relating to the kafka issue -/* reason is a string denoting the reason for the error +/* cid = Integer denoting client ID +/* err_int = Integer denoting the error code relating to the kafka issue raised +/* reason = String denoting the reason for the error errcb:{[cid;err_int;reason]} // Triggered callback on non-zero throttle time from a broker (rd_kafka_conf_set_throttle_cb) -/* cid is an integer -/* broker_name is a string -/* broker_id is an integer -/* throttle_time_ms is an integer +/* cid = Integer denoting client ID +/* broker_name = String denoting the name of the broker from which the callback originated +/* broker_id = Integer denoting the identifying number of the broker +/* throttle_time_ms = Integer denoting the throttle time throttlecb:{[cid;broker_name;broker_id;throttle_time_ms]} \d . 
From bb436e78831d2729348df036c7c4b163f7ddb3c4 Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Thu, 18 Jun 2020 21:26:18 +0100 Subject: [PATCH 3/4] Logic to protect assign from segfault, protection from toppar being an unsuitable dictionary input --- kfk.c | 4 ++-- kfk.q | 25 ++++++++++++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/kfk.c b/kfk.c index 90265dd..c038a08 100644 --- a/kfk.c +++ b/kfk.c @@ -678,11 +678,11 @@ static V ptlistdel(K dict,rd_kafka_topic_partition_list_t *t_partition){ * @returns Null value on correct execution */ -EXP K2(kfkAssign){ +EXP K2(kfkAssignTopPar){ rd_kafka_t *rk; rd_kafka_topic_partition_list_t *t_partition; rd_kafka_resp_err_t err; - if(!checkType("i!", x, y)) + if(!checkType("i", x)) return KNL; if(!(rk= clientIndex(x))) return KNL; diff --git a/kfk.q b/kfk.q index fb30af8..0f6081d 100644 --- a/kfk.q +++ b/kfk.q @@ -57,8 +57,8 @@ funcs:( (`kfkSetLoggerLevel;2); // .kfk.Assignment[client_id:i]:T (`kfkAssignment;1); - // .kfk.Assign[client_id:i;topic_partition:S!J]:() - (`kfkAssign;2); + // .kfk.AssignTopPar[client_id:i;topic_partition:S!J]:() + (`kfkAssignTopPar;2); // .kfk.AssignmentAdd[client_id:i;topic_partition:S!J]:() (`kfkAssignmentAdd;2); // .kfk.AssignmentDel[client_id:i;topic_partition:S!J]:() @@ -144,10 +144,21 @@ Subscribe:{[cid;top;part;cb] // Assignment API logic +// Assign a new topic-partition dictionary to be consumed by a designated clientid +/* cid = Integer denoting client ID +/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition +Assign:{[cid;toppar] + i.checkDict[toppar]; + // Create a distinct set of topic-partition pairs to assign, non distinct entries cause a segfault + toppar:(!). flip distinct(,'/)(key::;value::)@\:toppar; + AssignTopPar[cid](!). 
flip distinct(,'/)(key::;value::)@\:toppar; + } + // Assign additional topic-partition pairs which could be consumed from /* cid = Integer denoting client ID /* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition AssignAdd:{[cid;toppar] + i.checkDict[toppar]; // Generate the unique topic partition lists used to compare to current assignment tplist:(,'/)(key::;value::)@\:toppar; // Mark locations where user is attempting to add an already existing assignment @@ -161,12 +172,13 @@ AssignAdd:{[cid;toppar] /* cid = Integer denoting client ID /* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition AssignDel:{[cid;toppar] + i.checkDict[toppar]; // Generate the unique topic partition lists used to compare to current assignment tplist:(,'/)(key::;value::)@\:toppar; // Mark locations where user is attempting to delete from an non existent assignment loc:not i.compAssign[cid;tplist]; $[any loc; - [show tplist where loc;'"The above topic-partition pairs cannot be deleted as they are not currently assigned"]; + [show tplist where loc;'"The above topic-partition pairs cannot be deleted as they are not assigned"]; AssignmentDel[cid;toppar]]; } @@ -176,6 +188,13 @@ i.compAssign:{[cid;tplist] tplist in(assignment@'`topic),'"j"$assignment@'`partition } +// Ensure that the dictionaries used in assignments map symbol to long +i.checkDict:{[dict] + if[not 99h=type dict ;'"Final parameter must be a dictionary"]; + if[not 11h=type key dict ;'"Dictionary key must of type symbol"]; + if[not 7h =type value dict;'"Dictionary values must be of type long"]; + } + // Addition of error callback (rd_kafka_conf_set_error_cb) /* cid = Integer denoting client ID From fee6dfe5ce92dfdca9f550b599b4254318410af2 Mon Sep 17 00:00:00 2001 From: Conor McCarthy Date: Wed, 28 Oct 2020 09:20:40 +0000 Subject: [PATCH 4/4] Update to remove code duplication, stronger logical conditioning and unnecessary constructs --- kfk.q | 47 
+++++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/kfk.q b/kfk.q index 63d7c12..8272182 100644 --- a/kfk.q +++ b/kfk.q @@ -171,43 +171,54 @@ Subscribe:{[cid;top;part;cb] /* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition Assign:{[cid;toppar] i.checkDict[toppar]; - // Create a distinct set of topic-partition pairs to assign, non distinct entries cause a segfault - toppar:(!). flip distinct(,'/)(key::;value::)@\:toppar; - AssignTopPar[cid](!). flip distinct(,'/)(key::;value::)@\:toppar; + // Create a distinct set of topic-partition pairs to assign, + // non distinct entries cause a segfault + toppar:(!). flip distinct(,'/)(key;value)@\:toppar; + AssignTopPar[cid;toppar] } // Assign additional topic-partition pairs which could be consumed from /* cid = Integer denoting client ID /* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition AssignAdd:{[cid;toppar] - i.checkDict[toppar]; - // Generate the unique topic partition lists used to compare to current assignment - tplist:(,'/)(key::;value::)@\:toppar; - // Mark locations where user is attempting to add an already existing assignment - loc:i.compAssign[cid;tplist]; - $[any loc; - [show tplist where loc;'"The above topic-partition pairs already exist, please modify dictionary"]; - AssignmentAdd[cid;toppar]]; + tpdict:i.assignCheck[cid;toppar;0b]; + AssignmentAdd[cid;tpdict]; } // Remove assigned topic-parition pairs from the current assignment from which data can be consumed /* cid = Integer denoting client ID /* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition AssignDel:{[cid;toppar] + tpdict:i.assignCheck[cid;toppar;1b]; + AssignmentDel[cid;tpdict]; + } + +// Utility function to check current assignment against proposed additions/deletions, +// return unique toppar pairs as a dictionary to avoid segfaults from duplicates +/* cid = Integer denoting 
the client ID +/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition +/* addDel = Boolean denoting addition (0b) or deletion (1b) functionality +i.assignCheck:{[cid;toppar;addDel] i.checkDict[toppar]; - // Generate the unique topic partition lists used to compare to current assignment - tplist:(,'/)(key::;value::)@\:toppar; + // Generate the distinct topic-partition pairs provided, used to compare to current assignment + tplist:distinct(,'/)(key;value)@\:toppar; // Mark locations where user is attempting to delete from an non existent assignment - loc:not i.compAssign[cid;tplist]; - $[any loc; - [show tplist where loc;'"The above topic-partition pairs cannot be deleted as they are not assigned"]; - AssignmentDel[cid;toppar]]; + loc:$[addDel;not;]i.compAssign[cid;tplist]; + if[any loc; + show tplist where loc; + $[addDel; + '"The above topic-partition pairs cannot be deleted as they are not assigned"; + '"The above topic-partition pairs already exist, please modify dictionary"] + ]; + (!). flip tplist } // dictionary defining the current assignment for used in comparisons i.compAssign:{[cid;tplist] assignment:Assignment[cid]; - tplist in(assignment@'`topic),'"j"$assignment@'`partition + // current assignment is a list of dictionaries + currentTopPar:(assignment@'`topic),'"j"$assignment@'`partition; + tplist in currentTopPar } // Ensure that the dictionaries used in assignments map symbol to long