diff --git a/proxy/processor_default.go b/proxy/processor_default.go
index fc243514..3277b6cd 100644
--- a/proxy/processor_default.go
+++ b/proxy/processor_default.go
@@ -158,7 +158,9 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
 			if err != nil {
 				return false, nil, err
 			}
-
+		
+		// Reminder: When adding support for new versions of the produce request, also update proxy/protocol/responses.go
+		// Change 'apiKeyProduceMaxVersion' when adding new version support
 		case 3, 4, 5, 6, 7, 8, 9, 10, 11:
 			// CorrelationID + ClientID
 			if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go
index e062176a..c6802c3a 100644
--- a/proxy/protocol/responses.go
+++ b/proxy/protocol/responses.go
@@ -3,18 +3,29 @@ package protocol
 import (
 	"errors"
 	"fmt"
+	"math"
 
 	"github.com/grepplabs/kafka-proxy/config"
 )
 
 const (
+	apiKeyProduce         = 0
 	apiKeyMetadata        = 3
 	apiKeyFindCoordinator = 10
+	apiKeyApiVersions     = 18
+
+	// intercept and update ApiVersions response to prevent requests/responses that can't be parsed by Kafka-Proxy
+	apiKeyApiVersionsMaxVersion     = 4
+	apiKeyMetadataMaxVersion        = 13
+	apiKeyFindCoordinatorMaxVersion = 6
+	// produce requests are parsed by proxy/processor_default.go mustReply()
+	apiKeyProduceMaxVersion = 11
 
 	brokersKeyName = "brokers"
 	hostKeyName    = "host"
 	portKeyName    = "port"
 	nodeKeyName    = "node_id"
+	apiKeysKeyname = "api_keys"
 
 	coordinatorKeyName  = "coordinator"
 	coordinatorsKeyName = "coordinators"
@@ -23,6 +34,7 @@ const (
 var (
 	metadataResponseSchemaVersions        = createMetadataResponseSchemaVersions()
 	findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions()
+	apiVersionsResponseSchemaVersions     = createApiVersionsResponseSchemaVersions()
 )
 
 func createMetadataResponseSchemaVersions() []Schema {
@@ -244,7 +256,34 @@ func createMetadataResponseSchemaVersions() []Schema {
 		&SchemaTaggedFields{Name: "response_tagged_fields"},
 	)
 
-	return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12}
+	metadataResponseV13 := NewSchema("metadata_response_v13",
+		&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
+		&CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9},
+		&Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr},
+		&Mfield{Name: "controller_id", Ty: TypeInt32},
+		&CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12},
+		&Mfield{Name: "error_code", Ty: TypeInt16},
+		&SchemaTaggedFields{Name: "response_tagged_fields"},
+	)
+
+	// Reminder: When adding support for new versions of the metadataResponse, also update proxy/protocol/responses.go
+	// Change 'apiKeyMetadataMaxVersion' when adding new version support
+	return []Schema{
+		metadataResponseV0,
+		metadataResponseV1,
+		metadataResponseV2,
+		metadataResponseV3,
+		metadataResponseV4,
+		metadataResponseV5,
+		metadataResponseV6,
+		metadataResponseV7,
+		metadataResponseV8,
+		metadataResponseV9,
+		metadataResponseV10,
+		metadataResponseV11,
+		metadataResponseV12,
+		metadataResponseV13,
+	}
 }
 
 func createFindCoordinatorResponseSchemaVersions() []Schema {
@@ -297,9 +336,119 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
 	findCoordinatorResponseV5 := findCoordinatorResponseV4
 	findCoordinatorResponseV6 := findCoordinatorResponseV5
 
+	// Reminder: When adding support for new versions of the findCoordinatorResponse, also update proxy/protocol/responses.go
+	// Change 'apiKeyFindCoordinatorMaxVersion' when adding new version support
 	return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
 }
 
