Skip to content

Commit

Permalink
Merge pull request #49 from cmccarthy1/Assignment
Browse files Browse the repository at this point in the history
Assignment functionality addition
  • Loading branch information
cmccarthy1 authored Oct 28, 2020
2 parents 399f9b1 + fee6dfe commit bc933ff
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 1 deletion.
120 changes: 120 additions & 0 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,126 @@ EXP K3(kfkPoll){
return kj(n);
}


/* The following set of functions define interactions with the assign functionality with Kafka.
* This provides more control to the user over where data can be consumed from.
* Note the differences between Kafka Assign vs Subscribe functionality, summarised in part
* https://github.com/confluentinc/confluent-kafka-dotnet/issues/278#issuecomment-318858243
*/

/** Define functionality needed for the addition and deletion of topic-partition
** pairs to the current assignment
* @param dict topic!associated-partitions --> S!J
*/

static V ptlistadd(K dict, rd_kafka_topic_partition_list_t *t_partition){
K dk=kK(dict)[0],dv=kK(dict)[1];
S*p;J*o,i;
p=kS(dk);o=kJ(dv);
for(i = 0; i < dk->n; i++)
rd_kafka_topic_partition_list_add(t_partition,p[i],o[i]);
}

static V ptlistdel(K dict,rd_kafka_topic_partition_list_t *t_partition){
K dk=kK(dict)[0],dv=kK(dict)[1];
S*p;J*o,i;
p=kS(dk);o=kJ(dv);
for(i = 0; i < dk->n; i++)
rd_kafka_topic_partition_list_del(t_partition,p[i],o[i]);
}

/** Assign the partitions from which to consume data for specified topics
* @param x Client Index (previously created)
* @param y Dictionary mapping individual topics to associated partitions. S!J
* @returns Null value on correct execution
*/

EXP K2(kfkAssignTopPar){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *t_partition;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
t_partition = rd_kafka_topic_partition_list_new(y->n);
// topic-partition assignment
ptlistadd(y,t_partition);
if(KFK_OK != (err=rd_kafka_assign(rk,t_partition)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t_partition);
return KNL;
}

/** Return the current consumption assignment for a specified client
* @param x Client Index (previously created)
* @returns List of dictionaries defining information about the current assignment
*/
EXP K1(kfkAssignment){
K r;
rd_kafka_topic_partition_list_t *t;
rd_kafka_t *rk;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
if(KFK_OK != (err=rd_kafka_assignment(rk, &t)))
return krr((S)rd_kafka_err2str(err));
r = decodeParList(t);
rd_kafka_topic_partition_list_destroy(t);
return r;
}

/** Add to the current assignment for a client with new topic-partition pair
* @param x Client Index (previously created)
* @param y Dictionary mapping individual topics to associated partitions. S!J
* @returns Null value on correct execution
*/

EXP K2(kfkAssignmentAdd){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *t;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
// retrieve the current assignment
if(KFK_OK != (err=rd_kafka_assignment(rk, &t)))
return krr((S)rd_kafka_err2str(err));
ptlistadd(y,t);
if(KFK_OK != (err=rd_kafka_assign(rk,t)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t);
return 0;
}

/** Delete a topic-partition mapped dictionary from the current assignment for a client
* @param x Client Index (previously created)
* @param y Dictionary mapping individual topics to associated partitions. S!J
* @returns Null value on correct execution
*/

EXP K2(kfkAssignmentDel){
rd_kafka_t *rk;
rd_kafka_topic_partition_list_t *t;
rd_kafka_resp_err_t err;
if(!checkType("i", x))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
// retrieve the current assignment
if(KFK_OK != (err=rd_kafka_assignment(rk, &t)))
return krr((S)rd_kafka_err2str(err));
ptlistdel(y,t);
if(KFK_OK != (err=rd_kafka_assign(rk,t)))
return krr((S) rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(t);
return 0;
}


// other
EXP K1(kfkOutQLen){
rd_kafka_t *rk;
Expand Down
75 changes: 74 additions & 1 deletion kfk.q
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ funcs:(
// .kfk.VersionSym[]:s
(`kfkVersionSym;1);
// .kfk.SetLoggerLevel[client_id:i;int_level:i]:()
(`kfkSetLoggerLevel;2)
(`kfkSetLoggerLevel;2);
// .kfk.Assignment[client_id:i]:T
(`kfkAssignment;1);
// .kfk.AssignTopPar[client_id:i;topic_partition:S!J]:()
(`kfkAssignTopPar;2);
// .kfk.AssignmentAdd[client_id:i;topic_partition:S!J]:()
(`kfkAssignmentAdd;2);
// .kfk.AssignmentDel[client_id:i;topic_partition:S!J]:()
(`kfkAssignmentDel;2)
);

// binding functions from dictionary funcs using rule
Expand Down Expand Up @@ -156,6 +164,71 @@ Subscribe:{[cid;top;part;cb]
}


// Assignment API logic

// Assign a new topic-partition dictionary to be consumed by a designated clientid
/* cid = Integer denoting client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
Assign:{[cid;toppar]
i.checkDict[toppar];
// Create a distinct set of topic-partition pairs to assign,
// non distinct entries cause a segfault
toppar:(!). flip distinct(,'/)(key;value)@\:toppar;
AssignTopPar[cid;toppar]
}

// Assign additional topic-partition pairs which could be consumed from
/* cid = Integer denoting client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
AssignAdd:{[cid;toppar]
tpdict:i.assignCheck[cid;toppar;0b];
AssignmentAdd[cid;tpdict];
}

// Remove assigned topic-parition pairs from the current assignment from which data can be consumed
/* cid = Integer denoting client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
AssignDel:{[cid;toppar]
tpdict:i.assignCheck[cid;toppar;1b];
AssignmentDel[cid;tpdict];
}

// Utility function to check current assignment against proposed additions/deletions,
// retirn unique toppar pairs as a dictionary to avoid segfaults from duplicate
/* cid = Integer denoting the client ID
/* toppar = Symbol!Long dictionary mapping the name of a topic to an associated partition
/* addDel = Boolean denoting addition/deletion functionality
i.assignCheck:{[cid;toppar;addDel]
i.checkDict[toppar];
// Generate the partition provided used to compare to current assignment
tplist:distinct(,'/)(key;value)@\:toppar;
// Mark locations where user is attempting to delete from an non existent assignment
loc:$[addDel;not;]i.compAssign[cid;tplist];
if[any loc;
show tplist where loc;
$[addDel;
'"The above topic-partition pairs cannot be deleted as they are not assigned";
'"The above topic-partition pairs already exist, please modify dictionary"]
];
(!). flip tplist
}

// dictionary defining the current assignment for used in comparisons
i.compAssign:{[cid;tplist]
assignment:Assignment[cid];
// current assignment is a list of dictionaries
currentTopPar:(assignment@'`topic),'"j"$assignment@'`partition;
tplist in currentTopPar
}

// Ensure that the dictionaries used in assignments map symbol to long
i.checkDict:{[dict]
if[not 99h=type dict ;'"Final parameter must be a dictionary"];
if[not 11h=type key dict ;'"Dictionary key must of type symbol"];
if[not 7h =type value dict;'"Dictionary values must be of type long"];
}


// Handling of error callbacks (rd_kafka_conf_set_error_cb)
/* cid is an integer
/* err_int is an integer code relating to the kafka issue
Expand Down

0 comments on commit bc933ff

Please sign in to comment.