Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于sasl认证的kafka应该如何使用 #1375

Closed
18328594608 opened this issue Sep 20, 2023 · 40 comments · Fixed by #1379
Closed

关于sasl认证的kafka应该如何使用 #1375

18328594608 opened this issue Sep 20, 2023 · 40 comments · Fixed by #1379

Comments

@18328594608
Copy link

有一个"brokers" 内容大概为"europe-west2.gcp.confluent.cloud:9092",这样的格式,init时直接使用的这个值作为url
std::string url = kafka.brokers;
_m_client.init(url);

后续在使用时代码是这样:
std::string query;
query ="api=produce";

task = _m_client.create_kafka_task(query, 3, kafka_callback);
KafkaConfig config;
KafkaRecord record;

config.set_compress_type(compress_type);
config.set_client_id("user_flow");
config.set_sasl_username(kafka.api_key.c_str());
config.set_sasl_password(kafka.api_secret.c_str());
config.set_sasl_mech("PLAIN");
task->set_config(std::move(config));

std::string buf = msg.dump();
record.set_value(buf.c_str(), buf.length());
task->add_produce_record(kafka.topic, -1, std::move(record));

这个task在start回调报错error msg: URI Parse Failed。
我的疑问是我在我使用这个broker直接作为url传输的时候需要加什么前缀么
@holmes1412
Copy link
Contributor

试一下 kafka://europe-west2.gcp.confluent.cloud:9092 呢?

@18328594608
Copy link
Author

试一下 kafka://europe-west2.gcp.confluent.cloud:9092 呢?

加上前缀了还是这个报错,对于认证的连接我这样使用是正确的么

@holmes1412
Copy link
Contributor

sasl是在KafkaConfig上设置的,可以参考下这个文档:

sasl_mechanisms | char * | NULL | sasl认证类型,目前支持plain和scram

@18328594608
Copy link
Author

sasl是在KafkaConfig上设置的,可以参考下这个文档:

sasl_mechanisms | char * | NULL | sasl认证类型,目前支持plain和scram

我在本地部署一个kafka试试能不能连上 目前从代码写法上面看没问题.

@18328594608
Copy link
Author

sasl是在KafkaConfig上设置的,可以参考下这个文档:

sasl_mechanisms | char * | NULL | sasl认证类型,目前支持plain和scram

我在本地部署一个kafka试试能不能连上 目前从代码写法上面看没问题.

更新下进展:
-在本地开了个kafka连接 "brokers": "127.0.0.19092",这样是能连上的, 但是换成云端环境的"brokers": "6ojv2.us-west4.gcp.confluent.cloud:9092",这个连接之后就报错URI Parse Failed ,url解析是不支持这种格式的broker么

@Barenboim
Copy link
Contributor

sasl是在KafkaConfig上设置的,可以参考下这个文档:

sasl_mechanisms | char * | NULL | sasl认证类型,目前支持plain和scram

我在本地部署一个kafka试试能不能连上 目前从代码写法上面看没问题.

更新下进展: -在本地开了个kafka连接 "brokers": "127.0.0.19092",这样是能连上的, 但是换成云端环境的"brokers": "6ojv2.us-west4.gcp.confluent.cloud:9092",这个连接之后就报错URI Parse Failed ,url解析是不支持这种格式的broker么

你好。麻烦贴一下代码?我看看你完整url怎么写的。

@18328594608
Copy link
Author

sasl是在KafkaConfig上设置的,可以参考下这个文档:

sasl_mechanisms | char * | NULL | sasl认证类型,目前支持plain和scram

我在本地部署一个kafka试试能不能连上 目前从代码写法上面看没问题.

更新下进展: -在本地开了个kafka连接 "brokers": "127.0.0.19092",这样是能连上的, 但是换成云端环境的"brokers": "6ojv2.us-west4.gcp.confluent.cloud:9092",这个连接之后就报错URI Parse Failed ,url解析是不支持这种格式的broker么

你好。麻烦贴一下代码?我看看你完整url怎么写的。

void KafkaProtocol::init()
{
Kafka kafka = ConfigManager::getInstance().GetKafka();
std::string url = kafka.brokers;
_m_client.init(url);
}

