由于支持Kafka的多种压缩方式,因此系统需要预先安装zlib,snappy,lz4(>=1.7.5),zstd等第三方库。
支持CMake和Bazel两种编译方式。
CMake:执行命令make KAFKA=y 编译独立的类库(libwfkafka.a和libwfkafka.so)支持kafka协议;cd tutorial; make KAFKA=y 可以编译kafka_cli
Bazel:执行bazel build kafka 编译支持kafka协议的类库;执行bazel build kafka_cli 编译kafka_cli
这是一个kafka client,可以完成kafka的消息生产(produce)和消息消费(fetch)。
编译时需要在tutorial目录中执行编译命令make KAFKA=y或者在项目根目录执行make KAFKA=y tutorial。
该程序从命令行读取一个kafka broker服务器地址和本次任务的类型(produce/fetch):
./kafka_cli <broker_url> [p/c]
程序会在执行完任务后自动退出,一切资源完全回收。
其中broker_url可以有多个url组成,多个url之间以,分割
- 形式如:kafka://host:port,kafka://host1:port...
- port默认为9092;
- 如果用户在这一层有upstream选取需求,可以参考upstream文档。
Kafka broker_url示例:
kafka://127.0.0.1/
kafka://kafka.host:9090/
kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou
kafka client内部实现上除了压缩功能外没有依赖第三方库,同时利用了workflow的高性能,在合理的配置和环境下,每秒钟可以处理几万次Kafka请求。
在内部实现上,kafka client会把一次请求按照内部使用到的broker分拆成并行parallel任务,每个broker地址对应parallel任务中的一个子任务,
这样可以最大限度的提升效率,同时利用workflow内部对连接的复用机制使得整体的连接数控制在一个合理的范围。
如果一个broker地址下有多个topic partition,为了提高吞吐,应该创建多个client,然后按照topic partition分别创建任务独立启动。
首先需要创建一个WFKafkaClient对象,然后调用init函数初始化WFKafkaClient对象,
int init(const std::string& broker_url);
int init(const std::string& broker_url, const std::string& group);
其中broker_url是kafka broker集群的地址,格式可以参考上面的broker_url,
group是消费者组的group_name,用在基于消费者组的fetch任务中,如果是produce任务或者没有使用消费者组的fetch任务,则不需要使用此接口;
用消费者组的时候,可以设置heartbeat的间隔时间,时间单位是毫秒,用于维持心跳:
void set_heartbeat_interval(size_t interval_ms);
后面再通过WFKafkaClient对象创建kafka任务
using kafka_callback_t = std::function<void (WFKafkaTask *)>;
WFKafkaTask *create_kafka_task(const std::string& query, int retry_max, kafka_callback_t cb);
WFKafkaTask *create_kafka_task(int retry_max, kafka_callback_t cb);
其中query中包含此次任务的类型以及topic等属性,retry_max表示最大重试次数,cb为用户自定义的callback函数,当task执行完毕后会被调用,
接着还可以修改task的默认配置以满足实际需要,详细接口可以在KafkaDataTypes.h中查看
KafkaConfig config;
config.set_client_id("workflow");
task->set_config(std::move(config));
支持的配置选项描述如下:
配置名 | 类型 | 默认值 | 含义 |
---|---|---|---|
produce_timeout | int | 100ms | produce的超时时间 |
produce_msg_max_bytes | int | 1000000 bytes | 单个消息的最大长度限制 |
produce_msgset_cnt | int | int | 10000 |
produce_msgset_max_bytes | int | 1000000 bytes | 一次通信消息集合的最大长度限制 |
fetch_timeout | int | 100ms | fetch的超时时间 |
fetch_min_bytes | int | 1 byte | 一次fetch通信最小消息的长度 |
fetch_max_bytes | int | 50M bytes | 一次fetch通信最大消息的长度 |
fetch_msg_max_bytes | int | 1M bytes | 一次fetch通信单个消息的最大长度 |
offset_timestamp | long long int | -2 | 消费者组模式下,没有找到历史offset时,初始化的offset,-2表示最久,-1表示最新 |
session_timeout | int | 10s | 加入消费者组初始化时的超时时间 |
rebalance_timeout | int | 10s | 加入消费者组同步信息阶段的超时时间 |
produce_acks | int | -1 | produce任务在返回之前应确保消息成功复制的broker节点数,-1表示所有的复制broker节点 |
allow_auto_topic_creation | bool | true | produce时topic不存在时,是否自动创建topic |
broker_version | char * | NULL | 指定broker的版本号,<0.10时需要手动指定 |
compress_type | int | NoCompress | produce消息的压缩类型 |
client_id | char * | NULL | 表示client的id |
check_crcs | bool | false | fetch任务中是否校验消息的crc32 |
offset_store | int | 0 | 加入消费者组时,是否使用上次提交offset,1表示使用指定的offset,0表示优先使用上次提交 |
sasl_mechanisms | char * | NULL | sasl认证类型,目前支持plain和scram |
sasl_username | char * | NULL | sasl认证所需的username |
sasl_password | char * | NULL | sasl认证所需的password |
最后就可以调用start接口启动kafka任务。
1、在创建并初始化WFKafkaClient之后,可以在query中直接指定topic等信息创建WFKafkaTask任务
使用示例如下:
int main(int argc, char *argv[])
{
...
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
2、在创建完WFKafkaTask之后,先通过调用set_key, set_value, add_header_pair等方法构建KafkaRecord,
关于KafkaRecord的更多接口,可以在KafkaDataTypes.h中查看
然后应该通过调用add_produce_record添加KafkaRecord,关于更多接口的详细定义,可以在WFKafkaClient.h中查看
需要注意的是,add_produce_record的第二个参数partition,当>=0是表示指定的partition,-1表示随机指定partition或者调用自定义的kafka_partitioner_t
kafka_partitioner_t可以通过set_partitioner接口设置自定义规则。
使用示例如下:
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url);
task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback);
task->set_partitioner(partitioner);
KafkaRecord record;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
...
task->start();
...
}
3、produce还可以使用kafka支持的4种压缩协议,通过设置配置项来实现
使用示例如下:
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url);
task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback);
KafkaConfig config;
config.set_compress_type(Kafka_Zstd);
task->set_config(std::move(config));
KafkaRecord record;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
...
task->start();
...
}
fetch任务支持消费者组模式和手动模式
1、手动模式
无需指定消费者组,同时需要用户指定topic、partition和offset
使用示例如下:
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch", 3, kafka_callback);
KafkaToppar toppar;
toppar.set_topic_partition("workflow_test1", 0);
toppar.set_offset(0);
task->add_toppar(toppar);
2、消费者组模式
在初始化client的时候需要指定消费者组的名称
使用示例如下:
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url, cgroup_name);
task = client_fetch->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
3、offset的提交
在消费者组模式下,用户消费消息后,可以在callback函数中,通过创建commit任务来自动提交消费的记录,使用示例如下:
void kafka_callback(WFKafkaTask *task)
{
...
commit_task = client.create_kafka_task("api=commit", 3, kafka_callback);
...
commit_task->start();
...
}
在消费者组模式下,client在关闭之前需要调用create_leavegroup_task创建leavegroup_task,
它会发送leavegroup协议包,如果没有启动leavegroup_task,会导致消费者组没有正确退出,触发这个组的rebalance。
消息的结果集的数据结构是KafkaResult,可以通过调用WFKafkaTask的get_result()接口获得,
然后调用KafkaResult的fetch_record接口可以将本次task相关的record取出来,它是一个KafkaRecord的二维vector,
第一维是topic partition,第二维是某个topic partition下对应的KafkaRecord,
在KafkaResult.h中可以看到KafkaResult的定义
void kafka_callback(WFKafkaTask *task)
{
int state = task->get_state();
int error = task->get_error();
// handle error states
...
protocol::KafkaResult *result = task->get_result();
result->fetch_records(records);
for (auto &v : records)
{
for (auto &w: v)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
printf("produce\ttopic: %s, partition: %d, status: %d, offset: %lld, val_len: %zu\n",
w->get_topic(), w->get_partition(), w->get_status(), w->get_offset(), value_len);
}
}
...
protocol::KafkaResult new_result = std::move(*task->get_result());
if (new_result.fetch_records(records))
{
for (auto &v : records)
{
if (v.empty())
continue;
for (auto &w: v)
{
if (fp)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
fwrite(w->get_value(), w->get_value_len(), 1, fp);
}
}
}
}
...
}
认证信息需要在配置中设置,以sasl为例:
int main(int argc, char *argv[])
{
...
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
config.set_sasl_username("fetch");
config.set_sasl_password("fetch-secret");
config.set_sasl_mech("SCRAM-SHA-256");
task->set_config(std::move(config));
...
task->start();
...
}