+func createApiVersionsResponseSchemaVersions() []Schema {
+	apiVersionKeyV0 := NewSchema("api_versions_key_v0",
+		&Mfield{Name: "api_key", Ty: TypeInt16},
+		&Mfield{Name: "min_version", Ty: TypeInt16},
+		&Mfield{Name: "max_version", Ty: TypeInt16},
+	)
+
+	apiVersionSchemaV3 := NewSchema("api_versions_key_schema3",
+		&Mfield{Name: "api_key", Ty: TypeInt16},
+		&Mfield{Name: "min_version", Ty: TypeInt16},
+		&Mfield{Name: "max_version", Ty: TypeInt16},
+		&SchemaTaggedFields{"api_versions_tagged_fields"},
+	)
+
+	apiVersionsResponseV0 := NewSchema("api_versions_response_v0",
+		&Mfield{Name: "error_code", Ty: TypeInt16},
+		&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
+	)
+
+	// Version 1 adds throttle time to the response.
+	apiVersionsResponseV1 := NewSchema("api_versions_response_v1",
+		&Mfield{Name: "error_code", Ty: TypeInt16},
+		&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
+		&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
+	)
+
+	// Starting in version 2, on quota violation, brokers send out responses before throttling.
+	apiVersionsResponseV2 := apiVersionsResponseV1
+
+	// Version 3 is the first flexible version. Tagged fields are only supported in the body but
+	// not in the header. The length of the header must not change in order to guarantee the
+	// backward compatibility.
+	//
+	// Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported
+	// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
+	apiVersionsResponseV3 := NewSchema("api_versions_response_v3",
+		&Mfield{Name: "error_code", Ty: TypeInt16},
+		&CompactArray{Name: apiKeysKeyname, Ty: apiVersionSchemaV3},
+		&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
+		&SchemaTaggedFields{Name: "response_tagged_fields"},
+	)
+
+	// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0.
+	apiVersionsResponseV4 := apiVersionsResponseV3
+
+	// Reminder: When adding support for new versions of the findCoordinatorResponse, also update proxy/protocol/responses.go
+	// Change 'apiKeyApiVersionsMaxVersion' when adding new version support
+	return []Schema{
+		apiVersionsResponseV0,
+		apiVersionsResponseV1,
+		apiVersionsResponseV2,
+		apiVersionsResponseV3,
+		apiVersionsResponseV4,
+	}
+}
+
+func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
+	if decodedStruct == nil {
+		return errors.New("decoded struct must not be nil")
+	}
+	if fn == nil {
+		return errors.New("net address mapper must not be nil")
+	}
+	apiVersionsArray, ok := decodedStruct.Get(apiKeysKeyname).([]interface{})
+	if !ok {
+		return errors.New("api versions not found")
+	}
+	for _, apiVersionElement := range apiVersionsArray {
+		apiVersion := apiVersionElement.(*Struct)
+		apiKey, ok := apiVersion.Get("api_key").(int16)
+		if !ok {
+			return errors.New("api_keys.api_key not found")
+		}
+		maxVersion, ok := apiVersion.Get("max_version").(int16)
+		if !ok {
+			return errors.New("api_keys.max_version not found")
+		}
+
+		limitVersion := int16(math.MaxInt16)
+		switch apiKey {
+		case apiKeyProduce:
+			if maxVersion > apiKeyProduceMaxVersion {
+				limitVersion = apiKeyProduceMaxVersion
+			}
+		case apiKeyMetadata:
+			if maxVersion > apiKeyMetadataMaxVersion {
+				limitVersion = apiKeyMetadataMaxVersion
+			}
+		case apiKeyFindCoordinator:
+			if maxVersion > apiKeyFindCoordinatorMaxVersion {
+				limitVersion = apiKeyFindCoordinatorMaxVersion
+			}
+		case apiKeyApiVersions:
+			if maxVersion > apiKeyApiVersionsMaxVersion {
+				limitVersion = apiKeyApiVersionsMaxVersion
+			}
+		}
+		if maxVersion > limitVersion {
+			err := apiVersion.Replace("max_version", limitVersion)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
 func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
 	if decodedStruct == nil {
 		return errors.New("decoded struct must not be nil")
@@ -446,6 +595,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf
 		return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse)
 	case apiKeyFindCoordinator:
 		return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse)
+	case apiKeyApiVersions:
+		return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse)
 	default:
 		return nil, nil
 	}