diff --git a/proxy/processor_default.go b/proxy/processor_default.go index fc243514..3277b6cd 100644 --- a/proxy/processor_default.go +++ b/proxy/processor_default.go @@ -158,7 +158,9 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ if err != nil { return false, nil, err } - + + // Reminder: When adding support for new versions of the produce request, also update proxy/protocol/responses.go + // Change 'apiKeyProduceMaxVersion' when adding new version support case 3, 4, 5, 6, 7, 8, 9, 10, 11: // CorrelationID + ClientID if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil { diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index e062176a..c6802c3a 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -3,18 +3,29 @@ package protocol import ( "errors" "fmt" + "math" "github.com/grepplabs/kafka-proxy/config" ) const ( + apiKeyProduce = 0 apiKeyMetadata = 3 apiKeyFindCoordinator = 10 + apiKeyApiVersions = 18 + + // intercept and update ApiVersions response to prevent requests/responses that can't be parsed by Kafka-Proxy + apiKeyApiVersionsMaxVersion = 4 + apiKeyMetadataMaxVersion = 13 + apiKeyFindCoordinatorMaxVersion = 6 + // produce requests are parsed by proxy/processor_default.go mustReply() + apiKeyProduceMaxVersion = 11 brokersKeyName = "brokers" hostKeyName = "host" portKeyName = "port" nodeKeyName = "node_id" + apiKeysKeyname = "api_keys" coordinatorKeyName = "coordinator" coordinatorsKeyName = "coordinators" @@ -23,6 +34,7 @@ const ( var ( metadataResponseSchemaVersions = createMetadataResponseSchemaVersions() findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions() + apiVersionsResponseSchemaVersions = createApiVersionsResponseSchemaVersions() ) func createMetadataResponseSchemaVersions() []Schema { @@ -244,7 +256,34 @@ func createMetadataResponseSchemaVersions() []Schema { &SchemaTaggedFields{Name: "response_tagged_fields"}, ) - return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12} + metadataResponseV13 := NewSchema("metadata_response_v13", + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9}, + &Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + + // Reminder: When adding support for new versions of the metadataResponse, also update proxy/protocol/responses.go + // Change 'apiKeyMetadataMaxVersion' when adding new version support + return []Schema{ + metadataResponseV0, + metadataResponseV1, + metadataResponseV2, + metadataResponseV3, + metadataResponseV4, + metadataResponseV5, + metadataResponseV6, + metadataResponseV7, + metadataResponseV8, + metadataResponseV9, + metadataResponseV10, + metadataResponseV11, + metadataResponseV12, + metadataResponseV13, + } } func createFindCoordinatorResponseSchemaVersions() []Schema { @@ -297,9 +336,119 @@ func createFindCoordinatorResponseSchemaVersions() []Schema { findCoordinatorResponseV5 := findCoordinatorResponseV4 findCoordinatorResponseV6 := findCoordinatorResponseV5 + // Reminder: When adding support for new versions of the findCoordinatorResponse, also update proxy/protocol/responses.go + // Change 'apiKeyFindCoordinatorMaxVersion' when adding new version support return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6} } +func createApiVersionsResponseSchemaVersions() []Schema { + apiVersionKeyV0 := NewSchema("api_versions_key_v0", + &Mfield{Name: "api_key", Ty: TypeInt16}, + &Mfield{Name: "min_version", Ty: TypeInt16}, + &Mfield{Name: "max_version", Ty: TypeInt16}, + ) + + apiVersionSchemaV3 := NewSchema("api_versions_key_schema3", + &Mfield{Name: "api_key", Ty: TypeInt16}, + &Mfield{Name: "min_version", Ty: TypeInt16}, + &Mfield{Name: "max_version", Ty: TypeInt16}, + &SchemaTaggedFields{"api_versions_tagged_fields"}, + ) + + apiVersionsResponseV0 := NewSchema("api_versions_response_v0", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0}, + ) + + // Version 1 adds throttle time to the response. + apiVersionsResponseV1 := NewSchema("api_versions_response_v1", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + ) + + // Starting in version 2, on quota violation, brokers send out responses before throttling. + apiVersionsResponseV2 := apiVersionsResponseV1 + + // Version 3 is the first flexible version. Tagged fields are only supported in the body but + // not in the header. The length of the header must not change in order to guarantee the + // backward compatibility. + // + // Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported + // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned. + apiVersionsResponseV3 := NewSchema("api_versions_response_v3", + &Mfield{Name: "error_code", Ty: TypeInt16}, + &CompactArray{Name: apiKeysKeyname, Ty: apiVersionSchemaV3}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + + // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0. + apiVersionsResponseV4 := apiVersionsResponseV3 + + // Reminder: When adding support for new versions of the findCoordinatorResponse, also update proxy/protocol/responses.go + // Change 'apiKeyApiVersionsMaxVersion' when adding new version support + return []Schema{ + apiVersionsResponseV0, + apiVersionsResponseV1, + apiVersionsResponseV2, + apiVersionsResponseV3, + apiVersionsResponseV4, + } +} + +func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { + if decodedStruct == nil { + return errors.New("decoded struct must not be nil") + } + if fn == nil { + return errors.New("net address mapper must not be nil") + } + apiVersionsArray, ok := decodedStruct.Get(apiKeysKeyname).([]interface{}) + if !ok { + return errors.New("api versions not found") + } + for _, apiVersionElement := range apiVersionsArray { + apiVersion := apiVersionElement.(*Struct) + apiKey, ok := apiVersion.Get("api_key").(int16) + if !ok { + return errors.New("api_keys.api_key not found") + } + maxVersion, ok := apiVersion.Get("max_version").(int16) + if !ok { + return errors.New("api_keys.max_version not found") + } + + limitVersion := int16(math.MaxInt16) + switch apiKey { + case apiKeyProduce: + if maxVersion > apiKeyProduceMaxVersion { + limitVersion = apiKeyProduceMaxVersion + } + case apiKeyMetadata: + if maxVersion > apiKeyMetadataMaxVersion { + limitVersion = apiKeyMetadataMaxVersion + } + case apiKeyFindCoordinator: + if maxVersion > apiKeyFindCoordinatorMaxVersion { + limitVersion = apiKeyFindCoordinatorMaxVersion + } + case apiKeyApiVersions: + if maxVersion > apiKeyApiVersionsMaxVersion { + limitVersion = apiKeyApiVersionsMaxVersion + } + } + if maxVersion > limitVersion { + err := apiVersion.Replace("max_version", limitVersion) + if err != nil { + return err + } + } + } + + return nil +} + func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { if decodedStruct == nil { return errors.New("decoded struct must not be nil") @@ -446,6 +595,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse) case apiKeyFindCoordinator: return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse) + case apiKeyApiVersions: + return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse) default: return nil, nil }