void KafkaProtocol::PostMessage(const Json& msg, KafkaJsonFunc func){

WFKafkaTask *task;
int compress_type = Kafka_NoCompress;
Kafka kafka = ConfigManager::getInstance().GetKafka();

std::string query;
query ="api=produce";

task = _m_client.create_kafka_task(query, 3, kafka_callback);
KafkaConfig config;
KafkaRecord record;

config.set_compress_type(compress_type);
config.set_client_id("user_flow");
config.set_sasl_mech("PLAIN");
config.set_sasl_username(kafka.api_key.c_str());
config.set_sasl_password(kafka.api_secret.c_str());


task->set_config(std::move(config));

std::string buf = msg.dump();
record.set_value(buf.c_str(), buf.length());
task->add_produce_record(kafka.topic, -1, std::move(record));

task->start();

}
完整的 "brokers": "pkc-6ojv2.us-west4.gcp.confluent.cloud:9092"
您看一下

@Barenboim
Copy link
Contributor

Barenboim commented Sep 21, 2023

可能是你的用户名密码信息里的数据没有转义,userinfo里如果有非url字符,需要escape一下。

不过,这确实是个好问题,我感觉,这里似乎应该让框架来做escape的事。我们像MySQL的,因为密码在URL里,用户需要自己escape,但这里你的密码是在client里转的,似乎client应该做这事。你先用试试自己调用一下url_encode_component能不能解决:#1310

@Barenboim
Copy link
Contributor

Barenboim commented Sep 21, 2023

或者看看这个修改能不能解决:#1377

@18328594608
Copy link
Author

你好,我怀疑是redirect的时候的URL我们认为不合法。麻烦帮我们看看这个地方的url返回的是什么: https://github.com/sogou/workflow/blob/45a55ce2f4c1ad0f5e634d133e26a33bd1d4fbf8/src/factory/KafkaTaskImpl.cc#L440C1-L440C1 把这里的url打印一下,看看是不是parser失败了。多谢!

我在函数执行的时候打印 没有任何输出,这个函数可能没有调用到.

@18328594608
Copy link
Author

或者看看这个修改能不能解决:#1377

更新了这个pr之后报错信息变成了 Connection reset by peer .这个url的云端连接是一直开着的

@Barenboim
Copy link
Contributor

我刚才试用你这个地址,不加用户名密码也是返回这个reset by peer。你用其它kafka client可以访问这个地址?

@18328594608
Copy link
Author

我刚才试用你这个地址,不加用户名密码也是返回这个reset by peer。你用其它kafka client可以访问这个地址?

是的,我换了一个broker 也是同样的报错
pkc-6ojv2.us-west4.gcp.confluent.cloud:9092 这个broker是我们另一个同事的服务正在用的kafka

@Barenboim
Copy link
Contributor

我也试一下。我可以连这个地址,而且和你报同样的错误。

@18328594608
Copy link
Author

我也试一下。我可以连这个地址,而且和你报同样的错误。

这个报错的原因是因为url不对 连不上么 我不使用sasl的方式也没法连接.

@18328594608
Copy link
Author

我也试一下。我可以连这个地址,而且和你报同样的错误。

这个报错的原因是因为url不对 连不上么 我不使用sasl的方式也没法连接.

pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092 这个broker我使用libkafka 的接口可以连接成功,排除云端环境的问题

@Barenboim
Copy link
Contributor

我也试一下。我可以连这个地址,而且和你报同样的错误。

这个报错的原因是因为url不对 连不上么 我不使用sasl的方式也没法连接.

pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092 这个broker我使用libkafka 的接口可以连接成功,排除云端环境的问题

好的,我这边还在调。

@Barenboim
Copy link
Contributor

我们用tcp抓包看了一下,我们发的第一个请求包没有什么问题(但不包括用户名密码),但对方直接就关闭了连接。目前不确定是不是我们的请求里没有正确的用户名密码所以对方直接关闭。你那边有没有测试账号能让我们调一下?

@18328594608
Copy link
Author

我们用tcp抓包看了一下,我们发的第一个请求包没有什么问题(但不包括用户名密码),但对方直接就关闭了连接。目前不确定是不是我们的请求里没有正确的用户名密码所以对方直接关闭。你那边有没有测试账号能让我们调一下?

好的 我把账号发给1412了。你们那边同步一下

@Barenboim
Copy link
Contributor

收到!能不能把librdkafka的示例代码也贴在这里?

@18328594608
Copy link
Author

