-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
关于sasl认证的kafka应该如何使用 #1375
Comments
试一下 kafka://europe-west2.gcp.confluent.cloud:9092 呢? |
加上前缀了还是这个报错,对于认证的连接我这样使用是正确的么 |
sasl是在KafkaConfig上设置的,可以参考下这个文档: workflow/docs/tutorial-13-kafka_cli.md Line 108 in 45a55ce
|
我在本地部署一个kafka试试能不能连上 目前从代码写法上面看没问题. |
更新下进展: |
你好。麻烦贴一下代码?我看看你完整url怎么写的。 |
void KafkaProtocol::init() void KafkaProtocol::PostMessage(const Json& msg, KafkaJsonFunc func){
} |
可能是你的用户名密码信息里的数据没有转义,userinfo里如果有非url字符,需要escape一下。 不过,这确实是个好问题,我感觉,这里似乎应该让框架来做escape的事。我们像MySQL的,因为密码在URL里,用户需要自己escape,但这里你的密码是在client里转的,似乎client应该做这事。你先用试试自己调用一下url_encode_component能不能解决:#1310 |
或者看看这个修改能不能解决:#1377 |
我在函数执行的时候打印 没有任何输出,这个函数可能没有调用到. |
更新了这个pr之后报错信息变成了 Connection reset by peer .这个url的云端连接是一直开着的 |
我刚才试用你这个地址,不加用户名密码也是返回这个reset by peer。你用其它kafka client可以访问这个地址? |
是的,我换了一个broker 也是同样的报错 |
我也试一下。我可以连这个地址,而且和你报同样的错误。 |
这个报错的原因是因为url不对 连不上么 我不使用sasl的方式也没法连接. |
pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092 这个broker我使用libkafka 的接口可以连接成功,排除云端环境的问题 |
好的,我这边还在调。 |
我们用tcp抓包看了一下,我们发的第一个请求包没有什么问题(但不包括用户名密码),但对方直接就关闭了连接。目前不确定是不是我们的请求里没有正确的用户名密码所以对方直接关闭。你那边有没有测试账号能让我们调一下? |
好的 我把账号发给1412了。你们那边同步一下 |
收到!能不能把librdkafka的示例代码也贴在这里? |
//
// Created by parallels on 9/21/23.
//
#include "PostKafkaProtocol.h"
#include "Log.h"
#include "workflow/StringUtil.h"
static rd_kafka_t *rk;
static void on_logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
LOG_DEBUG("RDKAFKA-{}-{}: {}: {}\n", level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}
static void on_delivery(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
{
if (rkmessage->err) {
LOG_DEBUG("Message delivery failed: {}", rd_kafka_err2str(rkmessage->err));
} else {
LOG_DEBUG("Message delivered (topic: {}, {} bytes, partition {})",
rd_kafka_topic_name(rkmessage->rkt), rkmessage->len, rkmessage->partition);
}
}
void PostKafkaProtocol::init(Kafka config)
{
kafka_config = config;
std::string url = kafka_config.brokers;
char errstr[1024];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (rd_kafka_conf_set(conf, "bootstrap.servers", kafka_config.brokers.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
LOG_ERROR("Set kafka brokers: {} fail: {}", kafka_config.brokers, errstr);
}
if (rd_kafka_conf_set(conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
LOG_ERROR("Set kafka brokers: {} fail: {}", kafka_config.brokers, errstr);
}
// ssl
if (rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
LOG_ERROR("Set kafka protocol: {} fail: %s", kafka_config.brokers, errstr);
}
if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
LOG_ERROR("Set kafka mechanisms: {} fail: {}", kafka_config.brokers, errstr);
}
if (rd_kafka_conf_set(conf, "sasl.username", kafka_config.api_key.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
LOG_ERROR("Set kafka key: {} fail: {}", kafka_config.api_key, errstr);
}
std::string key = StringUtil::url_encode_component(kafka_config.api_secret);
if (rd_kafka_conf_set(conf, "sasl.password", kafka_config.api_secret.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
LOG_ERROR("Set kafka secret: {} fail: {}", kafka_config.api_secret, errstr);
}
rd_kafka_conf_set_log_cb(conf, on_logger);
rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (rk == NULL) {
LOG_ERROR("Failed to create new producer: {}", errstr);
}
topic_ = rd_kafka_topic_new(rk, "users", NULL);
if (topic_ == NULL) {
LOG_ERROR("Failed to create topic object: {}", rd_kafka_err2str(rd_kafka_last_error()));
}
return;
}
int PostKafkaProtocol::fini_message(void) {
rd_kafka_flush(rk, 1000);
rd_kafka_topic_destroy(topic_);
rd_kafka_destroy(rk);
return 0;
return 0;
}
int PostKafkaProtocol::push_message(char *message, rd_kafka_topic_t *topic) {
LOG_INFO("push {} message: {}", rd_kafka_topic_name(topic), message);
int ret = rd_kafka_produce(topic, 0, RD_KAFKA_MSG_F_COPY, message, strlen(message), NULL, 0, NULL);
if (ret == -1) {
LOG_INFO("Failed to produce: {} to topic {}: {}\n", message, rd_kafka_topic_name(topic_), rd_kafka_err2str(rd_kafka_last_error()));
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
// list_add_node_tail(list, message);
return 0;
}
}
return 0;
}
void PostKafkaProtocol::PostMessage(const Json &json, PostKafkaProtocol::KafkaJsonFunc func) {
std::string messge = json.dump();
char* charPtr = &messge[0];
push_message(charPtr, topic_);
} 这是整个cpp文件,你们看一下哈 |
多谢多谢! |
你们不会是走的SSL连接吧? |
好像确实是SSL连接的原因!目前我们的kafka没有支持SSL。我们看看是不是可以简单的支持上。 |
确实是SSL的原因。我们把这行: workflow/src/factory/KafkaTaskImpl.cc Line 317 in 3503a4f
直接改成TT_TCP_SSL已经可以和你那个kafka broker通信了。我们支持一下。 |
好的 感谢支持! |
你好,麻烦再试一下我们目前的master分支是否还有问题。我们用你提供的账号实测了一下,可以跑了。 |
顺便还发现了OpenSSL 1.1.1及以下的一个bug。你看一下这个bug列表的第一条: 访问你提供的这个kafka broker,有的时候在SSL连接时就会被对方中断。如果有Open SSL 1.1.1及以下,会出现一个系统错误但错误码为0。这个bug在OpenSSL 3.0里被修复,可以得到一个SSL错误。相关讨论:openssl/openssl#12416 (comment) |
好的 感谢支持,我明天试一试. |
使用master分支之后可以接通,感谢提供支持. |
哈哈哈。多谢协助我们实现了这个重要功能! 看了一下你原来的代码,还可以优化一下,config设置在client上就可以了,不用每个任务都设置一遍用户名密码,除非每个任务使用不一样的信息。 |
哈哈哈感谢建议, 另外我想问一下,如果一个SSL连接被断开之后 client怎么触发重连呢 |
这个内部解决的。你直接重新产生请求就可以了。或者加上retry? |
好的,感谢解答,我再加上一个retry. |
试了一下,retry不行。我再改一下! |
@18328594608 麻烦帮我们试试这个修改:#1381 |
|
噢噢,就是让retry的功能生效了。
之前的代码问题是,如果出现ssl连接失败等问题,直接返回错误。现在配置了retry_max之后,会自动重试若干次了。
…---原始邮件---
发件人: ***@***.***>
发送时间: 2023年9月27日(周三) 上午9:55
收件人: ***@***.***>;
抄送: ***@***.***>;"State ***@***.***>;
主题: Re: [sogou/workflow] 关于sasl认证的kafka应该如何使用 (Issue #1375)
@18328594608 麻烦帮我们试试这个修改:#1381
这个修改主要影响到什么地方,需要测试什么场景?
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you modified the open/close state.Message ID: ***@***.***>
|
|
后续断连后触发了retry 目前看起来使用正常.感谢跟进. |
有一个"brokers" 内容大概为"europe-west2.gcp.confluent.cloud:9092",这样的格式,init时直接使用的这个值作为url
std::string url = kafka.brokers;
_m_client.init(url);
后续在使用时代码是这样:
std::string query;
query ="api=produce";
The text was updated successfully, but these errors were encountered: