Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assignment functionality addition #49

Merged
merged 9 commits into from
Oct 28, 2020
120 changes: 120 additions & 0 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,126 @@ EXP K3(kfkPoll){
return kj(n);
}


/* The following set of functions define interactions with the assign functionality with Kafka.
* This provides more control to the user over where data can be consumed from.
* Note the differences between Kafka Assign vs Subscribe functionality, summarised in part
* https://github.com/confluentinc/confluent-kafka-dotnet/issues/278#issuecomment-318858243
*/

/** Define functionality needed for the addition and deletion of topic-partition
** pairs to the current assignment
* @param dict topic!associated-partitions --> S!J
*/

static V ptlistadd(K dict, rd_kafka_topic_partition_list_t *t_partition){
K dk=kK(dict)[0],dv=kK(dict)[1];
S*p;J*o,i;
p=kS(dk);o=kJ(dv);
for(i = 0; i < dk->n; i++)
rd_kafka_topic_partition_list_add(t_partition,p[i],o[i]);
}

static V ptlistdel(K dict,rd_kafka_topic_partition_list_t *t_partition){
K dk=kK(dict)[0],dv=kK(dict)[1];
S*p;J*o,i;
p=kS(dk);o=kJ(dv);
for(i = 0; i < dk->n; i++)
rd_kafka_topic_partition_list_del(t_partition,p[i],o[i]);
}

/** Assign the partitions from which to consume data for specified topics
* @param x Client Index (previously created)
* @param y Dictionary mapping individual topics to associated partitions. S!J
* @returns Null value on correct execution
*/

EXP K2(kfkAssignTopPar){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *t_partition;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
t_partition = rd_kafka_topic_partition_list_new(y->n);
// topic-partition assignment
ptlistadd(y,t_partition);
if(KFK_OK != (err=rd_kafka_assign(rk,t_partition)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t_partition);
return KNL;
}

/** Return the current consumption assignment for a specified client
* @param x Client Index (previously created)
* @returns List of dictionaries defining information about the current assignment
*/
EXP K1(kfkAssignment){
K r;
rd_kafka_topic_partition_list_t *t;
rd_kafka_t *rk;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
if(KFK_OK != (err=rd_kafka_assignment(rk, &t)))
return krr((S)rd_kafka_err2str(err));
r = decodeParList(t);
rd_kafka_topic_partition_list_destroy(t);
return r;
}

/** Add to the current assignment for a client with new topic-partition pair
* @param x Client Index (previously created)
* @param y Dictionary mapping individual topics to associated partitions. S!J
* @returns Null value on correct execution
*/

EXP K2(kfkAssignmentAdd){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *t;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
// retrieve the current assignment
if(KFK_OK != (err=rd_kafka_assignment(rk, &t)))
return krr((S)rd_kafka_err2str(err));
ptlistadd(y,t);
if(KFK_OK != (err=rd_kafka_assign(rk,t)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t);
return 0;
}

/** Delete a topic-partition mapped dictionary from the current assignment for a client
* @param x Client Index (previously created)
* @param y Dictionary mapping individual topics to associated partitions. S!J
* @returns Null value on correct execution
*/

EXP K2(kfkAssignmentDel){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *t;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
// retrieve the current assignment
if(KFK_OK != (err=rd_kafka_assignment(rk, &t)))
return krr((S)rd_kafka_err2str(err));
ptlistdel(y,t);
if(KFK_OK != (err=rd_kafka_assign(rk,t)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t);
return 0;
}


// other
EXP K1(kfkOutQLen){
rd_kafka_t *rk;
Expand Down
75 changes: 74 additions & 1 deletion kfk.q
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ funcs:(
// .kfk.VersionSym[]:s
(`kfkVersionSym;1);
// .kfk.SetLoggerLevel[client_id:i;int_level:i]:()
(`kfkSetLoggerLevel;2)
(`kfkSetLoggerLevel;2);
// .kfk.Assignment[client_id:i]:T
(`kfkAssignment;1);
// .kfk.AssignTopPar[client_id:i;topic_partition:S!J]:()
(`kfkAssignTopPar;2);
// .kfk.AssignmentAdd[client_id:i;topic_partition:S!J]:()
(`kfkAssignmentAdd;2);
// .kfk.AssignmentDel[client_id:i;topic_partition:S!J]:()
(`kfkAssignmentDel;2)
);

// binding functions from dictionary funcs using rule
Expand Down Expand Up @@ -156,6 +164,71 @@ Subscribe:{[cid;top;part;cb]
}


// Assignment API logic

// Assign a new topic-partition dictionary to be consumed by a designated clientid
/* cid = Integer denoting client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
Assign:{[cid;toppar]
i.checkDict[toppar];
// Create a distinct set of topic-partition pairs to assign,
// non distinct entries cause a segfault
toppar:(!). flip distinct(,'/)(key;value)@\:toppar;
AssignTopPar[cid;toppar]
}

// Assign additional topic-partition pairs which could be consumed from
/* cid = Integer denoting client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
AssignAdd:{[cid;toppar]
tpdict:i.assignCheck[cid;toppar;0b];
AssignmentAdd[cid;tpdict];
}

// Remove assigned topic-parition pairs from the current assignment from which data can be consumed
/* cid = Integer denoting client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
AssignDel:{[cid;toppar]
tpdict:i.assignCheck[cid;toppar;1b];
AssignmentDel[cid;tpdict];
}

// Utility function to check current assignment against proposed additions/deletions,
// retirn unique toppar pairs as a dictionary to avoid segfaults from duplicate
/* cid = Integer denoting the client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
/* addDel = Boolean denoting addition/deletion functionality
i.assignCheck:{[cid;toppar;addDel]
i.checkDict[toppar];
// Generate the partition provided used to compare to current assignment
tplist:distinct(,'/)(key;value)@\:toppar;
// Mark locations where user is attempting to delete from an non existent assignment
loc:$[addDel;not;]i.compAssign[cid;tplist];
if[any loc;
show tplist where loc;
$[addDel;
'"The above topic-partition pairs cannot be deleted as they are not assigned";
'"The above topic-partition pairs already exist, please modify dictionary"]
];
(!). flip tplist
}

// dictionary defining the current assignment for used in comparisons
i.compAssign:{[cid;tplist]
assignment:Assignment[cid];
// current assignment is a list of dictionaries
currentTopPar:(assignment@'`topic),'"j"$assignment@'`partition;
tplist in currentTopPar
}

// Ensure that the dictionaries used in assignments map symbol to long
i.checkDict:{[dict]
if[not 99h=type dict ;'"Final parameter must be a dictionary"];
if[not 11h=type key dict ;'"Dictionary key must of type symbol"];
if[not 7h =type value dict;'"Dictionary values must be of type long"];
}


// Handling of error callbacks (rd_kafka_conf_set_error_cb)
/* cid is an integer
/* err_int is an integer code relating to the kafka issue
Expand Down