diff --git a/lib/resty/kafka/client.lua b/lib/resty/kafka/client.lua index 55f34cca..67c607c7 100644 --- a/lib/resty/kafka/client.lua +++ b/lib/resty/kafka/client.lua @@ -44,9 +44,9 @@ local function _metadata_cache(self, topic) end -local function metadata_encode(client_id, topics, num) +local function metadata_encode(client_id, topics, num, api_version) local id = 0 -- hard code correlation_id - local req = request:new(request.MetadataRequest, id, client_id, request.API_VERSION_V1) + local req = request:new(request.MetadataRequest, id, client_id, api_version or request.API_VERSION_V1) req:int32(num) @@ -113,9 +113,9 @@ local function metadata_decode(resp) end -local function api_versions_encode(client_id) +local function api_versions_encode(client_id, api_version) local id = 1 -- hard code correlation_id - return request:new(request.ApiVersionsRequest, id, client_id, request.API_VERSION_V2) + return request:new(request.ApiVersionsRequest, id, client_id,api_version or request.API_VERSION_V2) end @@ -136,8 +136,8 @@ local function api_versions_decode(resp) end -local function _fetch_api_versions(broker, client_id) - local resp, err = broker:send_receive(api_versions_encode(client_id)) +local function _fetch_api_versions(broker, client_id, api_version) + local resp, err = broker:send_receive(api_versions_encode(client_id, api_version)) if not resp then return nil, err else @@ -171,7 +171,7 @@ local function _fetch_metadata(self, new_topic) local broker_list = self.broker_list local sc = self.socket_config - local req = metadata_encode(self.client_id, topics, num) + local req = metadata_encode(self.client_id, topics, num, self.client_api_version) for i = 1, #broker_list do local host, port, sasl_config = broker_list[i].host, @@ -195,7 +195,7 @@ local function _fetch_metadata(self, new_topic) self.brokers, self.topic_partitions = brokers, topic_partitions -- fetch ApiVersions for compatibility - local api_versions, err = _fetch_api_versions(bk, self.client_id) + local api_versions, err = _fetch_api_versions(bk, self.client_id, self.client_api_version) if not api_versions then ngx_log(INFO, "broker fetch api versions failed, err:", err, ", host: ", broker.host, ", port: ", broker.port) @@ -247,6 +247,7 @@ function _M.new(self, broker_list, client_config) api_versions = {}, -- support APIs version on broker client_id = "worker" .. pid(), socket_config = socket_config, + client_api_version = opts.api_version or request.API_VERSION_V0 -- support API version on client }, mt) if opts.refresh_interval then