18328594608 commented Sep 21, 2023

收到!能不能把librdkafka的示例代码也贴在这里?

//
// Created by parallels on 9/21/23.
//

#include "PostKafkaProtocol.h"
#include "Log.h"
#include "workflow/StringUtil.h"

static rd_kafka_t *rk;

static void on_logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
    LOG_DEBUG("RDKAFKA-{}-{}: {}: {}\n", level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}

static void on_delivery(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
{
    if (rkmessage->err) {
        LOG_DEBUG("Message delivery failed: {}", rd_kafka_err2str(rkmessage->err));
    } else {
        LOG_DEBUG("Message delivered (topic: {}, {} bytes, partition {})",
                  rd_kafka_topic_name(rkmessage->rkt), rkmessage->len, rkmessage->partition);
    }
}

void PostKafkaProtocol::init(Kafka config)
{
    kafka_config = config;
    std::string url = kafka_config.brokers;

    char errstr[1024];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", kafka_config.brokers.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_ERROR("Set kafka brokers: {} fail: {}", kafka_config.brokers, errstr);
    }
    if (rd_kafka_conf_set(conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_ERROR("Set kafka brokers: {} fail: {}", kafka_config.brokers, errstr);
    }

    // ssl
    if (rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_ERROR("Set kafka protocol: {} fail: %s", kafka_config.brokers, errstr);
    }
    if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_ERROR("Set kafka mechanisms: {} fail: {}", kafka_config.brokers, errstr);
    }
    if (rd_kafka_conf_set(conf, "sasl.username", kafka_config.api_key.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_ERROR("Set kafka key: {} fail: {}", kafka_config.api_key, errstr);
    }

    std::string key = StringUtil::url_encode_component(kafka_config.api_secret);
    if (rd_kafka_conf_set(conf, "sasl.password", kafka_config.api_secret.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_ERROR("Set kafka secret: {} fail: {}", kafka_config.api_secret, errstr);
    }

    rd_kafka_conf_set_log_cb(conf, on_logger);
    rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (rk == NULL) {
        LOG_ERROR("Failed to create new producer: {}", errstr);
    }

    topic_ = rd_kafka_topic_new(rk, "users", NULL);
    if (topic_ == NULL) {
        LOG_ERROR("Failed to create topic object: {}", rd_kafka_err2str(rd_kafka_last_error()));
    }

    return;
}

int PostKafkaProtocol::fini_message(void) {
    rd_kafka_flush(rk, 1000);
    rd_kafka_topic_destroy(topic_);
    rd_kafka_destroy(rk);

    return 0;
    return 0;
}

int PostKafkaProtocol::push_message(char *message, rd_kafka_topic_t *topic) {
    LOG_INFO("push {} message: {}", rd_kafka_topic_name(topic), message);
    int ret = rd_kafka_produce(topic, 0, RD_KAFKA_MSG_F_COPY, message, strlen(message), NULL, 0, NULL);
    if (ret == -1) {
        LOG_INFO("Failed to produce: {} to topic {}: {}\n", message, rd_kafka_topic_name(topic_), rd_kafka_err2str(rd_kafka_last_error()));
        if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
           // list_add_node_tail(list, message);
            return 0;
        }
    }
    return 0;
}

void PostKafkaProtocol::PostMessage(const Json &json, PostKafkaProtocol::KafkaJsonFunc func) {

    std::string messge = json.dump();
    char* charPtr = &messge[0];
    push_message(charPtr, topic_);
}

这是整个cpp文件,你们看一下哈

@Barenboim
Copy link
Contributor

多谢多谢!
不过我们刚才试了试好像无论发什么数据对方都是直接断开。目前有点怀疑我们对IP有什么限制。不过你那边是同一个节点上,用librdkafka可以成功对吧?

@Barenboim
Copy link
Contributor

你们不会是走的SSL连接吧?

@Barenboim
Copy link
Contributor

好像确实是SSL连接的原因!目前我们的kafka没有支持SSL。我们看看是不是可以简单的支持上。
你的这个测试号先不要删,让我们用一下。

@Barenboim
Copy link
Contributor

Barenboim commented Sep 21, 2023

确实是SSL的原因。我们把这行:

type = TT_TCP;

直接改成TT_TCP_SSL已经可以和你那个kafka broker通信了。我们支持一下。

@18328594608
Copy link
Author

确实是SSL的原因。我们把这行:

type = TT_TCP;

直接改成TT_TCP_SSL已经可以和你那个kafka broker通信了。我们支持一下。

好的 感谢支持!

@Barenboim Barenboim linked a pull request Sep 22, 2023 that will close this issue
@Barenboim Barenboim reopened this Sep 25, 2023
@Barenboim
Copy link
Contributor

Barenboim commented Sep 25, 2023

你好,麻烦再试一下我们目前的master分支是否还有问题。我们用你提供的账号实测了一下,可以跑了。
由于你的这个kafka broker需要用SSL通信,所以URL必须写成:kafkas://pkc-l6wr6.europe-west2.gcp.confluent.cloud:9092/
端口不可以缺省,因为理论上kafka SSL的默认端口是9093。

@Barenboim
Copy link
Contributor

Barenboim commented Sep 25, 2023

顺便还发现了OpenSSL 1.1.1及以下的一个bug。你看一下这个bug列表的第一条:
https://github.com/sogou/workflow/blob/master/docs/bugs.md

访问你提供的这个kafka broker,有的时候在SSL连接时就会被对方中断。如果有Open SSL 1.1.1及以下,会出现一个系统错误但错误码为0。这个bug在OpenSSL 3.0里被修复,可以得到一个SSL错误。相关讨论:openssl/openssl#12416 (comment)

@18328594608
Copy link
Author

有的时候在SSL连接时就会被对方中断。如果有Open SSL 1.1.1及以下,会出现一个系统错误但错误码为0。这个bug在OpenSSL 3.0里被修复,可以得到一个SSL错误。相关讨论

好的 感谢支持,我明天试一试.

@18328594608
Copy link
Author

有的时候在SSL连接时就会被对方中断。如果有Open SSL 1.1.1及以下,会出现一个系统错误但错误码为0。这个bug在OpenSSL 3.0里被修复,可以得到一个SSL错误。相关讨论

好的 感谢支持,我明天试一试.

使用master分支之后可以接通,感谢提供支持.

@Barenboim
Copy link
Contributor

哈哈哈。多谢协助我们实现了这个重要功能!

看了一下你原来的代码,还可以优化一下,config设置在client上就可以了,不用每个任务都设置一遍用户名密码,除非每个任务使用不一样的信息。

@18328594608
Copy link
Author

看了一下你原来的代码,还可以优化一下,config设置在client上就可以了,不用每个任务都设置一遍用户名密码,除非每个任务使用不一样的信息。

哈哈哈感谢建议, 另外我想问一下,如果一个SSL连接被断开之后 client怎么触发重连呢

@Barenboim
Copy link
Contributor

这个内部解决的。你直接重新产生请求就可以了。或者加上retry?

@18328594608
Copy link
Author

这个内部解决的。你直接重新产生请求就可以了。或者加上retry?

好的,感谢解答,我再加上一个retry.

@Barenboim
Copy link
Contributor

这个内部解决的。你直接重新产生请求就可以了。或者加上retry?

好的,感谢解答,我再加上一个retry.

试了一下,retry不行。我再改一下!

@Barenboim
Copy link
Contributor

@18328594608 麻烦帮我们试试这个修改:#1381

@18328594608
Copy link
Author

@18328594608 麻烦帮我们试试这个修改:#1381
这个修改主要影响到什么地方,需要测试什么场景?

@Barenboim
Copy link
Contributor

Barenboim commented Sep 27, 2023 via email

@18328594608
Copy link
Author

噢噢,就是让retry的功能生效了。 之前的代码问题是,如果出现ssl连接失败等问题,直接返回错误。现在配置了retry_max之后,会自动重试若干次了。

---原始邮件--- 发件人: @.> 发送时间: 2023年9月27日(周三) 上午9:55 收件人: @.>; 抄送: @.>;"State @.>; 主题: Re: [sogou/workflow] 关于sasl认证的kafka应该如何使用 (Issue #1375) @18328594608 麻烦帮我们试试这个修改:#1381 这个修改主要影响到什么地方,需要测试什么场景? — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you modified the open/close state.Message ID: @.***>
好的,我看下这个功能是否生效了.

@18328594608
Copy link
Author

后续断连后触发了retry 目前看起来使用正常.感谢跟进.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants