diff --git a/internal/connector/internal/api/admin/private/api/openapi.yaml b/internal/connector/internal/api/admin/private/api/openapi.yaml index fa70f4a12..0a7ad8af6 100644 --- a/internal/connector/internal/api/admin/private/api/openapi.yaml +++ b/internal/connector/internal/api/admin/private/api/openapi.yaml @@ -453,6 +453,22 @@ paths: schema: type: string style: simple + - description: include only deployments that have channel updates + explode: true + in: query + name: channel_updates + required: false + schema: + type: boolean + style: form + - description: include only not deleted deployments belonging to a deleted connector + explode: true + in: query + name: dangling_deployments + required: false + schema: + type: boolean + style: form - description: Page index examples: page: @@ -598,6 +614,81 @@ paths: summary: Get a connector deployment tags: - Connector Clusters Admin + patch: + description: Patch a deployment + operationId: patchConnectorClusterDeploymentAdmi + parameters: + - description: The id of the connector cluster + explode: false + in: path + name: connector_cluster_id + required: true + schema: + type: string + style: simple + - description: The id of the connector deployment + explode: false + in: path + name: deployment_id + required: true + schema: + type: string + style: simple + requestBody: + content: + application/merge-patch+json: + schema: + type: object + description: Data to patch the deployment with + required: true + responses: + "202": + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorDeploymentAdminView' + description: The deployment matching the request + "401": + content: + application/json: + examples: + "401Example": + $ref: '#/components/examples/401Example' + schema: + $ref: '#/components/schemas/Error' + description: Auth token is invalid + "404": + content: + application/json: + examples: + "404Example": + $ref: '#/components/examples/404Example' + schema: + $ref: '#/components/schemas/Error' + description: No matching resource exists + "410": + content: + application/json: + examples: + "404Example": + $ref: '#/components/examples/410Example' + schema: + $ref: '#/components/schemas/Error' + description: The requested resource doesn't exist anymore + "500": + content: + application/json: + examples: + "500Example": + $ref: '#/components/examples/500Example' + schema: + $ref: '#/components/schemas/Error' + description: Unexpected error occurred + security: + - Bearer: [] + summary: Patch a deployment + tags: + - Connector Clusters Admin /api/connector_mgmt/v1/admin/kafka_connector_namespaces: get: operationId: getConnectorNamespaces @@ -1018,6 +1109,22 @@ paths: schema: type: string style: simple + - description: include only deployments that have channel updates + explode: true + in: query + name: channel_updates + required: false + schema: + type: boolean + style: form + - description: include only not deleted deployments belonging to a deleted connector + explode: true + in: query + name: dangling_deployments + required: false + schema: + type: boolean + style: form - description: Page index examples: page: @@ -1064,42 +1171,6 @@ paths: schema: type: string style: form - - description: | - Search criteria. - - The syntax of this parameter is similar to the syntax of the `where` clause of a - SQL statement. Allowed fields in the search are `name`, `description`, `version`, `label`, and `channel`. - Allowed operators are `<>`, `=`, or `LIKE`. - Allowed conjunctive operators are `AND` and `OR`. However, you can use a maximum of 10 conjunctions in a search query. - - Examples: - - To return a Connector Type with the name `aws-sqs-source` and the channel `stable`, use the following syntax: - - ``` - name = aws-sqs-source and channel = stable - ```[p-] - - To return a Kafka instance with a name that starts with `aws`, use the following syntax: - - ``` - name like aws%25 - ``` - - If the parameter isn't provided, or if the value is empty, then all the Connector Type - that the user has permission to see are returned. - - Note. If the query is invalid, an error is returned. - examples: - search: - value: name = aws-sqs-source and channel = stable - explode: true - in: query - name: search - required: false - schema: - type: string - style: form responses: "200": content: @@ -1249,148 +1320,6 @@ paths: summary: Get a connector tags: - Connector Clusters Admin - /api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/type: - get: - operationId: getConnectorUpgradesByType - parameters: - - description: The id of the connector cluster - explode: false - in: path - name: connector_cluster_id - required: true - schema: - type: string - style: simple - - description: Page index - examples: - page: - value: "1" - in: query - name: page - required: false - schema: - type: string - - description: Number of items in each page - examples: - size: - value: "100" - in: query - name: size - required: false - schema: - type: string - responses: - "200": - content: - application/json: - schema: - $ref: '#/components/schemas/ConnectorAvailableTypeUpgradeList' - description: The connectors that have available type upgrades - "401": - content: - application/json: - examples: - "401Example": - $ref: '#/components/examples/401Example' - schema: - $ref: '#/components/schemas/Error' - description: Auth token is invalid - "404": - content: - application/json: - examples: - "404Example": - $ref: '#/components/examples/404Example' - schema: - $ref: '#/components/schemas/Error' - description: No matching connector cluster type exists - "500": - content: - application/json: - examples: - "500Example": - $ref: '#/components/examples/500Example' - schema: - $ref: '#/components/schemas/Error' - description: Unexpected error occurred - security: - - Bearer: [] - summary: Get a list of available connector type upgrades - tags: - - Connector Clusters Admin - put: - operationId: upgradeConnectorsByType - parameters: - - description: The id of the connector cluster - explode: false - in: path - name: connector_cluster_id - required: true - schema: - type: string - style: simple - - description: Page index - examples: - page: - value: "1" - in: query - name: page - required: false - schema: - type: string - - description: Number of items in each page - examples: - size: - value: "100" - in: query - name: size - required: false - schema: - type: string - requestBody: - content: - application/json: - schema: - items: - $ref: '#/components/schemas/ConnectorAvailableTypeUpgrade' - type: array - description: List of connectors to upgrade - required: true - responses: - "204": - description: Connectors are upgraded - "401": - content: - application/json: - examples: - "401Example": - $ref: '#/components/examples/401Example' - schema: - $ref: '#/components/schemas/Error' - description: Auth token is invalid - "404": - content: - application/json: - examples: - "404Example": - $ref: '#/components/examples/404Example' - schema: - $ref: '#/components/schemas/Error' - description: No matching connector cluster exists - "500": - content: - application/json: - examples: - "500Example": - $ref: '#/components/examples/500Example' - schema: - $ref: '#/components/schemas/Error' - description: Unexpected error occurred - security: - - Bearer: [] - summary: upgrade a connector cluster - tags: - - Connector Clusters Admin /api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/operator: get: operationId: getConnectorUpgradesByOperator @@ -1719,32 +1648,6 @@ components: code: CONNECTOR-MGMT-7 reason: The requested resource doesn't exist schemas: - ConnectorAvailableTypeUpgradeList: - allOf: - - $ref: '#/components/schemas/List' - - $ref: '#/components/schemas/ConnectorAvailableTypeUpgradeList_allOf' - ConnectorAvailableTypeUpgrade: - description: An available type upgrade for a connector - example: - namespace_id: namespace_id - shard_metadata: - available_id: 6 - assigned_id: 0 - connector_id: connector_id - channel: channel - connector_type_id: connector_type_id - properties: - connector_id: - type: string - namespace_id: - type: string - connector_type_id: - type: string - channel: - type: string - shard_metadata: - $ref: '#/components/schemas/ConnectorAvailableTypeUpgrade_shard_metadata' - type: object ConnectorAvailableOperatorUpgradeList: allOf: - $ref: '#/components/schemas/List' @@ -1769,7 +1672,18 @@ components: channel: type: string operator: - $ref: '#/components/schemas/ConnectorAvailableOperatorUpgrade_operator' + $ref: '#/components/schemas/ConnectorUpgradeStatus' + type: object + ConnectorUpgradeStatus: + description: Assigned and available update ids + example: + available_id: available_id + assigned_id: assigned_id + properties: + assigned_id: + type: string + available_id: + type: string type: object ConnectorNamespaceWithTenantRequest: allOf: @@ -1819,6 +1733,36 @@ components: shard_metadata: type: object type: object + ConnectorDeploymentAdminStatus: + description: The status of connector deployment + properties: + phase: + $ref: '#/components/schemas/ConnectorState' + resource_version: + format: int64 + type: integer + shard_metadata: + $ref: '#/components/schemas/ConnectorDeploymentAdminStatus_shard_metadata' + operators: + $ref: '#/components/schemas/ConnectorDeploymentAdminStatus_operators' + conditions: + items: + $ref: '#/components/schemas/MetaV1Condition' + type: array + type: object + ConnectorShardMetadata: + description: identifies a shard metadata of a connector type. + properties: + channel: + description: the channel of the shard metadata + type: string + connector_type_id: + description: the connector type id this shard metadata refers to + type: string + revision: + description: the revision of the shard metadate + format: int64 + type: integer ConnectorDeploymentAdminViewList: allOf: - $ref: '#/components/schemas/List' @@ -2057,21 +2001,6 @@ components: - provisioning - deprovisioning type: string - ConnectorDeploymentStatus: - description: The status of connector deployment - properties: - phase: - $ref: '#/components/schemas/ConnectorState' - resource_version: - format: int64 - type: integer - operators: - $ref: '#/components/schemas/ConnectorDeploymentStatus_operators' - conditions: - items: - $ref: '#/components/schemas/MetaV1Condition' - type: array - type: object ConnectorOperator: description: identifies an operator that runs on the fleet shards used to manage connectors. @@ -2112,38 +2041,12 @@ components: - json_schema - name - version - ConnectorAvailableTypeUpgradeList_allOf: - properties: - items: - items: - $ref: '#/components/schemas/ConnectorAvailableTypeUpgrade' - type: array - ConnectorAvailableTypeUpgrade_shard_metadata: - example: - available_id: 6 - assigned_id: 0 - properties: - assigned_id: - format: int64 - type: integer - available_id: - format: int64 - type: integer ConnectorAvailableOperatorUpgradeList_allOf: properties: items: items: $ref: '#/components/schemas/ConnectorAvailableOperatorUpgrade' type: array - ConnectorAvailableOperatorUpgrade_operator: - example: - available_id: available_id - assigned_id: assigned_id - properties: - assigned_id: - type: string - available_id: - type: string ConnectorNamespaceWithTenantRequest_allOf: properties: cluster_id: @@ -2184,7 +2087,20 @@ components: spec: $ref: '#/components/schemas/ConnectorDeploymentAdminSpec' status: - $ref: '#/components/schemas/ConnectorDeploymentStatus' + $ref: '#/components/schemas/ConnectorDeploymentAdminStatus' + ConnectorDeploymentAdminStatus_shard_metadata: + description: latest available revision of deployment shared metadata + properties: + assigned: + $ref: '#/components/schemas/ConnectorShardMetadata' + available: + $ref: '#/components/schemas/ConnectorShardMetadata' + ConnectorDeploymentAdminStatus_operators: + properties: + assigned: + $ref: '#/components/schemas/ConnectorOperator' + available: + $ref: '#/components/schemas/ConnectorOperator' ConnectorDeploymentAdminViewList_allOf: properties: items: @@ -2268,12 +2184,6 @@ components: $ref: '#/components/schemas/ConnectorState' error: type: string - ConnectorDeploymentStatus_operators: - properties: - assigned: - $ref: '#/components/schemas/ConnectorOperator' - available: - $ref: '#/components/schemas/ConnectorOperator' ConnectorType_allOf: properties: name: diff --git a/internal/connector/internal/api/admin/private/api_connector_clusters_admin.go b/internal/connector/internal/api/admin/private/api_connector_clusters_admin.go index e0b4f3554..b6af86d4f 100644 --- a/internal/connector/internal/api/admin/private/api_connector_clusters_admin.go +++ b/internal/connector/internal/api/admin/private/api_connector_clusters_admin.go @@ -381,9 +381,11 @@ func (a *ConnectorClustersAdminApiService) GetClusterConnectors(ctx _context.Con // GetClusterDeploymentsOpts Optional parameters for the method 'GetClusterDeployments' type GetClusterDeploymentsOpts struct { - Page optional.String - Size optional.String - OrderBy optional.String + ChannelUpdates optional.Bool + DanglingDeployments optional.Bool + Page optional.String + Size optional.String + OrderBy optional.String } /* @@ -391,6 +393,8 @@ GetClusterDeployments Get a list of available deployments in a cluster * @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). * @param connectorClusterId The id of the cluster * @param optional nil or *GetClusterDeploymentsOpts - Optional Parameters: + * @param "ChannelUpdates" (optional.Bool) - include only deployments that have channel updates + * @param "DanglingDeployments" (optional.Bool) - include only not deleted deployments belonging to a deleted connector * @param "Page" (optional.String) - Page index * @param "Size" (optional.String) - Number of items in each page * @param "OrderBy" (optional.String) - Specifies the order by criteria. The syntax of this parameter is similar to the syntax of the `order by` clause of an SQL statement. Each query can be ordered by any of the `ConnectorType` fields. For example, to return all Connector types ordered by their name, use the following syntax: ```sql name asc ``` To return all Connector types ordered by their name _and_ version, use the following syntax: ```sql name asc, version asc ``` If the parameter isn't provided, or if the value is empty, then the results are ordered by name. @@ -414,6 +418,12 @@ func (a *ConnectorClustersAdminApiService) GetClusterDeployments(ctx _context.Co localVarQueryParams := _neturl.Values{} localVarFormParams := _neturl.Values{} + if localVarOptionals != nil && localVarOptionals.ChannelUpdates.IsSet() { + localVarQueryParams.Add("channel_updates", parameterToString(localVarOptionals.ChannelUpdates.Value(), "")) + } + if localVarOptionals != nil && localVarOptionals.DanglingDeployments.IsSet() { + localVarQueryParams.Add("dangling_deployments", parameterToString(localVarOptionals.DanglingDeployments.Value(), "")) + } if localVarOptionals != nil && localVarOptionals.Page.IsSet() { localVarQueryParams.Add("page", parameterToString(localVarOptionals.Page.Value(), "")) } @@ -1177,127 +1187,6 @@ func (a *ConnectorClustersAdminApiService) GetConnectorUpgradesByOperator(ctx _c return localVarReturnValue, localVarHTTPResponse, nil } -// GetConnectorUpgradesByTypeOpts Optional parameters for the method 'GetConnectorUpgradesByType' -type GetConnectorUpgradesByTypeOpts struct { - Page optional.String - Size optional.String -} - -/* -GetConnectorUpgradesByType Get a list of available connector type upgrades - * @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). - * @param connectorClusterId The id of the connector cluster - * @param optional nil or *GetConnectorUpgradesByTypeOpts - Optional Parameters: - * @param "Page" (optional.String) - Page index - * @param "Size" (optional.String) - Number of items in each page -@return ConnectorAvailableTypeUpgradeList -*/ -func (a *ConnectorClustersAdminApiService) GetConnectorUpgradesByType(ctx _context.Context, connectorClusterId string, localVarOptionals *GetConnectorUpgradesByTypeOpts) (ConnectorAvailableTypeUpgradeList, *_nethttp.Response, error) { - var ( - localVarHTTPMethod = _nethttp.MethodGet - localVarPostBody interface{} - localVarFormFileName string - localVarFileName string - localVarFileBytes []byte - localVarReturnValue ConnectorAvailableTypeUpgradeList - ) - - // create path and map variables - localVarPath := a.client.cfg.BasePath + "/api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/type" - localVarPath = strings.Replace(localVarPath, "{"+"connector_cluster_id"+"}", _neturl.QueryEscape(parameterToString(connectorClusterId, "")), -1) - - localVarHeaderParams := make(map[string]string) - localVarQueryParams := _neturl.Values{} - localVarFormParams := _neturl.Values{} - - if localVarOptionals != nil && localVarOptionals.Page.IsSet() { - localVarQueryParams.Add("page", parameterToString(localVarOptionals.Page.Value(), "")) - } - if localVarOptionals != nil && localVarOptionals.Size.IsSet() { - localVarQueryParams.Add("size", parameterToString(localVarOptionals.Size.Value(), "")) - } - // to determine the Content-Type header - localVarHTTPContentTypes := []string{} - - // set Content-Type header - localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes) - if localVarHTTPContentType != "" { - localVarHeaderParams["Content-Type"] = localVarHTTPContentType - } - - // to determine the Accept header - localVarHTTPHeaderAccepts := []string{"application/json"} - - // set Accept header - localVarHTTPHeaderAccept := selectHeaderAccept(localVarHTTPHeaderAccepts) - if localVarHTTPHeaderAccept != "" { - localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept - } - r, err := a.client.prepareRequest(ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, localVarFileName, localVarFileBytes) - if err != nil { - return localVarReturnValue, nil, err - } - - localVarHTTPResponse, err := a.client.callAPI(r) - if err != nil || localVarHTTPResponse == nil { - return localVarReturnValue, localVarHTTPResponse, err - } - - localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - localVarHTTPResponse.Body.Close() - if err != nil { - return localVarReturnValue, localVarHTTPResponse, err - } - - if localVarHTTPResponse.StatusCode >= 300 { - newErr := GenericOpenAPIError{ - body: localVarBody, - error: localVarHTTPResponse.Status, - } - if localVarHTTPResponse.StatusCode == 401 { - var v Error - err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) - if err != nil { - newErr.error = err.Error() - return localVarReturnValue, localVarHTTPResponse, newErr - } - newErr.model = v - return localVarReturnValue, localVarHTTPResponse, newErr - } - if localVarHTTPResponse.StatusCode == 404 { - var v Error - err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) - if err != nil { - newErr.error = err.Error() - return localVarReturnValue, localVarHTTPResponse, newErr - } - newErr.model = v - return localVarReturnValue, localVarHTTPResponse, newErr - } - if localVarHTTPResponse.StatusCode == 500 { - var v Error - err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) - if err != nil { - newErr.error = err.Error() - return localVarReturnValue, localVarHTTPResponse, newErr - } - newErr.model = v - } - return localVarReturnValue, localVarHTTPResponse, newErr - } - - err = a.client.decode(&localVarReturnValue, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) - if err != nil { - newErr := GenericOpenAPIError{ - body: localVarBody, - error: err.Error(), - } - return localVarReturnValue, localVarHTTPResponse, newErr - } - - return localVarReturnValue, localVarHTTPResponse, nil -} - // GetNamespaceConnectorsOpts Optional parameters for the method 'GetNamespaceConnectors' type GetNamespaceConnectorsOpts struct { Page optional.String @@ -1431,10 +1320,11 @@ func (a *ConnectorClustersAdminApiService) GetNamespaceConnectors(ctx _context.C // GetNamespaceDeploymentsOpts Optional parameters for the method 'GetNamespaceDeployments' type GetNamespaceDeploymentsOpts struct { - Page optional.String - Size optional.String - OrderBy optional.String - Search optional.String + ChannelUpdates optional.Bool + DanglingDeployments optional.Bool + Page optional.String + Size optional.String + OrderBy optional.String } /* @@ -1442,10 +1332,11 @@ GetNamespaceDeployments Get a list of available deployments in a namespace * @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). * @param namespaceId The id of the namespace * @param optional nil or *GetNamespaceDeploymentsOpts - Optional Parameters: + * @param "ChannelUpdates" (optional.Bool) - include only deployments that have channel updates + * @param "DanglingDeployments" (optional.Bool) - include only not deleted deployments belonging to a deleted connector * @param "Page" (optional.String) - Page index * @param "Size" (optional.String) - Number of items in each page * @param "OrderBy" (optional.String) - Specifies the order by criteria. The syntax of this parameter is similar to the syntax of the `order by` clause of an SQL statement. Each query can be ordered by any of the `ConnectorType` fields. For example, to return all Connector types ordered by their name, use the following syntax: ```sql name asc ``` To return all Connector types ordered by their name _and_ version, use the following syntax: ```sql name asc, version asc ``` If the parameter isn't provided, or if the value is empty, then the results are ordered by name. - * @param "Search" (optional.String) - Search criteria. The syntax of this parameter is similar to the syntax of the `where` clause of a SQL statement. Allowed fields in the search are `name`, `description`, `version`, `label`, and `channel`. Allowed operators are `<>`, `=`, or `LIKE`. Allowed conjunctive operators are `AND` and `OR`. However, you can use a maximum of 10 conjunctions in a search query. Examples: To return a Connector Type with the name `aws-sqs-source` and the channel `stable`, use the following syntax: ``` name = aws-sqs-source and channel = stable ```[p-] To return a Kafka instance with a name that starts with `aws`, use the following syntax: ``` name like aws%25 ``` If the parameter isn't provided, or if the value is empty, then all the Connector Type that the user has permission to see are returned. Note. If the query is invalid, an error is returned. @return ConnectorDeploymentAdminViewList */ func (a *ConnectorClustersAdminApiService) GetNamespaceDeployments(ctx _context.Context, namespaceId string, localVarOptionals *GetNamespaceDeploymentsOpts) (ConnectorDeploymentAdminViewList, *_nethttp.Response, error) { @@ -1466,6 +1357,12 @@ func (a *ConnectorClustersAdminApiService) GetNamespaceDeployments(ctx _context. localVarQueryParams := _neturl.Values{} localVarFormParams := _neturl.Values{} + if localVarOptionals != nil && localVarOptionals.ChannelUpdates.IsSet() { + localVarQueryParams.Add("channel_updates", parameterToString(localVarOptionals.ChannelUpdates.Value(), "")) + } + if localVarOptionals != nil && localVarOptionals.DanglingDeployments.IsSet() { + localVarQueryParams.Add("dangling_deployments", parameterToString(localVarOptionals.DanglingDeployments.Value(), "")) + } if localVarOptionals != nil && localVarOptionals.Page.IsSet() { localVarQueryParams.Add("page", parameterToString(localVarOptionals.Page.Value(), "")) } @@ -1475,9 +1372,6 @@ func (a *ConnectorClustersAdminApiService) GetNamespaceDeployments(ctx _context. if localVarOptionals != nil && localVarOptionals.OrderBy.IsSet() { localVarQueryParams.Add("orderBy", parameterToString(localVarOptionals.OrderBy.Value(), "")) } - if localVarOptionals != nil && localVarOptionals.Search.IsSet() { - localVarQueryParams.Add("search", parameterToString(localVarOptionals.Search.Value(), "")) - } // to determine the Content-Type header localVarHTTPContentTypes := []string{} @@ -1678,46 +1572,37 @@ func (a *ConnectorClustersAdminApiService) ListConnectorClusters(ctx _context.Co return localVarReturnValue, localVarHTTPResponse, nil } -// UpgradeConnectorsByOperatorOpts Optional parameters for the method 'UpgradeConnectorsByOperator' -type UpgradeConnectorsByOperatorOpts struct { - Page optional.String - Size optional.String -} - /* -UpgradeConnectorsByOperator upgrade a connector cluster +PatchConnectorClusterDeploymentAdmi Patch a deployment +Patch a deployment * @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). * @param connectorClusterId The id of the connector cluster - * @param connectorAvailableOperatorUpgrade List of connectors to upgrade - * @param optional nil or *UpgradeConnectorsByOperatorOpts - Optional Parameters: - * @param "Page" (optional.String) - Page index - * @param "Size" (optional.String) - Number of items in each page + * @param deploymentId The id of the connector deployment + * @param body Data to patch the deployment with +@return ConnectorDeploymentAdminView */ -func (a *ConnectorClustersAdminApiService) UpgradeConnectorsByOperator(ctx _context.Context, connectorClusterId string, connectorAvailableOperatorUpgrade []ConnectorAvailableOperatorUpgrade, localVarOptionals *UpgradeConnectorsByOperatorOpts) (*_nethttp.Response, error) { +func (a *ConnectorClustersAdminApiService) PatchConnectorClusterDeploymentAdmi(ctx _context.Context, connectorClusterId string, deploymentId string, body map[string]interface{}) (ConnectorDeploymentAdminView, *_nethttp.Response, error) { var ( - localVarHTTPMethod = _nethttp.MethodPut + localVarHTTPMethod = _nethttp.MethodPatch localVarPostBody interface{} localVarFormFileName string localVarFileName string localVarFileBytes []byte + localVarReturnValue ConnectorDeploymentAdminView ) // create path and map variables - localVarPath := a.client.cfg.BasePath + "/api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/operator" + localVarPath := a.client.cfg.BasePath + "/api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/deployments/{deployment_id}" localVarPath = strings.Replace(localVarPath, "{"+"connector_cluster_id"+"}", _neturl.QueryEscape(parameterToString(connectorClusterId, "")), -1) + localVarPath = strings.Replace(localVarPath, "{"+"deployment_id"+"}", _neturl.QueryEscape(parameterToString(deploymentId, "")), -1) + localVarHeaderParams := make(map[string]string) localVarQueryParams := _neturl.Values{} localVarFormParams := _neturl.Values{} - if localVarOptionals != nil && localVarOptionals.Page.IsSet() { - localVarQueryParams.Add("page", parameterToString(localVarOptionals.Page.Value(), "")) - } - if localVarOptionals != nil && localVarOptionals.Size.IsSet() { - localVarQueryParams.Add("size", parameterToString(localVarOptionals.Size.Value(), "")) - } // to determine the Content-Type header - localVarHTTPContentTypes := []string{"application/json"} + localVarHTTPContentTypes := []string{"application/merge-patch+json"} // set Content-Type header localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes) @@ -1734,21 +1619,21 @@ func (a *ConnectorClustersAdminApiService) UpgradeConnectorsByOperator(ctx _cont localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept } // body params - localVarPostBody = &connectorAvailableOperatorUpgrade + localVarPostBody = &body r, err := a.client.prepareRequest(ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, localVarFileName, localVarFileBytes) if err != nil { - return nil, err + return localVarReturnValue, nil, err } localVarHTTPResponse, err := a.client.callAPI(r) if err != nil || localVarHTTPResponse == nil { - return localVarHTTPResponse, err + return localVarReturnValue, localVarHTTPResponse, err } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) localVarHTTPResponse.Body.Close() if err != nil { - return localVarHTTPResponse, err + return localVarReturnValue, localVarHTTPResponse, err } if localVarHTTPResponse.StatusCode >= 300 { @@ -1761,52 +1646,71 @@ func (a *ConnectorClustersAdminApiService) UpgradeConnectorsByOperator(ctx _cont err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) if err != nil { newErr.error = err.Error() - return localVarHTTPResponse, newErr + return localVarReturnValue, localVarHTTPResponse, newErr } newErr.model = v - return localVarHTTPResponse, newErr + return localVarReturnValue, localVarHTTPResponse, newErr } if localVarHTTPResponse.StatusCode == 404 { var v Error err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) if err != nil { newErr.error = err.Error() - return localVarHTTPResponse, newErr + return localVarReturnValue, localVarHTTPResponse, newErr } newErr.model = v - return localVarHTTPResponse, newErr + return localVarReturnValue, localVarHTTPResponse, newErr + } + if localVarHTTPResponse.StatusCode == 410 { + var v Error + err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr.error = err.Error() + return localVarReturnValue, localVarHTTPResponse, newErr + } + newErr.model = v + return localVarReturnValue, localVarHTTPResponse, newErr } if localVarHTTPResponse.StatusCode == 500 { var v Error err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) if err != nil { newErr.error = err.Error() - return localVarHTTPResponse, newErr + return localVarReturnValue, localVarHTTPResponse, newErr } newErr.model = v } - return localVarHTTPResponse, newErr + return localVarReturnValue, localVarHTTPResponse, newErr } - return localVarHTTPResponse, nil + err = a.client.decode(&localVarReturnValue, localVarBody, localVarHTTPResponse.Header.Get("Content-Type")) + if err != nil { + newErr := GenericOpenAPIError{ + body: localVarBody, + error: err.Error(), + } + return localVarReturnValue, localVarHTTPResponse, newErr + } + + return localVarReturnValue, localVarHTTPResponse, nil } -// UpgradeConnectorsByTypeOpts Optional parameters for the method 'UpgradeConnectorsByType' -type UpgradeConnectorsByTypeOpts struct { +// UpgradeConnectorsByOperatorOpts Optional parameters for the method 'UpgradeConnectorsByOperator' +type UpgradeConnectorsByOperatorOpts struct { Page optional.String Size optional.String } /* -UpgradeConnectorsByType upgrade a connector cluster +UpgradeConnectorsByOperator upgrade a connector cluster * @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). * @param connectorClusterId The id of the connector cluster - * @param connectorAvailableTypeUpgrade List of connectors to upgrade - * @param optional nil or *UpgradeConnectorsByTypeOpts - Optional Parameters: + * @param connectorAvailableOperatorUpgrade List of connectors to upgrade + * @param optional nil or *UpgradeConnectorsByOperatorOpts - Optional Parameters: * @param "Page" (optional.String) - Page index * @param "Size" (optional.String) - Number of items in each page */ -func (a *ConnectorClustersAdminApiService) UpgradeConnectorsByType(ctx _context.Context, connectorClusterId string, connectorAvailableTypeUpgrade []ConnectorAvailableTypeUpgrade, localVarOptionals *UpgradeConnectorsByTypeOpts) (*_nethttp.Response, error) { +func (a *ConnectorClustersAdminApiService) UpgradeConnectorsByOperator(ctx _context.Context, connectorClusterId string, connectorAvailableOperatorUpgrade []ConnectorAvailableOperatorUpgrade, localVarOptionals *UpgradeConnectorsByOperatorOpts) (*_nethttp.Response, error) { var ( localVarHTTPMethod = _nethttp.MethodPut localVarPostBody interface{} @@ -1816,7 +1720,7 @@ func (a *ConnectorClustersAdminApiService) UpgradeConnectorsByType(ctx _context. ) // create path and map variables - localVarPath := a.client.cfg.BasePath + "/api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/type" + localVarPath := a.client.cfg.BasePath + "/api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/operator" localVarPath = strings.Replace(localVarPath, "{"+"connector_cluster_id"+"}", _neturl.QueryEscape(parameterToString(connectorClusterId, "")), -1) localVarHeaderParams := make(map[string]string) @@ -1847,7 +1751,7 @@ func (a *ConnectorClustersAdminApiService) UpgradeConnectorsByType(ctx _context. localVarHeaderParams["Accept"] = localVarHTTPHeaderAccept } // body params - localVarPostBody = &connectorAvailableTypeUpgrade + localVarPostBody = &connectorAvailableOperatorUpgrade r, err := a.client.prepareRequest(ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, localVarFileName, localVarFileBytes) if err != nil { return nil, err diff --git a/internal/connector/internal/api/admin/private/model_connector_available_operator_upgrade.go b/internal/connector/internal/api/admin/private/model_connector_available_operator_upgrade.go index f41298df2..718658e4c 100644 --- a/internal/connector/internal/api/admin/private/model_connector_available_operator_upgrade.go +++ b/internal/connector/internal/api/admin/private/model_connector_available_operator_upgrade.go @@ -11,9 +11,9 @@ package private // ConnectorAvailableOperatorUpgrade An available operator upgrade for a connector type ConnectorAvailableOperatorUpgrade struct { - ConnectorId string `json:"connector_id,omitempty"` - NamespaceId string `json:"namespace_id,omitempty"` - ConnectorTypeId string `json:"connector_type_id,omitempty"` - Channel string `json:"channel,omitempty"` - Operator ConnectorAvailableOperatorUpgradeOperator `json:"operator,omitempty"` + ConnectorId string `json:"connector_id,omitempty"` + NamespaceId string `json:"namespace_id,omitempty"` + ConnectorTypeId string `json:"connector_type_id,omitempty"` + Channel string `json:"channel,omitempty"` + Operator ConnectorUpgradeStatus `json:"operator,omitempty"` } diff --git a/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade.go b/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade.go deleted file mode 100644 index 623475179..000000000 --- a/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade.go +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Connector Service Fleet Manager Admin APIs - * - * Connector Service Fleet Manager Admin is a Rest API to manage connector clusters. - * - * API version: 0.0.3 - * Generated by: OpenAPI Generator (https://openapi-generator.tech) - */ - -package private - -// ConnectorAvailableTypeUpgrade An available type upgrade for a connector -type ConnectorAvailableTypeUpgrade struct { - ConnectorId string `json:"connector_id,omitempty"` - NamespaceId string `json:"namespace_id,omitempty"` - ConnectorTypeId string `json:"connector_type_id,omitempty"` - Channel string `json:"channel,omitempty"` - ShardMetadata ConnectorAvailableTypeUpgradeShardMetadata `json:"shard_metadata,omitempty"` -} diff --git a/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade_list.go b/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade_list.go deleted file mode 100644 index fcf473eb1..000000000 --- a/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade_list.go +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Connector Service Fleet Manager Admin APIs - * - * Connector Service Fleet Manager Admin is a Rest API to manage connector clusters. - * - * API version: 0.0.3 - * Generated by: OpenAPI Generator (https://openapi-generator.tech) - */ - -package private - -// ConnectorAvailableTypeUpgradeList struct for ConnectorAvailableTypeUpgradeList -type ConnectorAvailableTypeUpgradeList struct { - Kind string `json:"kind"` - Page int32 `json:"page"` - Size int32 `json:"size"` - Total int32 `json:"total"` - Items []ConnectorAvailableTypeUpgrade `json:"items"` -} diff --git a/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade_shard_metadata.go b/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade_shard_metadata.go deleted file mode 100644 index d4da410f2..000000000 --- a/internal/connector/internal/api/admin/private/model_connector_available_type_upgrade_shard_metadata.go +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Connector Service Fleet Manager Admin APIs - * - * Connector Service Fleet Manager Admin is a Rest API to manage connector clusters. - * - * API version: 0.0.3 - * Generated by: OpenAPI Generator (https://openapi-generator.tech) - */ - -package private - -// ConnectorAvailableTypeUpgradeShardMetadata struct for ConnectorAvailableTypeUpgradeShardMetadata -type ConnectorAvailableTypeUpgradeShardMetadata struct { - AssignedId int64 `json:"assigned_id,omitempty"` - AvailableId int64 `json:"available_id,omitempty"` -} diff --git a/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status.go b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status.go new file mode 100644 index 000000000..1f7327e73 --- /dev/null +++ b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status.go @@ -0,0 +1,19 @@ +/* + * Connector Service Fleet Manager Admin APIs + * + * Connector Service Fleet Manager Admin is a Rest API to manage connector clusters. + * + * API version: 0.0.3 + * Generated by: OpenAPI Generator (https://openapi-generator.tech) + */ + +package private + +// ConnectorDeploymentAdminStatus The status of connector deployment +type ConnectorDeploymentAdminStatus struct { + Phase ConnectorState `json:"phase,omitempty"` + ResourceVersion int64 `json:"resource_version,omitempty"` + ShardMetadata ConnectorDeploymentAdminStatusShardMetadata `json:"shard_metadata,omitempty"` + Operators ConnectorDeploymentAdminStatusOperators `json:"operators,omitempty"` + Conditions []MetaV1Condition `json:"conditions,omitempty"` +} diff --git a/internal/connector/internal/api/admin/private/model_connector_deployment_status_operators.go b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status_operators.go similarity index 71% rename from internal/connector/internal/api/admin/private/model_connector_deployment_status_operators.go rename to internal/connector/internal/api/admin/private/model_connector_deployment_admin_status_operators.go index 6bd62629d..825f58460 100644 --- a/internal/connector/internal/api/admin/private/model_connector_deployment_status_operators.go +++ b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status_operators.go @@ -9,8 +9,8 @@ package private -// ConnectorDeploymentStatusOperators struct for ConnectorDeploymentStatusOperators -type ConnectorDeploymentStatusOperators struct { +// ConnectorDeploymentAdminStatusOperators struct for ConnectorDeploymentAdminStatusOperators +type ConnectorDeploymentAdminStatusOperators struct { Assigned ConnectorOperator `json:"assigned,omitempty"` Available ConnectorOperator `json:"available,omitempty"` } diff --git a/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status_shard_metadata.go b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status_shard_metadata.go new file mode 100644 index 000000000..f79c2c543 --- /dev/null +++ b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_status_shard_metadata.go @@ -0,0 +1,16 @@ +/* + * Connector Service Fleet Manager Admin APIs + * + * Connector Service Fleet Manager Admin is a Rest API to manage connector clusters. + * + * API version: 0.0.3 + * Generated by: OpenAPI Generator (https://openapi-generator.tech) + */ + +package private + +// ConnectorDeploymentAdminStatusShardMetadata latest available revision of deployment shared metadata +type ConnectorDeploymentAdminStatusShardMetadata struct { + Assigned ConnectorShardMetadata `json:"assigned,omitempty"` + Available ConnectorShardMetadata `json:"available,omitempty"` +} diff --git a/internal/connector/internal/api/admin/private/model_connector_deployment_admin_view.go b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_view.go index 33b68082e..f452a536b 100644 --- a/internal/connector/internal/api/admin/private/model_connector_deployment_admin_view.go +++ b/internal/connector/internal/api/admin/private/model_connector_deployment_admin_view.go @@ -16,5 +16,5 @@ type ConnectorDeploymentAdminView struct { Href string `json:"href,omitempty"` Metadata ConnectorDeploymentAdminViewAllOfMetadata `json:"metadata,omitempty"` Spec ConnectorDeploymentAdminSpec `json:"spec,omitempty"` - Status ConnectorDeploymentStatus `json:"status,omitempty"` + Status ConnectorDeploymentAdminStatus `json:"status,omitempty"` } diff --git a/internal/connector/internal/api/admin/private/model_connector_deployment_status.go b/internal/connector/internal/api/admin/private/model_connector_deployment_status.go deleted file mode 100644 index abd985d17..000000000 --- a/internal/connector/internal/api/admin/private/model_connector_deployment_status.go +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Connector Service Fleet Manager Admin APIs - * - * Connector Service Fleet Manager Admin is a Rest API to manage connector clusters. - * - * API version: 0.0.3 - * Generated by: OpenAPI Generator (https://openapi-generator.tech) - */ - -package private - -// ConnectorDeploymentStatus The status of connector deployment -type ConnectorDeploymentStatus struct { - Phase ConnectorState `json:"phase,omitempty"` - ResourceVersion int64 `json:"resource_version,omitempty"` - Operators ConnectorDeploymentStatusOperators `json:"operators,omitempty"` - Conditions []MetaV1Condition `json:"conditions,omitempty"` -} diff --git a/internal/connector/internal/api/admin/private/model_connector_shard_metadata.go b/internal/connector/internal/api/admin/private/model_connector_shard_metadata.go new file mode 100644 index 000000000..da32d65fb --- /dev/null +++ b/internal/connector/internal/api/admin/private/model_connector_shard_metadata.go @@ -0,0 +1,20 @@ +/* + * Connector Service Fleet Manager Admin APIs + * + * Connector Service Fleet Manager Admin is a Rest API to manage connector clusters. + * + * API version: 0.0.3 + * Generated by: OpenAPI Generator (https://openapi-generator.tech) + */ + +package private + +// ConnectorShardMetadata identifies a shard metadata of a connector type. +type ConnectorShardMetadata struct { + // the channel of the shard metadata + Channel string `json:"channel,omitempty"` + // the connector type id this shard metadata refers to + ConnectorTypeId string `json:"connector_type_id,omitempty"` + // the revision of the shard metadate + Revision int64 `json:"revision,omitempty"` +} diff --git a/internal/connector/internal/api/admin/private/model_connector_available_operator_upgrade_operator.go b/internal/connector/internal/api/admin/private/model_connector_upgrade_status.go similarity index 69% rename from internal/connector/internal/api/admin/private/model_connector_available_operator_upgrade_operator.go rename to internal/connector/internal/api/admin/private/model_connector_upgrade_status.go index f27d005d3..9e12e4b4f 100644 --- a/internal/connector/internal/api/admin/private/model_connector_available_operator_upgrade_operator.go +++ b/internal/connector/internal/api/admin/private/model_connector_upgrade_status.go @@ -9,8 +9,8 @@ package private -// ConnectorAvailableOperatorUpgradeOperator struct for ConnectorAvailableOperatorUpgradeOperator -type ConnectorAvailableOperatorUpgradeOperator struct { +// ConnectorUpgradeStatus Assigned and available update ids +type ConnectorUpgradeStatus struct { AssignedId string `json:"assigned_id,omitempty"` AvailableId string `json:"available_id,omitempty"` } diff --git a/internal/connector/internal/api/dbapi/connector.go b/internal/connector/internal/api/dbapi/connector.go index b7a972b14..2de759f42 100644 --- a/internal/connector/internal/api/dbapi/connector.go +++ b/internal/connector/internal/api/dbapi/connector.go @@ -84,15 +84,17 @@ type ConnectorWithConditionsList []*ConnectorWithConditions // ConnectorDeployment Holds the deployment configuration of a connector type ConnectorDeployment struct { db.Model - Version int64 - ConnectorID string - OperatorID string - ConnectorVersion int64 - ConnectorTypeChannelId int64 - ClusterID string - NamespaceID string - AllowUpgrade bool - Status ConnectorDeploymentStatus `gorm:"foreignKey:ID"` + Version int64 + ConnectorID string + Connector Connector + OperatorID string + ConnectorVersion int64 + ConnectorShardMetadataID int64 + ConnectorShardMetadata ConnectorShardMetadata + ClusterID string + NamespaceID string + AllowUpgrade bool + Status ConnectorDeploymentStatus `gorm:"foreignKey:ID;references:ID"` } type ConnectorDeploymentList []ConnectorDeployment @@ -122,22 +124,6 @@ type ServiceAccount struct { ClientSecretRef string `gorm:"column:client_secret"` } -type ConnectorDeploymentTypeUpgrade struct { - ConnectorID string `json:"connector_id,omitempty"` - DeploymentID string `json:"deployment_id,omitempty"` - ConnectorTypeId string `json:"connector_type_id,omitempty"` - NamespaceID string `json:"namespace_id,omitempty"` - Channel string `json:"channel,omitempty"` - ShardMetadata *ConnectorTypeUpgrade `json:"shard_metadata,omitempty"` -} - -type ConnectorTypeUpgrade struct { - AssignedId int64 `json:"assigned_id,omitempty"` - AvailableId int64 `json:"available_id,omitempty"` -} - -type ConnectorDeploymentTypeUpgradeList []ConnectorDeploymentTypeUpgrade - type ConnectorDeploymentOperatorUpgrade struct { ConnectorID string `json:"connector_id,omitempty"` DeploymentID string `json:"deployment_id,omitempty"` diff --git a/internal/connector/internal/api/dbapi/connector_type.go b/internal/connector/internal/api/dbapi/connector_type.go index 1e43c4864..a4ded3f91 100644 --- a/internal/connector/internal/api/dbapi/connector_type.go +++ b/internal/connector/internal/api/dbapi/connector_type.go @@ -2,9 +2,10 @@ package dbapi import ( "encoding/json" - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db" "time" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors" "gorm.io/gorm" @@ -50,11 +51,12 @@ type ConnectorTypeCapability struct { } type ConnectorShardMetadata struct { - ID int64 `gorm:"primaryKey:autoIncrement"` - ConnectorTypeId string `gorm:"primaryKey"` - Channel string `gorm:"primaryKey"` + ID int64 `gorm:"primaryKey:autoIncrement"` + ConnectorTypeId string `gorm:"index:idx_typeid_channel_revision;index:idx_typeid_channel"` + Channel string `gorm:"index:idx_typeid_channel_revision;index:idx_typeid_channel"` + Revision int64 `gorm:"index:idx_typeid_channel_revision;default:0"` + LatestRevision *int64 ShardMetadata api.JSON `gorm:"type:jsonb"` - LatestId *int64 } type ConnectorCatalogEntry struct { diff --git a/internal/connector/internal/config/connectors.go b/internal/connector/internal/config/connectors.go index aad052fcc..62fe0aa25 100644 --- a/internal/connector/internal/config/connectors.go +++ b/internal/connector/internal/config/connectors.go @@ -4,13 +4,14 @@ import ( "crypto/sha1" "encoding/json" "fmt" - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/files" "io/fs" "io/ioutil" "os" "sort" "strings" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/files" + gherrors "github.com/pkg/errors" "time" @@ -35,7 +36,6 @@ type ConnectorsConfig struct { var _ environments.ConfigModule = &ConnectorsConfig{} type ConnectorChannelConfig struct { - Revision int64 `json:"revision,omitempty"` ShardMetadata map[string]interface{} `json:"shard_metadata,omitempty"` } diff --git a/internal/connector/internal/environments/integration.go b/internal/connector/internal/environments/integration.go index 56b368f2d..d44949760 100644 --- a/internal/connector/internal/environments/integration.go +++ b/internal/connector/internal/environments/integration.go @@ -43,6 +43,7 @@ func (b IntegrationEnvLoader) Defaults() map[string]string { func (b IntegrationEnvLoader) ModifyConfiguration(env *environments.Env) error { // Support a one-off env to allow enabling db debug in testing var databaseConfig *db.DatabaseConfig + env.MustResolveAll(&databaseConfig) if os.Getenv("DB_DEBUG") == "true" { databaseConfig.Debug = true } diff --git a/internal/connector/internal/handlers/connector_admin.go b/internal/connector/internal/handlers/connector_admin.go index 6fad5c74e..5865fee64 100644 --- a/internal/connector/internal/handlers/connector_admin.go +++ b/internal/connector/internal/handlers/connector_admin.go @@ -2,11 +2,13 @@ package handlers import ( "fmt" + "strconv" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/admin/private" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/config" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/services/authz" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/workers" "gorm.io/gorm" - "strconv" "net/http" @@ -29,7 +31,7 @@ type ConnectorAdminHandler struct { ConnectorsService services.ConnectorsService NamespaceService services.ConnectorNamespaceService QuotaConfig *config.ConnectorsQuotaConfig - ConnectorCluster *ConnectorClusterHandler // TODO eventually move deployent handling into a deployment service + ConnectorCluster *ConnectorClusterHandler //TODO: eventually move deployment handling into a deployment service ConnectorTypesService services.ConnectorTypesService } @@ -86,59 +88,6 @@ func (h *ConnectorAdminHandler) ListConnectorClusters(w http.ResponseWriter, r * handlers.HandleList(w, r, cfg) } -func (h *ConnectorAdminHandler) GetConnectorUpgradesByType(writer http.ResponseWriter, request *http.Request) { - - id := mux.Vars(request)["connector_cluster_id"] - listArgs := coreservices.NewListArguments(request.URL.Query()) - - cfg := handlers.HandlerConfig{ - Validate: []handlers.Validate{ - handlers.Validation("connector_cluster_id", &id, handlers.MinLen(1), handlers.MaxLen(maxConnectorClusterIdLength)), - }, - Action: func() (i interface{}, serviceError *errors.ServiceError) { - - upgrades, paging, serviceError := h.Service.GetAvailableDeploymentTypeUpgrades(listArgs) - if serviceError != nil { - return nil, serviceError - } - result := make([]private.ConnectorAvailableTypeUpgrade, len(upgrades)) - for j := range upgrades { - result[j] = *presenters.PresentConnectorAvailableTypeUpgrade(&upgrades[j]) - } - - i = private.ConnectorAvailableTypeUpgradeList{ - Page: int32(paging.Page), - Size: int32(paging.Size), - Total: int32(paging.Total), - Items: result, - } - return - }, - } - - handlers.HandleGet(writer, request, &cfg) -} - -func (h *ConnectorAdminHandler) UpgradeConnectorsByType(writer http.ResponseWriter, request *http.Request) { - resource := make([]private.ConnectorAvailableTypeUpgrade, 0) - id := mux.Vars(request)["connector_cluster_id"] - cfg := handlers.HandlerConfig{ - MarshalInto: &resource, - Validate: []handlers.Validate{ - handlers.Validation("connector_cluster_id", &id, handlers.MinLen(1), handlers.MaxLen(maxConnectorClusterIdLength)), - }, - Action: func() (i interface{}, serviceError *errors.ServiceError) { - - upgrades := make([]dbapi.ConnectorDeploymentTypeUpgrade, len(resource)) - for i2 := range resource { - upgrades[i2] = *presenters.ConvertConnectorAvailableTypeUpgrade(&resource[i2]) - } - return nil, h.Service.UpgradeConnectorsByType(request.Context(), id, upgrades) - }, - } - handlers.Handle(writer, request, &cfg, http.StatusNoContent) -} - func (h *ConnectorAdminHandler) GetConnectorUpgradesByOperator(writer http.ResponseWriter, request *http.Request) { id := mux.Vars(request)["connector_cluster_id"] listArgs := coreservices.NewListArguments(request.URL.Query()) @@ -470,15 +419,17 @@ func (h *ConnectorAdminHandler) DeleteConnector(writer http.ResponseWriter, requ } func (h *ConnectorAdminHandler) GetClusterDeployments(writer http.ResponseWriter, request *http.Request) { - id := mux.Vars(request)["connector_cluster_id"] + clusterId := mux.Vars(request)["connector_cluster_id"] + channelUpdates := request.URL.Query().Get("channel_updates") + danglingDeployments := request.URL.Query().Get("dangling_deployments") listArgs := coreservices.NewListArguments(request.URL.Query()) cfg := handlers.HandlerConfig{ Validate: []handlers.Validate{ - handlers.Validation("connector_cluster_id", &id, handlers.MinLen(1), handlers.MaxLen(maxConnectorClusterIdLength)), + handlers.Validation("connector_cluster_id", &clusterId, handlers.MinLen(1), handlers.MaxLen(maxConnectorClusterIdLength)), }, Action: func() (interface{}, *errors.ServiceError) { - deployments, paging, err := h.Service.ListConnectorDeployments(request.Context(), id, listArgs, 0) + deployments, paging, err := h.Service.ListConnectorDeployments(request.Context(), clusterId, channelUpdates, danglingDeployments, listArgs, 0) if err != nil { return nil, err } @@ -492,14 +443,11 @@ func (h *ConnectorAdminHandler) GetClusterDeployments(writer http.ResponseWriter result.Items = make([]private.ConnectorDeploymentAdminView, len(deployments)) for i, deployment := range deployments { - pd, err := h.ConnectorCluster.presentDeployment(request, deployment) - if err != nil { - return nil, err - } - result.Items[i], err = presenters.PresentConnectorDeploymentAdminView(pd, deployment.ClusterID) + deploymentAdminView, err := presenters.PresentConnectorDeploymentAdminView(deployment, clusterId) if err != nil { return nil, err } + result.Items[i] = deploymentAdminView } return result, nil @@ -527,15 +475,7 @@ func (h *ConnectorAdminHandler) GetConnectorDeployment(writer http.ResponseWrite if deployment.ClusterID != clusterId { return nil, coreservices.HandleGetError(`Connector deployment`, `id`, deploymentId, gorm.ErrRecordNotFound) } - pd, err := h.ConnectorCluster.presentDeployment(request, deployment) - if err != nil { - return nil, err - } - result, err := presenters.PresentConnectorDeploymentAdminView(pd, deployment.ClusterID) - if err != nil { - return nil, err - } - return result, nil + return presenters.PresentConnectorDeploymentAdminView(deployment, clusterId) }, } @@ -543,20 +483,22 @@ func (h *ConnectorAdminHandler) GetConnectorDeployment(writer http.ResponseWrite } func (h *ConnectorAdminHandler) GetNamespaceDeployments(writer http.ResponseWriter, request *http.Request) { - id := mux.Vars(request)["namespace_id"] + namespaceId := mux.Vars(request)["namespace_id"] + channelUpdates := request.URL.Query().Get("channel_updates") listArgs := coreservices.NewListArguments(request.URL.Query()) + danglingDeployments := request.URL.Query().Get("dangling_deployments") cfg := handlers.HandlerConfig{ Validate: []handlers.Validate{ - handlers.Validation("namespace_id", &id, handlers.MinLen(1), handlers.MaxLen(maxConnectorNamespaceIdLength)), + handlers.Validation("namespace_id", &namespaceId, handlers.MinLen(1), handlers.MaxLen(maxConnectorNamespaceIdLength)), }, Action: func() (interface{}, *errors.ServiceError) { if len(listArgs.Search) == 0 { - listArgs.Search = fmt.Sprintf("namespace_id = %s", id) + listArgs.Search = fmt.Sprintf("namespace_id = %s", namespaceId) } else { - listArgs.Search = fmt.Sprintf("namespace_id = %s AND (%s)", id, listArgs.Search) + listArgs.Search = fmt.Sprintf("namespace_id = %s AND (%s)", namespaceId, listArgs.Search) } - deployments, paging, err := h.Service.ListConnectorDeployments(request.Context(), "", listArgs, 0) + deployments, paging, err := h.Service.ListConnectorDeployments(request.Context(), "", channelUpdates, danglingDeployments, listArgs, 0) if err != nil { return nil, err } @@ -570,14 +512,11 @@ func (h *ConnectorAdminHandler) GetNamespaceDeployments(writer http.ResponseWrit result.Items = make([]private.ConnectorDeploymentAdminView, len(deployments)) for i, deployment := range deployments { - pd, err := h.ConnectorCluster.presentDeployment(request, deployment) - if err != nil { - return nil, err - } - result.Items[i], err = presenters.PresentConnectorDeploymentAdminView(pd, deployment.ClusterID) + deploymentAdminView, err := presenters.PresentConnectorDeploymentAdminView(deployment, deployment.ClusterID) if err != nil { return nil, err } + result.Items[i] = deploymentAdminView } return result, nil @@ -646,6 +585,65 @@ func (h *ConnectorAdminHandler) GetConnectorType(writer http.ResponseWriter, req handlers.HandleGet(writer, request, &cfg) } +func (h *ConnectorAdminHandler) PatchConnectorDeployment(writer http.ResponseWriter, request *http.Request) { + clusterId := mux.Vars(request)["connector_cluster_id"] + deploymentId := mux.Vars(request)["deployment_id"] + var resource private.ConnectorDeploymentAdminView + cfg := handlers.HandlerConfig{ + MarshalInto: &resource, + Validate: []handlers.Validate{ + handlers.Validation("connector_cluster_id", &clusterId, handlers.MinLen(1), handlers.MaxLen(maxConnectorClusterIdLength)), + handlers.Validation("deployment_id", &deploymentId, handlers.MinLen(1), handlers.MaxLen(maxConnectorIdLength)), + }, + Action: func() (i interface{}, serviceError *errors.ServiceError) { + // Get existing deployment + existingDeployment, serviceError := h.Service.GetDeployment(request.Context(), deploymentId) + if serviceError != nil { + return nil, serviceError + } + // check if the cluster ids match + if existingDeployment.ClusterID != clusterId { + return nil, coreservices.HandleGetError(`Connector existingDeployment`, `id`, deploymentId, gorm.ErrRecordNotFound) + } + + // Handle the fields that support being updated... + var updatedDeployment dbapi.ConnectorDeployment + updatedDeployment.ID = existingDeployment.ID + if len(resource.Spec.ShardMetadata) != 0 { + // channel update + updateRevision, err := workers.GetShardMetadataRevision(resource.Spec.ShardMetadata) + if err != nil { + return nil, errors.GeneralError("Error in patching deployment, updateRevision not found in shardMetadata %+v: %v", resource.Spec.ShardMetadata, err.Error()) + } + updateShardMetadata, serr := h.ConnectorTypesService.GetConnectorShardMetadata(existingDeployment.ConnectorShardMetadata.ConnectorTypeId, existingDeployment.ConnectorShardMetadata.Channel, updateRevision) + if serr != nil { + return nil, errors.GeneralError("Error in patching deployment, shardMetadata %+v not found: %v", resource.Spec.ShardMetadata, err.Error()) + } + updatedDeployment.ConnectorShardMetadataID = updateShardMetadata.ID + updatedDeployment.ConnectorShardMetadata = *updateShardMetadata + } else { + // Spec.ShardMetadata is the only updatable field for now + return presenters.PresentConnectorDeploymentAdminView(existingDeployment, clusterId) + } + + // update + uerr := h.Service.UpdateDeployment(&updatedDeployment) + if uerr != nil { + return nil, uerr + } + + // read updated deployment back + existingDeployment, serviceError = h.Service.GetDeployment(request.Context(), deploymentId) + if serviceError != nil { + return nil, serviceError + } + return presenters.PresentConnectorDeploymentAdminView(existingDeployment, clusterId) + }, + } + + handlers.Handle(writer, request, &cfg, http.StatusAccepted) +} + func (h *ConnectorAdminHandler) isEvalOrg(id string) bool { for _, eid := range h.ConnectorsConfig.ConnectorEvalOrganizations { if id == eid { diff --git a/internal/connector/internal/handlers/connector_agent.go b/internal/connector/internal/handlers/connector_agent.go index 721f01379..0c8f9bcd2 100644 --- a/internal/connector/internal/handlers/connector_agent.go +++ b/internal/connector/internal/handlers/connector_agent.go @@ -121,7 +121,7 @@ func (h *ConnectorClusterHandler) ListDeployments(w http.ResponseWriter, r *http getList := func() (list private.ConnectorDeploymentList, err *errors.ServiceError) { - resources, paging, err := h.Service.ListConnectorDeployments(ctx, connectorClusterId, listArgs, gtVersion) + resources, paging, err := h.Service.ListConnectorDeployments(ctx, connectorClusterId, "", "", listArgs, gtVersion) if err != nil { return } @@ -134,7 +134,7 @@ func (h *ConnectorClusterHandler) ListDeployments(w http.ResponseWriter, r *http } for _, resource := range resources { - converted, serviceError := h.presentDeployment(r, resource) + converted, serviceError := h.resolveConnectorRefsAndPresentDeployment(resource) if serviceError != nil { sentry.CaptureException(serviceError) glog.Errorf("failed to present connector deployment %s: %v", resource.ID, serviceError) @@ -222,58 +222,24 @@ func (h *ConnectorClusterHandler) ListDeployments(w http.ResponseWriter, r *http handlers.HandleList(w, r, cfg) } -func (h *ConnectorClusterHandler) presentDeployment(r *http.Request, resource dbapi.ConnectorDeployment) (private.ConnectorDeployment, *errors.ServiceError) { - converted, err := presenters.PresentConnectorDeployment(resource) - if err != nil { - return private.ConnectorDeployment{}, err - } - +func (h *ConnectorClusterHandler) resolveConnectorRefsAndPresentDeployment(connectorDeployment dbapi.ConnectorDeployment) (private.ConnectorDeployment, *errors.ServiceError) { // avoid ignoring this deployment altogether if there is an issue in getting secrets from the vault - ctx := r.Context() - apiSpec, invalidSecrets, err := h.Service.GetConnectorWithBase64Secrets(ctx, resource) + invalidSecrets, err := h.Connectors.ResolveConnectorRefsWithBase64Secrets(&connectorDeployment.Connector) if err != nil { if invalidSecrets { // log error in getting secrets and signal that connector spec doesn't have secrets - glog.Errorf("Error getting connector %s with base64 secrets: %s", apiSpec.ID, err) - apiSpec.ConnectorSpec = []byte("{}") + glog.Errorf("Error getting connector %s with base64 secrets: %s", connectorDeployment.Connector.ID, err) + connectorDeployment.Connector.ConnectorSpec = []byte("{}") } else { return private.ConnectorDeployment{}, err } } - converted.Metadata.ResolvedSecrets = !invalidSecrets - - pc, err := presenters.PresentConnector(&apiSpec) - if err != nil { - return private.ConnectorDeployment{}, err - } - shardMetadataJson, err := h.ConnectorTypes.GetConnectorShardMetadata(resource.ConnectorTypeChannelId) + converted, err := presenters.PresentConnectorDeployment(connectorDeployment, !invalidSecrets) if err != nil { return private.ConnectorDeployment{}, err } - shardMetadata, err2 := shardMetadataJson.ShardMetadata.Object() - if err2 != nil { - return private.ConnectorDeployment{}, errors.GeneralError("failed to convert shard metadata") - } - - converted.Spec.ShardMetadata = shardMetadata - converted.Spec.ConnectorSpec = pc.Connector - converted.Spec.DesiredState = private.ConnectorDesiredState(pc.DesiredState) - converted.Spec.ConnectorId = pc.Id - converted.Spec.Kafka = private.KafkaConnectionSettings{ - Id: pc.Kafka.Id, - Url: pc.Kafka.Url, - } - converted.Spec.SchemaRegistry = private.SchemaRegistryConnectionSettings{ - Id: pc.SchemaRegistry.Id, - Url: pc.SchemaRegistry.Url, - } - converted.Spec.ServiceAccount = private.ServiceAccount{ - ClientId: pc.ServiceAccount.ClientId, - ClientSecret: pc.ServiceAccount.ClientSecret, - } - converted.Spec.ConnectorTypeId = pc.ConnectorTypeId return converted, nil } @@ -311,7 +277,7 @@ func (h *ConnectorClusterHandler) GetDeployment(w http.ResponseWriter, r *http.R return nil, errors.NotFound("Connector deployment not found") } - return h.presentDeployment(r, resource) + return h.resolveConnectorRefsAndPresentDeployment(resource) }, } handlers.HandleGet(w, r, cfg) diff --git a/internal/connector/internal/handlers/connector_cluster.go b/internal/connector/internal/handlers/connector_cluster.go index 6f3592d3b..dcd38ba9d 100644 --- a/internal/connector/internal/handlers/connector_cluster.go +++ b/internal/connector/internal/handlers/connector_cluster.go @@ -1,6 +1,9 @@ package handlers import ( + "net/http" + "net/url" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/config" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/services/authz" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" @@ -8,8 +11,6 @@ import ( "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/server" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso" "github.com/golang/glog" - "net/http" - "net/url" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/signalbus" @@ -35,7 +36,7 @@ type ConnectorClusterHandler struct { Bus signalbus.SignalBus Service services.ConnectorClusterService Keycloak sso.KafkaKeycloakService - ConnectorTypes services.ConnectorTypesService + Connectors services.ConnectorsService ConnectorNamespace services.ConnectorNamespaceService Vault vault.VaultService ServerConfig *server.ServerConfig diff --git a/internal/connector/internal/handlers/connectors.go b/internal/connector/internal/handlers/connectors.go index 14f3e25c4..4f4337d82 100644 --- a/internal/connector/internal/handlers/connectors.go +++ b/internal/connector/internal/handlers/connectors.go @@ -228,7 +228,6 @@ func (h ConnectorsHandler) Patch(w http.ResponseWriter, r *http.Request) { validates := []handlers.Validate{ handlers.Validation("name", &resource.Name, handlers.MinLen(1), handlers.MaxLen(100)), handlers.Validation("connector_type_id", &resource.ConnectorTypeId, handlers.MinLen(1), handlers.MaxLen(maxKafkaNameLength)), - // handlers.Validation("kafka_id", &resource.Metadata.KafkaId, handlers.MinLen(1), handlers.MaxLen(maxKafkaNameLength)), handlers.Validation("service_account.client_id", &resource.ServiceAccount.ClientId, handlers.MinLen(1)), handlers.Validation("desired_state", (*string)(&resource.DesiredState), handlers.IsOneOf(dbapi.ValidDesiredStates...)), validateConnector(h.connectorTypesService, &resource, connectorTypeId), diff --git a/internal/connector/internal/migrations/202202070000_add_connector_namespace_tables.go b/internal/connector/internal/migrations/202202070000_add_connector_namespace_tables.go index 40b5b170a..9695e4f33 100644 --- a/internal/connector/internal/migrations/202202070000_add_connector_namespace_tables.go +++ b/internal/connector/internal/migrations/202202070000_add_connector_namespace_tables.go @@ -6,9 +6,10 @@ package migrations // is done here, even though the same type is defined in pkg/api import ( + "time" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db" "github.com/go-gormigrate/gormigrate/v2" - "time" ) func addConnectorNamespaceTables(migrationId string) *gormigrate.Migration { diff --git a/internal/connector/internal/migrations/202206130000_refactor_channel_and_shard_metadata.go b/internal/connector/internal/migrations/202206130000_refactor_channel_and_shard_metadata.go new file mode 100644 index 000000000..d8c8d2cd7 --- /dev/null +++ b/internal/connector/internal/migrations/202206130000_refactor_channel_and_shard_metadata.go @@ -0,0 +1,120 @@ +package migrations + +// Migrations should NEVER use types from other packages. Types can change +// and then migrations run on a _new_ database will fail or behave unexpectedly. +// Instead of importing types, always re-create the type in the migration, as +// is done here, even though the same type is defined in pkg/api + +import ( + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db" + "github.com/go-gormigrate/gormigrate/v2" +) + +func refactorChannelAndShardMetadata(migrationId string) *gormigrate.Migration { + type ConnectorShardMetadata struct { + ID int64 `gorm:"primaryKey:autoIncrement"` + ConnectorTypeId string `gorm:"index:idx_typeid_channel_revision;index:idx_typeid_channel"` + Channel string `gorm:"index:idx_typeid_channel_revision;index:idx_typeid_channel"` + Revision int64 `gorm:"index:idx_typeid_channel_revision;default:0"` + LatestRevision *int64 + ShardMetadata api.JSON `gorm:"type:jsonb"` + } + + type ConnectorStatusPhase string + + type ConnectorDeploymentStatus struct { + db.Model + Phase ConnectorStatusPhase + Version int64 + Conditions api.JSON `gorm:"type:jsonb"` + Operators api.JSON `gorm:"type:jsonb"` + UpgradeAvailable bool + } + + type ConnectorDesiredState string + + type KafkaConnectionSettings struct { + KafkaID string `gorm:"column:id"` + BootstrapServer string + } + + type SchemaRegistryConnectionSettings struct { + SchemaRegistryID string `gorm:"column:id"` + Url string + } + + type ServiceAccount struct { + ClientId string + ClientSecret string `gorm:"-"` + ClientSecretRef string `gorm:"column:client_secret"` + } + + type ConnectorStatus struct { + db.Model + NamespaceID *string + Phase ConnectorStatusPhase + } + + type Connector struct { + db.Model + + NamespaceId *string + CloudProvider string + Region string + MultiAZ bool + + Name string + Owner string + OrganisationId string + Version int64 `gorm:"type:bigserial;index:"` + + ConnectorTypeId string + ConnectorSpec api.JSON `gorm:"type:jsonb"` + DesiredState ConnectorDesiredState + Channel string + Kafka KafkaConnectionSettings `gorm:"embedded;embeddedPrefix:kafka_"` + SchemaRegistry SchemaRegistryConnectionSettings `gorm:"embedded;embeddedPrefix:schema_registry_"` + ServiceAccount ServiceAccount `gorm:"embedded;embeddedPrefix:service_account_"` + + Status ConnectorStatus `gorm:"foreignKey:ID"` + } + + type ConnectorDeployment struct { + db.Model + Version int64 + ConnectorID string + Connector Connector + OperatorID string + ConnectorVersion int64 + ConnectorShardMetadataID int64 + ConnectorShardMetadata ConnectorShardMetadata + ClusterID string + NamespaceID string + AllowUpgrade bool + Status ConnectorDeploymentStatus `gorm:"foreignKey:ID;references:ID"` + } + + return db.CreateMigrationFromActions(migrationId, + db.ExecAction("ALTER TABLE connector_shard_metadata DROP CONSTRAINT connector_shard_metadata_pkey", "ALTER TABLE connector_shard_metadata ADD PRIMARY KEY (id,connector_type_id,channel)"), + db.ExecAction("ALTER TABLE connector_shard_metadata ADD PRIMARY KEY (id)", "ALTER TABLE connector_shard_metadata DROP CONSTRAINT connector_shard_metadata_pkey"), + db.DropTableColumnAction(&ConnectorShardMetadata202206130000{}, "latest_id"), + db.AddTableColumnAction(&ConnectorShardMetadata{}, "revision"), + db.AddTableColumnAction(&ConnectorShardMetadata{}, "latest_revision"), + db.ExecAction("CREATE INDEX idx_typeid_channel_revision ON connector_shard_metadata(connector_type_id, channel, revision)", "DROP INDEX idx_typeid_channel_revision"), + db.ExecAction("CREATE INDEX idx_typeid_channel ON connector_shard_metadata(connector_type_id, channel)", "DROP INDEX idx_typeid_channel"), + db.RenameTableColumnAction(&ConnectorDeployment{}, "connector_type_channel_id", "connector_shard_metadata_id"), + ) +} + +type ConnectorShardMetadata202206130000 struct { + ID int64 `gorm:"primaryKey:autoIncrement"` + ConnectorTypeId string `gorm:"primaryKey"` + Channel string `gorm:"primaryKey"` + ShardMetadata api.JSON `gorm:"type:jsonb"` + LatestId *int64 +} + +func (ConnectorShardMetadata202206130000) TableName() string { + return "connector_shard_metadata" +} diff --git a/internal/connector/internal/migrations/migrations.go b/internal/connector/internal/migrations/migrations.go index 14fdb471d..7eaffb10c 100644 --- a/internal/connector/internal/migrations/migrations.go +++ b/internal/connector/internal/migrations/migrations.go @@ -42,6 +42,7 @@ var migrations = []*gormigrate.Migration{ addConnectorTypeChecksum("202204050000"), removeConnectorsDeployedColumn("202204270000"), fixConnectorNamespaceVersionTrigger("202206060000"), + refactorChannelAndShardMetadata("202206130000"), } func New(dbConfig *db.DatabaseConfig) (*db.Migration, func(), error) { diff --git a/internal/connector/internal/presenters/connector_available_operator_upgrade.go b/internal/connector/internal/presenters/connector_available_operator_upgrade.go index 4b2ff5f68..96a72cca8 100644 --- a/internal/connector/internal/presenters/connector_available_operator_upgrade.go +++ b/internal/connector/internal/presenters/connector_available_operator_upgrade.go @@ -11,7 +11,7 @@ func PresentConnectorAvailableOperatorUpgrade(req *dbapi.ConnectorDeploymentOper ConnectorTypeId: req.ConnectorTypeId, NamespaceId: req.NamespaceID, Channel: req.Channel, - Operator: private.ConnectorAvailableOperatorUpgradeOperator{ + Operator: private.ConnectorUpgradeStatus{ AssignedId: req.Operator.Assigned.Id, AvailableId: req.Operator.Available.Id, }, diff --git a/internal/connector/internal/presenters/connector_available_type_upgrade.go b/internal/connector/internal/presenters/connector_available_type_upgrade.go deleted file mode 100644 index de8355a81..000000000 --- a/internal/connector/internal/presenters/connector_available_type_upgrade.go +++ /dev/null @@ -1,32 +0,0 @@ -package presenters - -import ( - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/admin/private" - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" -) - -func PresentConnectorAvailableTypeUpgrade(req *dbapi.ConnectorDeploymentTypeUpgrade) *private.ConnectorAvailableTypeUpgrade { - return &private.ConnectorAvailableTypeUpgrade{ - ConnectorId: req.ConnectorID, - ConnectorTypeId: req.ConnectorTypeId, - NamespaceId: req.NamespaceID, - Channel: req.Channel, - ShardMetadata: private.ConnectorAvailableTypeUpgradeShardMetadata{ - AssignedId: req.ShardMetadata.AssignedId, - AvailableId: req.ShardMetadata.AvailableId, - }, - } -} - -func ConvertConnectorAvailableTypeUpgrade(req *private.ConnectorAvailableTypeUpgrade) *dbapi.ConnectorDeploymentTypeUpgrade { - return &dbapi.ConnectorDeploymentTypeUpgrade{ - ConnectorID: req.ConnectorId, - ConnectorTypeId: req.ConnectorTypeId, - NamespaceID: req.NamespaceId, - Channel: req.Channel, - ShardMetadata: &dbapi.ConnectorTypeUpgrade{ - AssignedId: req.ShardMetadata.AssignedId, - AvailableId: req.ShardMetadata.AvailableId, - }, - } -} diff --git a/internal/connector/internal/presenters/connector_deployment.go b/internal/connector/internal/presenters/connector_deployment.go index b035018a4..94b1eb26b 100644 --- a/internal/connector/internal/presenters/connector_deployment.go +++ b/internal/connector/internal/presenters/connector_deployment.go @@ -2,13 +2,15 @@ package presenters import ( "encoding/json" + admin "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/admin/private" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/private" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors" ) -func PresentConnectorDeployment(from dbapi.ConnectorDeployment) (private.ConnectorDeployment, *errors.ServiceError) { +func PresentConnectorDeployment(from dbapi.ConnectorDeployment, resolvedSecrets bool) (private.ConnectorDeployment, *errors.ServiceError) { + // prepare conditions var conditions []private.MetaV1Condition if from.Status.Conditions != nil { err := from.Status.Conditions.Unmarshal(&conditions) @@ -17,6 +19,7 @@ func PresentConnectorDeployment(from dbapi.ConnectorDeployment) (private.Connect } } + // prepare operators var operators private.ConnectorDeploymentStatusOperators if from.Status.Operators != nil { err := from.Status.Operators.Unmarshal(&operators) @@ -25,7 +28,24 @@ func PresentConnectorDeployment(from dbapi.ConnectorDeployment) (private.Connect } } + // prepare shard metadata + if len(from.ConnectorShardMetadata.ShardMetadata) == 0 { + return private.ConnectorDeployment{}, errors.GeneralError("Unable to load connector type shard metadata: %+v with id %d, for connector deployment: %s", from.ConnectorShardMetadata, from.ConnectorShardMetadataID, from.ID) + } + shardMetadataJson, errShardMetadataConversion := from.ConnectorShardMetadata.ShardMetadata.Object() + if errShardMetadataConversion != nil { + return private.ConnectorDeployment{}, errors.GeneralError("Failed to convert shard metadata to json for: %+v, for connector deployment: %s", from.ConnectorShardMetadata, from.ID) + } + + // present connector + presentedConnector, err := PresentConnector(&from.Connector) + if err != nil { + return private.ConnectorDeployment{}, err + } + + // present reference reference := PresentReference(from.ID, from) + return private.ConnectorDeployment{ Id: reference.Id, Kind: reference.Kind, @@ -34,12 +54,29 @@ func PresentConnectorDeployment(from dbapi.ConnectorDeployment) (private.Connect CreatedAt: from.CreatedAt, UpdatedAt: from.UpdatedAt, ResourceVersion: from.Version, + ResolvedSecrets: resolvedSecrets, }, Spec: private.ConnectorDeploymentSpec{ - ConnectorId: from.ConnectorID, + ConnectorId: presentedConnector.Id, OperatorId: from.OperatorID, NamespaceId: from.NamespaceID, ConnectorResourceVersion: from.ConnectorVersion, + ShardMetadata: shardMetadataJson, + ConnectorSpec: presentedConnector.Connector, + DesiredState: private.ConnectorDesiredState(presentedConnector.DesiredState), + Kafka: private.KafkaConnectionSettings{ + Id: presentedConnector.Kafka.Id, + Url: presentedConnector.Kafka.Url, + }, + SchemaRegistry: private.SchemaRegistryConnectionSettings{ + Id: presentedConnector.SchemaRegistry.Id, + Url: presentedConnector.SchemaRegistry.Url, + }, + ServiceAccount: private.ServiceAccount{ + ClientId: presentedConnector.ServiceAccount.ClientId, + ClientSecret: presentedConnector.ServiceAccount.ClientSecret, + }, + ConnectorTypeId: presentedConnector.ConnectorTypeId, }, Status: private.ConnectorDeploymentStatus{ Phase: private.ConnectorState(from.Status.Phase), @@ -50,10 +87,17 @@ func PresentConnectorDeployment(from dbapi.ConnectorDeployment) (private.Connect }, nil } -func PresentConnectorDeploymentAdminView(from private.ConnectorDeployment, clusterId string) (admin.ConnectorDeploymentAdminView, *errors.ServiceError) { +func PresentConnectorDeploymentAdminView(from dbapi.ConnectorDeployment, clusterId string) (admin.ConnectorDeploymentAdminView, *errors.ServiceError) { + // present the deployment + fromPresentedConnectorDeployment, err := PresentConnectorDeployment(from, false) + if err != nil { + return admin.ConnectorDeploymentAdminView{}, err + } + + // build conditions var conditions []admin.MetaV1Condition - if len(from.Status.Conditions) > 0 { - for _, condition := range from.Status.Conditions { + if len(fromPresentedConnectorDeployment.Status.Conditions) > 0 { + for _, condition := range fromPresentedConnectorDeployment.Status.Conditions { conditions = append(conditions, admin.MetaV1Condition{ Type: condition.Type, Reason: condition.Reason, @@ -63,43 +107,60 @@ func PresentConnectorDeploymentAdminView(from private.ConnectorDeployment, clust } } + // build shard metadata status + deploymentAdminStatusShardMetadata := admin.ConnectorDeploymentAdminStatusShardMetadata{ + Assigned: admin.ConnectorShardMetadata{ + Channel: from.ConnectorShardMetadata.Channel, + ConnectorTypeId: from.ConnectorShardMetadata.ConnectorTypeId, + Revision: from.ConnectorShardMetadata.Revision, + }, + } + if from.ConnectorShardMetadata.LatestRevision != nil { + deploymentAdminStatusShardMetadata.Available = admin.ConnectorShardMetadata{ + Channel: from.ConnectorShardMetadata.Channel, + ConnectorTypeId: from.ConnectorShardMetadata.ConnectorTypeId, + Revision: *from.ConnectorShardMetadata.LatestRevision, + } + } + view := admin.ConnectorDeploymentAdminView{ - Id: from.Id, + Id: fromPresentedConnectorDeployment.Id, Metadata: admin.ConnectorDeploymentAdminViewAllOfMetadata{ - CreatedAt: from.Metadata.CreatedAt, - UpdatedAt: from.Metadata.UpdatedAt, - ResourceVersion: from.Metadata.ResourceVersion, - ResolvedSecrets: from.Metadata.ResolvedSecrets, + CreatedAt: fromPresentedConnectorDeployment.Metadata.CreatedAt, + UpdatedAt: fromPresentedConnectorDeployment.Metadata.UpdatedAt, + ResourceVersion: fromPresentedConnectorDeployment.Metadata.ResourceVersion, + ResolvedSecrets: fromPresentedConnectorDeployment.Metadata.ResolvedSecrets, }, Spec: admin.ConnectorDeploymentAdminSpec{ - ConnectorId: from.Spec.ConnectorId, - ConnectorResourceVersion: from.Spec.ConnectorResourceVersion, - ConnectorTypeId: from.Spec.ConnectorTypeId, + ConnectorId: fromPresentedConnectorDeployment.Spec.ConnectorId, + ConnectorResourceVersion: fromPresentedConnectorDeployment.Spec.ConnectorResourceVersion, + ConnectorTypeId: fromPresentedConnectorDeployment.Spec.ConnectorTypeId, ClusterId: clusterId, - NamespaceId: from.Spec.NamespaceId, - OperatorId: from.Spec.OperatorId, - DesiredState: admin.ConnectorDesiredState(from.Spec.DesiredState), - ShardMetadata: from.Spec.ShardMetadata, + NamespaceId: fromPresentedConnectorDeployment.Spec.NamespaceId, + OperatorId: fromPresentedConnectorDeployment.Spec.OperatorId, + DesiredState: admin.ConnectorDesiredState(fromPresentedConnectorDeployment.Spec.DesiredState), + ShardMetadata: fromPresentedConnectorDeployment.Spec.ShardMetadata, }, - Status: admin.ConnectorDeploymentStatus{ - Phase: admin.ConnectorState(from.Status.Phase), - ResourceVersion: from.Status.ResourceVersion, - Operators: admin.ConnectorDeploymentStatusOperators{ + Status: admin.ConnectorDeploymentAdminStatus{ + Phase: admin.ConnectorState(fromPresentedConnectorDeployment.Status.Phase), + ResourceVersion: fromPresentedConnectorDeployment.Status.ResourceVersion, + Operators: admin.ConnectorDeploymentAdminStatusOperators{ Assigned: admin.ConnectorOperator{ - Id: from.Status.Operators.Assigned.Id, - Type: from.Status.Operators.Assigned.Type, - Version: from.Status.Operators.Assigned.Version, + Id: fromPresentedConnectorDeployment.Status.Operators.Assigned.Id, + Type: fromPresentedConnectorDeployment.Status.Operators.Assigned.Type, + Version: fromPresentedConnectorDeployment.Status.Operators.Assigned.Version, }, Available: admin.ConnectorOperator{ - Id: from.Status.Operators.Available.Id, - Type: from.Status.Operators.Available.Type, - Version: from.Status.Operators.Available.Version, + Id: fromPresentedConnectorDeployment.Status.Operators.Available.Id, + Type: fromPresentedConnectorDeployment.Status.Operators.Available.Type, + Version: fromPresentedConnectorDeployment.Status.Operators.Available.Version, }, }, - Conditions: conditions, + Conditions: conditions, + ShardMetadata: deploymentAdminStatusShardMetadata, }, } diff --git a/internal/connector/internal/presenters/object_reference.go b/internal/connector/internal/presenters/object_reference.go index 4b6671812..00e54633b 100644 --- a/internal/connector/internal/presenters/object_reference.go +++ b/internal/connector/internal/presenters/object_reference.go @@ -2,6 +2,7 @@ package presenters import ( "fmt" + admin "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/admin/private" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/compat" diff --git a/internal/connector/internal/routes/route_loader.go b/internal/connector/internal/routes/route_loader.go index dfc3d61de..7dc1777f9 100644 --- a/internal/connector/internal/routes/route_loader.go +++ b/internal/connector/internal/routes/route_loader.go @@ -1,9 +1,10 @@ package routes import ( - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso" "net/http" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/config" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/generated" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/handlers" @@ -153,6 +154,7 @@ func (s *options) AddRoutes(mainRouter *mux.Router) error { http.MethodDelete: {auth.ConnectorFleetManagerAdminWriteRole, auth.ConnectorFleetManagerAdminFullRole}, http.MethodGet: {auth.ConnectorFleetManagerAdminReadRole, auth.ConnectorFleetManagerAdminWriteRole, auth.ConnectorFleetManagerAdminFullRole}, http.MethodPost: {auth.ConnectorFleetManagerAdminWriteRole, auth.ConnectorFleetManagerAdminFullRole}, + http.MethodPatch: {auth.ConnectorFleetManagerAdminWriteRole, auth.ConnectorFleetManagerAdminFullRole}, http.MethodPut: {auth.ConnectorFleetManagerAdminWriteRole, auth.ConnectorFleetManagerAdminFullRole}, } adminRouter.Use(auth.NewRequireIssuerMiddleware().RequireIssuer([]string{s.KeycloakService.GetConfig().OSDClusterIDPRealm.ValidIssuerURI}, kerrors.ErrorNotFound)) @@ -164,8 +166,7 @@ func (s *options) AddRoutes(mainRouter *mux.Router) error { adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/connectors", s.ConnectorAdminHandler.GetClusterConnectors).Methods(http.MethodGet) adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/deployments", s.ConnectorAdminHandler.GetClusterDeployments).Methods(http.MethodGet) adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/deployments/{deployment_id}", s.ConnectorAdminHandler.GetConnectorDeployment).Methods(http.MethodGet) - adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/upgrades/type", s.ConnectorAdminHandler.GetConnectorUpgradesByType).Methods(http.MethodGet) - adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/upgrades/type", s.ConnectorAdminHandler.UpgradeConnectorsByType).Methods(http.MethodPut) + adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/deployments/{deployment_id}", s.ConnectorAdminHandler.PatchConnectorDeployment).Methods(http.MethodPatch) adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/upgrades/operator", s.ConnectorAdminHandler.GetConnectorUpgradesByOperator).Methods(http.MethodGet) adminRouter.HandleFunc("/kafka_connector_clusters/{connector_cluster_id}/upgrades/operator", s.ConnectorAdminHandler.UpgradeConnectorsByOperator).Methods(http.MethodPut) adminRouter.HandleFunc("/kafka_connector_namespaces", s.ConnectorAdminHandler.GetConnectorNamespaces).Methods(http.MethodGet) @@ -174,6 +175,9 @@ func (s *options) AddRoutes(mainRouter *mux.Router) error { adminRouter.HandleFunc("/kafka_connector_namespaces/{namespace_id}", s.ConnectorAdminHandler.DeleteConnectorNamespace).Methods(http.MethodDelete) adminRouter.HandleFunc("/kafka_connector_namespaces/{namespace_id}/connectors", s.ConnectorAdminHandler.GetNamespaceConnectors).Methods(http.MethodGet) adminRouter.HandleFunc("/kafka_connector_namespaces/{namespace_id}/deployments", s.ConnectorAdminHandler.GetNamespaceDeployments).Methods(http.MethodGet) + //TODO: add, to consistency with the {connector_cluster_id}/ counterparts + //adminRouter.HandleFunc("/kafka_connector_namespaces/{namespace_id}/deployments/{deployment_id}", s.ConnectorAdminHandler.GetConnectorDeployment).Methods(http.MethodGet) + //adminRouter.HandleFunc("/kafka_connector_namespaces/{namespace_id}/deployments/{deployment_id}", s.ConnectorAdminHandler.PatchConnectorDeployment).Methods(http.MethodGet) adminRouter.HandleFunc("/kafka_connectors/{connector_id}", s.ConnectorAdminHandler.GetConnector).Methods(http.MethodGet) adminRouter.HandleFunc("/kafka_connectors/{connector_id}", s.ConnectorAdminHandler.DeleteConnector).Methods(http.MethodDelete) adminRouter.HandleFunc("/kafka_connector_types", s.ConnectorAdminHandler.ListConnectorTypes).Methods(http.MethodGet) diff --git a/internal/connector/internal/services/connector_cluster.go b/internal/connector/internal/services/connector_cluster.go index 77efd6ae6..f78a6c18d 100644 --- a/internal/connector/internal/services/connector_cluster.go +++ b/internal/connector/internal/services/connector_cluster.go @@ -2,27 +2,24 @@ package services import ( "context" - "encoding/base64" "encoding/json" "fmt" "reflect" - - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/services/phase" - coreServices "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/queryparser" - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso" - "github.com/golang/glog" + "strconv" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/private" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/services/phase" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/services/vault" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/auth" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services" + coreServices "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/queryparser" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/signalbus" - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/secrets" - "github.com/spyzhov/ajson" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso" + "github.com/golang/glog" "gorm.io/gorm" ) @@ -36,14 +33,12 @@ type ConnectorClusterService interface { GetConnectorClusterStatus(ctx context.Context, id string) (dbapi.ConnectorClusterStatus, *errors.ServiceError) SaveDeployment(ctx context.Context, resource *dbapi.ConnectorDeployment) *errors.ServiceError - GetConnectorWithBase64Secrets(ctx context.Context, resource dbapi.ConnectorDeployment) (dbapi.Connector, bool, *errors.ServiceError) - ListConnectorDeployments(ctx context.Context, id string, listArgs *services.ListArguments, gtVersion int64) (dbapi.ConnectorDeploymentList, *api.PagingMeta, *errors.ServiceError) + UpdateDeployment(resource *dbapi.ConnectorDeployment) *errors.ServiceError + ListConnectorDeployments(ctx context.Context, clusterId string, filterChannelUpdates string, includeDanglingDeploymentsOnly string, listArgs *services.ListArguments, gtVersion int64) (dbapi.ConnectorDeploymentList, *api.PagingMeta, *errors.ServiceError) UpdateConnectorDeploymentStatus(ctx context.Context, status dbapi.ConnectorDeploymentStatus) *errors.ServiceError FindAvailableNamespace(owner string, orgId string, namespaceId *string) (*dbapi.ConnectorNamespace, *errors.ServiceError) GetDeploymentByConnectorId(ctx context.Context, connectorID string) (dbapi.ConnectorDeployment, *errors.ServiceError) GetDeployment(ctx context.Context, id string) (dbapi.ConnectorDeployment, *errors.ServiceError) - GetAvailableDeploymentTypeUpgrades(listArgs *services.ListArguments) (dbapi.ConnectorDeploymentTypeUpgradeList, *api.PagingMeta, *errors.ServiceError) - UpgradeConnectorsByType(ctx context.Context, clusterId string, upgrades dbapi.ConnectorDeploymentTypeUpgradeList) *errors.ServiceError GetAvailableDeploymentOperatorUpgrades(listArgs *services.ListArguments) (dbapi.ConnectorDeploymentOperatorUpgradeList, *api.PagingMeta, *errors.ServiceError) UpgradeConnectorsByOperator(ctx context.Context, clusterId string, upgrades dbapi.ConnectorDeploymentOperatorUpgradeList) *errors.ServiceError CleanupDeployments() *errors.ServiceError @@ -367,30 +362,49 @@ func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource * return nil } +func (k *connectorClusterService) UpdateDeployment(resource *dbapi.ConnectorDeployment) *errors.ServiceError { + dbConn := k.connectionFactory.New() + updates := dbConn.Where("id = ?", resource.ID). + Updates(resource) + if err := updates.Error; err != nil { + return services.HandleUpdateError(`Connector namespace`, err) + } + return nil +} + func GetValidDeploymentColumns() []string { - return []string{"connector_id", "connector_version", "connector_type_channel_id", "cluster_id", "operator_id", "namespace_id"} + return []string{"connector_id", "connector_version", "cluster_id", "operator_id", "namespace_id"} } // ListConnectorDeployments returns all deployments assigned to the cluster -func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context, id string, listArgs *services.ListArguments, gtVersion int64) (dbapi.ConnectorDeploymentList, *api.PagingMeta, *errors.ServiceError) { +func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context, clusterId string, filterChannelUpdates string, includeDanglingDeploymentsOnly string, listArgs *services.ListArguments, gtVersion int64) (dbapi.ConnectorDeploymentList, *api.PagingMeta, *errors.ServiceError) { var resourceList dbapi.ConnectorDeploymentList dbConn := k.connectionFactory.New() - dbConn = dbConn.Preload("Status") + dbConn = dbConn.Joins("Status").Joins("ConnectorShardMetadata").Joins("Connector") + pagingMeta := &api.PagingMeta{ Page: listArgs.Page, Size: listArgs.Size, } - if id != "" { - dbConn = dbConn.Where("connector_deployments.cluster_id = ?", id) + if clusterId != "" { + dbConn = dbConn.Where("connector_deployments.cluster_id = ?", clusterId) } if gtVersion != 0 { dbConn = dbConn.Where("connector_deployments.version > ?", gtVersion) } + if boolFilterChannelUpdates, _ := strconv.ParseBool(filterChannelUpdates); boolFilterChannelUpdates { + dbConn = dbConn.Where("\"ConnectorShardMetadata\".\"latest_revision\" IS NOT NULL") + } + if boolIncludeDanglingDeploymentsOnly, _ := strconv.ParseBool(includeDanglingDeploymentsOnly); boolIncludeDanglingDeploymentsOnly { + dbConn = dbConn.Where("\"Connector\".\"deleted_at\" IS NOT NULL") + } else { + dbConn = dbConn.Where("\"Connector\".\"deleted_at\" IS NULL") + } // Apply search query if len(listArgs.Search) > 0 { - queryParser := coreServices.NewQueryParser(GetValidDeploymentColumns()...) + queryParser := coreServices.NewQueryParserWithColumnPrefix("connector_deployments", GetValidDeploymentColumns()...) searchDbQuery, err := queryParser.Parse(listArgs.Search) if err != nil { return resourceList, pagingMeta, errors.NewWithCause(errors.ErrorFailedToParseSearch, err, "Unable to list connector deployments requests: %s", err.Error()) @@ -400,7 +414,7 @@ func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context, // set total, limit and paging (based on https://gitlab.cee.redhat.com/service/api-guidelines#user-content-paging) total := int64(pagingMeta.Total) - dbConn.Model(&resourceList).Count(&total) + dbConn.Session(&gorm.Session{}).Model(&resourceList).Count(&total) pagingMeta.Total = int(total) if pagingMeta.Size > pagingMeta.Total { @@ -413,7 +427,8 @@ func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context, // execute query if err := dbConn.Find(&resourceList).Error; err != nil { - return resourceList, pagingMeta, services.HandleGetError("Connector deployment", `cluster_id`, id, err) + return resourceList, pagingMeta, services.HandleGetError("Connector deployment", + fmt.Sprintf("filterChannelUpdates='%s' listArgs='%+v' cluster_id", filterChannelUpdates, listArgs), clusterId, err) } return resourceList, pagingMeta, nil @@ -490,82 +505,9 @@ func (k *connectorClusterService) FindAvailableNamespace(owner string, orgID str return nil, nil } -func (k *connectorClusterService) GetConnectorWithBase64Secrets(ctx context.Context, resource dbapi.ConnectorDeployment) (dbapi.Connector, bool, *errors.ServiceError) { - - dbConn := k.connectionFactory.New() - - var connector dbapi.Connector - err := dbConn.Where("id = ?", resource.ConnectorID).First(&connector).Error - if err != nil { - return connector, false, services.HandleGetError("Connector", "id", resource.ConnectorID, err) - } - - serr := getSecretsFromVaultAsBase64(&connector, k.connectorTypesService, k.vaultService) - if serr != nil { - return connector, serr.Code == errors.ErrorGeneral, serr - } - - return connector, false, nil -} - -func getSecretsFromVaultAsBase64(resource *dbapi.Connector, cts ConnectorTypesService, vault vault.VaultService) *errors.ServiceError { - ct, err := cts.Get(resource.ConnectorTypeId) - if err != nil { - return errors.BadRequest("invalid connector type id: %s", resource.ConnectorTypeId) - } - // move secrets to a vault. - - if resource.ServiceAccount.ClientSecretRef != "" { - v, err := vault.GetSecretString(resource.ServiceAccount.ClientSecretRef) - if err != nil { - return errors.GeneralError("could not get kafka client secrets from the vault: %v", err.Error()) - } - encoded := base64.StdEncoding.EncodeToString([]byte(v)) - resource.ServiceAccount.ClientSecret = encoded - } - - if len(resource.ConnectorSpec) != 0 { - updated, err := secrets.ModifySecrets(ct.JsonSchema, resource.ConnectorSpec, func(node *ajson.Node) error { - if node.Type() == ajson.Object { - ref, err := node.GetKey("ref") - if err != nil { - return err - } - r, err := ref.GetString() - if err != nil { - return err - } - v, err := vault.GetSecretString(r) - if err != nil { - return err - } - - encoded := base64.StdEncoding.EncodeToString([]byte(v)) - err = node.SetObject(map[string]*ajson.Node{ - "kind": ajson.StringNode("", "base64"), - "value": ajson.StringNode("", encoded), - }) - if err != nil { - return err - } - } else if node.Type() == ajson.Null { - // don't change.. - } else { - return fmt.Errorf("secret field must be set to an object: " + node.Path()) - } - return nil - }) - if err != nil { - return errors.GeneralError("could not get connectors secrets from the vault: %v", err.Error()) - } - resource.ConnectorSpec = updated - } - return nil -} - func (k *connectorClusterService) GetDeploymentByConnectorId(ctx context.Context, connectorID string) (resource dbapi.ConnectorDeployment, serr *errors.ServiceError) { - dbConn := k.connectionFactory.New().Where("connector_id = ?", connectorID) + dbConn := k.connectionFactory.New().Joins("Status").Joins("ConnectorShardMetadata").Joins("Connector").Where("connector_id = ?", connectorID) if err := dbConn.First(&resource).Error; err != nil { return resource, services.HandleGetError("Connector deployment", "connector_id", connectorID, err) } @@ -575,7 +517,7 @@ func (k *connectorClusterService) GetDeploymentByConnectorId(ctx context.Context func (k *connectorClusterService) GetDeployment(ctx context.Context, id string) (resource dbapi.ConnectorDeployment, serr *errors.ServiceError) { dbConn := k.connectionFactory.New() - if err := dbConn.Unscoped().Where("connector_deployments.id = ?", id).First(&resource).Error; err != nil { + if err := dbConn.Unscoped().Joins("Status").Joins("ConnectorShardMetadata").Joins("Connector").Where("connector_deployments.id = ?", id).First(&resource).Error; err != nil { return resource, services.HandleGetError("Connector deployment", "id", id, err) } @@ -586,128 +528,6 @@ func (k *connectorClusterService) GetDeployment(ctx context.Context, id string) return } -func (k *connectorClusterService) GetAvailableDeploymentTypeUpgrades(listArgs *services.ListArguments) (upgrades dbapi.ConnectorDeploymentTypeUpgradeList, paging *api.PagingMeta, serr *errors.ServiceError) { - - type Result struct { - ConnectorID string - DeploymentID string - ConnectorTypeUpgradeFrom int64 - ConnectorTypeUpgradeTo int64 - ConnectorTypeID string - NamespaceID string - Channel string - } - - results := []Result{} - dbConn := k.connectionFactory.New() - dbConn = dbConn.Table("connector_deployments") - dbConn = dbConn.Select( - "connector_deployments.connector_id AS connector_id", - "connector_deployments.id AS deployment_id", - "connector_deployments.namespace_id AS namespace_id", - "connector_shard_metadata.id AS connector_type_upgrade_from", - "connector_shard_metadata.latest_id AS connector_type_upgrade_to", - "connector_shard_metadata.connector_type_id", - "connector_shard_metadata.channel", - ) - dbConn = dbConn.Joins("LEFT JOIN connector_shard_metadata ON connector_shard_metadata.id = connector_deployments.connector_type_channel_id") - dbConn = dbConn.Joins("LEFT JOIN connector_deployment_statuses ON connector_deployment_statuses.id = connector_deployments.id") - dbConn = dbConn.Where("connector_shard_metadata.latest_id IS NOT NULL") - dbConn = dbConn.Or("connector_deployment_statuses.upgrade_available") - - if err := dbConn.Scan(&results).Error; err != nil { - return upgrades, paging, services.HandleGetError(`Connector deployment status`, `latest_id is not null or upgrade_available`, true, err) - } - - // TODO support paging - paging = &api.PagingMeta{ - Page: 0, - Size: len(results), - Total: len(results), - } - - upgrades = make(dbapi.ConnectorDeploymentTypeUpgradeList, len(results)) - for i, r := range results { - upgrades[i] = dbapi.ConnectorDeploymentTypeUpgrade{ - ConnectorID: r.ConnectorID, - DeploymentID: r.DeploymentID, - ConnectorTypeId: r.ConnectorTypeID, - NamespaceID: r.NamespaceID, - Channel: r.Channel, - } - - upgrades[i].ShardMetadata = &dbapi.ConnectorTypeUpgrade{ - AssignedId: r.ConnectorTypeUpgradeFrom, - AvailableId: r.ConnectorTypeUpgradeTo, - } - } - - return -} - -func (k *connectorClusterService) UpgradeConnectorsByType(ctx context.Context, clusterId string, upgrades dbapi.ConnectorDeploymentTypeUpgradeList) *errors.ServiceError { - - // get deployment ids from available upgrades - available, _, serr := k.GetAvailableDeploymentTypeUpgrades(&services.ListArguments{}) - if serr != nil { - return serr - } - - availableConnectors := toTypeMap(available) - reqConnectors := toTypeMap(upgrades) - - // validate reqConnectors - var errorList errors.ErrorList - for cid, upgrade := range reqConnectors { - availableUpgrade, ok := availableConnectors[cid] - if !ok { - errorList = append(errorList, errors.Conflict("Type upgrade not available for connector %v", cid)) - } - // make sure other bits match - upgrade.DeploymentID = availableUpgrade.DeploymentID - if !reflect.DeepEqual(upgrade, availableUpgrade) { - errorList = append(errorList, errors.Conflict("Type upgrade is outdated for connector %v", cid)) - } - } - if len(errorList) != 0 { - return errors.Conflict(errorList.Error()) - } - - // upgrade connector type channels - notificationAdded := false - dbConn := k.connectionFactory.New() - for cid, upgrade := range availableConnectors { - - // update connector channel id - if err := dbConn.Model(&dbapi.ConnectorDeployment{}). - Where("id = ?", upgrade.DeploymentID). - Update("ConnectorTypeChannelId", upgrade.ShardMetadata.AvailableId).Error; err != nil { - errorList = append(errorList, - services.HandleUpdateError(`Connector deployment id=`+cid, err)) - } else { - if !notificationAdded { - _ = db.AddPostCommitAction(ctx, func() { - k.bus.Notify(fmt.Sprintf("/kafka_connector_clusters/%s/deployments", clusterId)) - }) - notificationAdded = true - } - } - } - - if len(errorList) != 0 { - return services.HandleUpdateError(`Connector deployment`, errorList) - } - return nil -} - -func toTypeMap(arr []dbapi.ConnectorDeploymentTypeUpgrade) map[string]dbapi.ConnectorDeploymentTypeUpgrade { - m := make(map[string]dbapi.ConnectorDeploymentTypeUpgrade) - for _, upgrade := range arr { - m[upgrade.ConnectorID] = upgrade - } - return m -} - func (k *connectorClusterService) GetAvailableDeploymentOperatorUpgrades(listArgs *services.ListArguments) (upgrades dbapi.ConnectorDeploymentOperatorUpgradeList, paging *api.PagingMeta, serr *errors.ServiceError) { type Result struct { @@ -730,7 +550,7 @@ func (k *connectorClusterService) GetAvailableDeploymentOperatorUpgrades(listArg "connector_shard_metadata.channel", "connector_deployment_statuses.operators AS connector_operators", ) - dbConn = dbConn.Joins("LEFT JOIN connector_shard_metadata ON connector_shard_metadata.id = connector_deployments.connector_type_channel_id") + dbConn = dbConn.Joins("LEFT JOIN connector_shard_metadata ON connector_shard_metadata.id = connector_deployments.connector_shard_metadata_id") dbConn = dbConn.Joins("LEFT JOIN connector_deployment_statuses ON connector_deployment_statuses.id = connector_deployments.id") dbConn = dbConn.Where("connector_deployment_statuses.upgrade_available") @@ -740,7 +560,7 @@ func (k *connectorClusterService) GetAvailableDeploymentOperatorUpgrades(listArg // TODO support paging paging = &api.PagingMeta{ - Page: 0, + Page: 1, Size: len(results), Total: len(results), } diff --git a/internal/connector/internal/services/connector_deployment.go b/internal/connector/internal/services/connector_deployment.go index fa4d39ae5..54129328e 100644 --- a/internal/connector/internal/services/connector_deployment.go +++ b/internal/connector/internal/services/connector_deployment.go @@ -7,6 +7,7 @@ import ( "gorm.io/gorm" ) +//TODO: convert this as deployment service and move here relevant methods from services/connector_cluster.go func deleteConnectorDeployment(dbConn *gorm.DB, id string) *errors.ServiceError { // no err, deployment existed.. if err := dbConn.Where("id = ?", id).Delete(&dbapi.ConnectorDeployment{}).Error; err != nil { diff --git a/internal/connector/internal/services/connector_types.go b/internal/connector/internal/services/connector_types.go index 04c52e9ad..fabe27d02 100644 --- a/internal/connector/internal/services/connector_types.go +++ b/internal/connector/internal/services/connector_types.go @@ -7,9 +7,11 @@ package services // pane cluster. import ( - "gorm.io/gorm" + "database/sql" "strings" + "gorm.io/gorm" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/config" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/presenters" @@ -28,9 +30,8 @@ type ConnectorTypesService interface { ForEachConnectorCatalogEntry(f func(id string, channel string, ccc *config.ConnectorChannelConfig) *errors.ServiceError) *errors.ServiceError PutConnectorShardMetadata(ctc *dbapi.ConnectorShardMetadata) (int64, *errors.ServiceError) - GetConnectorShardMetadata(id int64) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) - GetLatestConnectorShardMetadataID(tid, channel string) (int64, *errors.ServiceError) - GetLatestConnectorShardMetadata(tid, channel string) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) + GetConnectorShardMetadata(typeId, channel string, revision int64) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) + GetLatestConnectorShardMetadata(typeId, channel string) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) CatalogEntriesReconciled() (bool, *errors.ServiceError) DeleteUnusedAndNotInCatalog() *errors.ServiceError ListCatalogEntries(*coreService.ListArguments) ([]dbapi.ConnectorCatalogEntry, *api.PagingMeta, *errors.ServiceError) @@ -221,110 +222,102 @@ func (cts *connectorTypesService) ForEachConnectorCatalogEntry(f func(id string, return nil } -func (cts *connectorTypesService) PutConnectorShardMetadata(ctc *dbapi.ConnectorShardMetadata) (int64, *errors.ServiceError) { +func (cts *connectorTypesService) PutConnectorShardMetadata(connectorShardMetadata *dbapi.ConnectorShardMetadata) (int64, *errors.ServiceError) { var resource dbapi.ConnectorShardMetadata dbConn := cts.connectionFactory.New() - dbConn = dbConn.Select("id") - dbConn = dbConn.Where("connector_type_id = ?", ctc.ConnectorTypeId) - dbConn = dbConn.Where("channel = ?", ctc.Channel) - dbConn = dbConn.Where("shard_metadata = ?", ctc.ShardMetadata) + dbConn = dbConn.Select("id"). + Where("connector_type_id = ?", connectorShardMetadata.ConnectorTypeId). + Where("channel = ?", connectorShardMetadata.Channel). + Where("revision = ?", connectorShardMetadata.Revision) if err := dbConn.First(&resource).Error; err != nil { if services.IsRecordNotFoundError(err) { - - // We need to create the resource.... - dbConn = cts.connectionFactory.New() - if err := dbConn.Save(ctc).Error; err != nil { - return 0, errors.GeneralError("failed to create connector type channel %q: %v", ctc.Channel, err) + // We need to understand if we are inserting the shard metadata with the latest revision + // among the same connector_type_id and channel group + var currentLatestRevision sql.NullInt64 + dbConn = cts.connectionFactory.New(). + Table("connector_shard_metadata"). + Select("max(revision)"). + Where("connector_type_id = ?", connectorShardMetadata.ConnectorTypeId). + Where("channel = ?", connectorShardMetadata.Channel) + if err := dbConn.Scan(¤tLatestRevision).Error; err != nil { + return 0, errors.GeneralError("failed to find max(revision) of connector shard metadata %v: %v", connectorShardMetadata, err) + } + // And updating LatestRevision field accordingly + if currentLatestRevision.Valid && connectorShardMetadata.Revision < currentLatestRevision.Int64 { + // The shard metadata we are saving has not the latest revision, + // so we set its LatestRevision field to currentLatestRevision + connectorShardMetadata.LatestRevision = ¤tLatestRevision.Int64 + } else { + // The shard metadata we are saving has the latest revision, + // so we set its LatestRevision field to nil + connectorShardMetadata.LatestRevision = nil } - // read it back again to get it's version. + // We need to create the resource... dbConn = cts.connectionFactory.New() - dbConn = dbConn.Select("id") - dbConn = dbConn.Where("connector_type_id = ?", ctc.ConnectorTypeId) - dbConn = dbConn.Where("channel = ?", ctc.Channel) - dbConn = dbConn.Where("shard_metadata = ?", ctc.ShardMetadata) - if err := dbConn.First(&resource).Error; err != nil { - return 0, errors.NewWithCause(errors.ErrorGeneral, err, "Unable to find connector type channel after insert") + if err := dbConn.Save(connectorShardMetadata).Error; err != nil { + return 0, errors.GeneralError("failed to create connector shard metadata %v: %v", connectorShardMetadata, err) } - // update the other records to know the latest_id - dbConn = cts.connectionFactory.New() - dbConn = dbConn.Table("connector_shard_metadata") - dbConn = dbConn.Where("id <> ?", resource.ID) - dbConn = dbConn.Where("connector_type_id = ?", ctc.ConnectorTypeId) - dbConn = dbConn.Where("channel = ?", ctc.Channel) - if err := dbConn.Update(`latest_id`, resource.ID).Error; err != nil { - return 0, errors.GeneralError("failed to create connector type channel: %v", err) + // If we are inserting the latest revision we need to update other shard metadata record + // among the same connector_type_id and channel group + if connectorShardMetadata.LatestRevision == nil { + // update the other records latest_revision + dbConn = cts.connectionFactory.New(). + Table("connector_shard_metadata"). + Where("connector_type_id = ?", connectorShardMetadata.ConnectorTypeId). + Where("channel = ?", connectorShardMetadata.Channel). + Where("revision < ?", connectorShardMetadata.Revision) + if err := dbConn.Update("latest_revision", connectorShardMetadata.Revision).Error; err != nil { + return 0, errors.GeneralError("failed to update other connectors shard metadata with the latest revision from: %v", connectorShardMetadata) + } } return resource.ID, nil - } else { - return 0, errors.NewWithCause(errors.ErrorGeneral, err, "Unable to find connector type channel") + return 0, errors.NewWithCause(errors.ErrorGeneral, err, "Unable to save connector shard metadata") } } else { - // resource existed... update the ctc with the version it's been assigned in the DB... + // resource existed... update the connectorShardMetadata with the version it's been assigned in the DB... return resource.ID, nil } } -func (cts *connectorTypesService) GetConnectorShardMetadata(id int64) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) { +func (cts *connectorTypesService) GetConnectorShardMetadata(typeId, channel string, revision int64) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) { resource := &dbapi.ConnectorShardMetadata{} dbConn := cts.connectionFactory.New() err := dbConn. - Where("id", id). + Where(dbapi.ConnectorShardMetadata{ConnectorTypeId: typeId, Channel: channel, Revision: revision}). First(&resource).Error if err != nil { if services.IsRecordNotFoundError(err) { - return nil, errors.NotFound("connector type channel not found") + return nil, errors.NotFound("Connector type shard metadata (ConnectorTypeId: %s, Channel: %s, Revision: %v) not found.", typeId, channel, revision) } - return nil, errors.GeneralError("Unable to get connector type channel: %s", err) + return nil, errors.GeneralError("Unable to get connector type shard metadata (ConnectorTypeId: %s, Channel: %s, Revision: %v): %s", typeId, channel, revision, err) } return resource, nil } -func (cts *connectorTypesService) GetLatestConnectorShardMetadataID(tid, channel string) (int64, *errors.ServiceError) { - resource := &dbapi.ConnectorShardMetadata{} - dbConn := cts.connectionFactory.New() - - err := dbConn. - Select("id"). - Where("connector_type_id", tid). - Where("channel", channel). - Order("id desc"). - First(&resource).Error - - if err != nil { - if services.IsRecordNotFoundError(err) { - return 0, errors.NotFound("connector type channel not found") - } - return 0, errors.GeneralError("Unable to get connector type channel: %s", err) - } - return resource.ID, nil -} - -func (cts *connectorTypesService) GetLatestConnectorShardMetadata(tid, channel string) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) { +func (cts *connectorTypesService) GetLatestConnectorShardMetadata(typeId, channel string) (*dbapi.ConnectorShardMetadata, *errors.ServiceError) { resource := &dbapi.ConnectorShardMetadata{} dbConn := cts.connectionFactory.New() err := dbConn. - Where("connector_type_id = ?", tid). - Where("channel = ?", channel). - Order("id desc"). + Where(dbapi.ConnectorShardMetadata{ConnectorTypeId: typeId, Channel: channel}). + Order("revision desc"). First(&resource).Error if err != nil { if services.IsRecordNotFoundError(err) { - return nil, errors.NotFound("connector type channel not found") + return nil, errors.NotFound("connector type shard metadata not found") } - return nil, errors.GeneralError("Unable to get connector type channel: %s", err) + return nil, errors.GeneralError("Unable to get connector type shard metadata: %s", err) } - return resource, nil } @@ -342,13 +335,14 @@ func (cts *connectorTypesService) CatalogEntriesReconciled() (bool, *errors.Serv return false, services.HandleGetError("Connector type", "id", typeIds, err) } - done := len(catalogChecksums) == len(connectorTypes) - if done { + done := false + if len(catalogChecksums) == len(connectorTypes) { for _, ct := range connectorTypes { if ct.Checksum == nil || *ct.Checksum != catalogChecksums[ct.ID] { - done = false + return done, nil } } + done = true } return done, nil } diff --git a/internal/connector/internal/services/connectors.go b/internal/connector/internal/services/connectors.go index 4ff822bb1..2f09072fc 100644 --- a/internal/connector/internal/services/connectors.go +++ b/internal/connector/internal/services/connectors.go @@ -2,6 +2,8 @@ package services import ( "context" + "encoding/base64" + "fmt" "strings" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" @@ -31,6 +33,8 @@ type ConnectorsService interface { Delete(ctx context.Context, id string) *errors.ServiceError ForEach(f func(*dbapi.Connector) *errors.ServiceError, query string, args ...interface{}) []error ForceDelete(ctx context.Context, id string) *errors.ServiceError + + ResolveConnectorRefsWithBase64Secrets(resource *dbapi.Connector) (bool, *errors.ServiceError) } var _ ConnectorsService = &connectorsService{} @@ -214,7 +218,7 @@ func GetValidConnectorColumns() []string { // List returns all connectors visible to the user within the requested paging window. func (k *connectorsService) List(ctx context.Context, listArgs *services.ListArguments, clusterId string) (dbapi.ConnectorWithConditionsList, *api.PagingMeta, *errors.ServiceError) { if err := listArgs.Validate(GetValidConnectorColumns()); err != nil { - return nil, nil, errors.NewWithCause(errors.ErrorMalformedRequest, err, "Unable to list connector type requests: %s", err.Error()) + return nil, nil, errors.NewWithCause(errors.ErrorMalformedRequest, err, "Unable to list connector requests: %s", err.Error()) } dbConn := k.connectionFactory.New() @@ -391,3 +395,67 @@ func (k *connectorsService) ForceDelete(ctx context.Context, id string) *errors. } return nil } + +func (k *connectorsService) ResolveConnectorRefsWithBase64Secrets(connector *dbapi.Connector) (bool, *errors.ServiceError) { + err := getSecretsFromVaultAsBase64(connector, k.connectorTypesService, k.vaultService) + if err != nil { + return err.Code == errors.ErrorGeneral, err + } + + return false, nil +} + +func getSecretsFromVaultAsBase64(connector *dbapi.Connector, cts ConnectorTypesService, vault vault.VaultService) *errors.ServiceError { + ct, err := cts.Get(connector.ConnectorTypeId) + if err != nil { + return errors.BadRequest("invalid connector type id: %s", connector.ConnectorTypeId) + } + // move secrets to a vault. + + if connector.ServiceAccount.ClientSecretRef != "" { + v, err := vault.GetSecretString(connector.ServiceAccount.ClientSecretRef) + if err != nil { + return errors.GeneralError("could not get kafka client secrets from the vault: %v", err.Error()) + } + encoded := base64.StdEncoding.EncodeToString([]byte(v)) + connector.ServiceAccount.ClientSecret = encoded + } + + if len(connector.ConnectorSpec) != 0 { + updated, err := secrets.ModifySecrets(ct.JsonSchema, connector.ConnectorSpec, func(node *ajson.Node) error { + if node.Type() == ajson.Object { + ref, err := node.GetKey("ref") + if err != nil { + return err + } + r, err := ref.GetString() + if err != nil { + return err + } + v, err := vault.GetSecretString(r) + if err != nil { + return err + } + + encoded := base64.StdEncoding.EncodeToString([]byte(v)) + err = node.SetObject(map[string]*ajson.Node{ + "kind": ajson.StringNode("", "base64"), + "value": ajson.StringNode("", encoded), + }) + if err != nil { + return err + } + } else if node.Type() == ajson.Null { + // don't change.. + } else { + return fmt.Errorf("secret field must be set to an object: " + node.Path()) + } + return nil + }) + if err != nil { + return errors.GeneralError("could not get connectors secrets from the vault: %v", err.Error()) + } + connector.ConnectorSpec = updated + } + return nil +} diff --git a/internal/connector/internal/workers/connector_mgr.go b/internal/connector/internal/workers/connector_mgr.go index 2a4bc94ea..f7caa1e93 100644 --- a/internal/connector/internal/workers/connector_mgr.go +++ b/internal/connector/internal/workers/connector_mgr.go @@ -3,6 +3,7 @@ package workers import ( "context" "encoding/json" + "reflect" "sync" "time" @@ -153,21 +154,26 @@ func (k *ConnectorManager) Reconcile() []error { return errs } -func (k *ConnectorManager) ReconcileConnectorCatalogEntry(id string, channel string, ccc *config.ConnectorChannelConfig) *serviceError.ServiceError { +func (k *ConnectorManager) ReconcileConnectorCatalogEntry(id string, channel string, connectorChannelConfig *config.ConnectorChannelConfig) *serviceError.ServiceError { - ctc := dbapi.ConnectorShardMetadata{ + connectorShardMetadata := dbapi.ConnectorShardMetadata{ ConnectorTypeId: id, Channel: channel, } + var err error - ctc.ShardMetadata, err = json.Marshal(ccc.ShardMetadata) + connectorShardMetadata.Revision, err = GetShardMetadataRevision(connectorChannelConfig.ShardMetadata) + if err != nil { + return serviceError.GeneralError("failed to convert connector type %s, channel %s. Error in loaded connector type shard metadata %+v: %v", id, channel, connectorChannelConfig.ShardMetadata, err.Error()) + } + connectorShardMetadata.ShardMetadata, err = json.Marshal(connectorChannelConfig.ShardMetadata) if err != nil { return serviceError.GeneralError("failed to convert connector type %s, channel %s: %v", id, channel, err.Error()) } - // We store connector type channels so we can track changes and trigger redeployment of + // We store connector type channels so that we can track changes and trigger redeployment of // associated connectors upon connector type channel changes. - _, serr := k.connectorTypesService.PutConnectorShardMetadata(&ctc) + _, serr := k.connectorTypesService.PutConnectorShardMetadata(&connectorShardMetadata) if serr != nil { return serr } @@ -175,6 +181,20 @@ func (k *ConnectorManager) ReconcileConnectorCatalogEntry(id string, channel str return nil } +func GetShardMetadataRevision(connectorShardMetadata map[string]interface{}) (int64, error) { + revision, connectorRevisionFound := connectorShardMetadata["connector_revision"] + if connectorRevisionFound { + floatRevision, isfloat64 := revision.(float64) + if isfloat64 { + return int64(floatRevision), nil + } else { + return 0, errors.Errorf("connector_revision in shard metadata was not an int but a %v", reflect.TypeOf(revision).Kind()) + } + } else { + return 0, errors.Errorf("connector_revision not found in shard metadata") + } +} + func (k *ConnectorManager) reconcileAssigning(ctx context.Context, connector *dbapi.Connector) error { var namespace *dbapi.ConnectorNamespace namespace, err := k.connectorClusterService.FindAvailableNamespace(connector.Owner, connector.OrganisationId, connector.NamespaceId) @@ -186,7 +206,7 @@ func (k *ConnectorManager) reconcileAssigning(ctx context.Context, connector *db return nil } - channelVersion, err := k.connectorTypesService.GetLatestConnectorShardMetadataID(connector.ConnectorTypeId, connector.Channel) + shardMetadata, err := k.connectorTypesService.GetLatestConnectorShardMetadata(connector.ConnectorTypeId, connector.Channel) if err != nil { return errors.Wrapf(err, "failed to get latest channel version for connector request %s", connector.ID) } @@ -203,12 +223,12 @@ func (k *ConnectorManager) reconcileAssigning(ctx context.Context, connector *db Model: db.Model{ ID: api.NewID(), }, - ConnectorID: connector.ID, - ClusterID: namespace.ClusterId, - NamespaceID: namespace.ID, - ConnectorVersion: connector.Version, - ConnectorTypeChannelId: channelVersion, - Status: dbapi.ConnectorDeploymentStatus{}, + ConnectorID: connector.ID, + ClusterID: namespace.ClusterId, + NamespaceID: namespace.ID, + ConnectorVersion: connector.Version, + ConnectorShardMetadataID: shardMetadata.ID, + Status: dbapi.ConnectorDeploymentStatus{}, } if err = k.connectorClusterService.SaveDeployment(ctx, &deployment); err != nil { diff --git a/internal/connector/test/integration/connector-catalog-root/aws/aws-sqs-source-v1alpha1.json b/internal/connector/test/integration/connector-catalog-root/aws/aws-sqs-source-v1alpha1.json index 8f0e13143..6dc5dfb7a 100644 --- a/internal/connector/test/integration/connector-catalog-root/aws/aws-sqs-source-v1alpha1.json +++ b/internal/connector/test/integration/connector-catalog-root/aws/aws-sqs-source-v1alpha1.json @@ -200,7 +200,7 @@ "channels" : { "stable" : { "shard_metadata" : { - "connector_revision" : "5", + "connector_revision" : 5, "connector_type" : "source", "connector_image" : "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", "operators" : [ { @@ -226,13 +226,13 @@ } }, "beta": { - "revision": 1, "shard_metadata": { + "connector_revision" : 4, "connector_image": "quay.io/mock-image:beta", "operators": [ { "type": "camel-k", - "versions": "[2.0.0]" + "version": "[2.0.0]" } ] } diff --git a/internal/connector/test/integration/connector-catalog-root/misc/log_sink_0.1.json b/internal/connector/test/integration/connector-catalog-root/misc/log_sink_0.1.json index 9d1dbb28c..efb961ece 100644 --- a/internal/connector/test/integration/connector-catalog-root/misc/log_sink_0.1.json +++ b/internal/connector/test/integration/connector-catalog-root/misc/log_sink_0.1.json @@ -151,7 +151,7 @@ "channels" : { "stable" : { "shard_metadata" : { - "connector_revision" : "5", + "connector_revision" : 5, "connector_type" : "sink", "connector_image" : "quay.io/mcs_dev/log-sink:0.0.1", "operators" : [ { diff --git a/internal/connector/test/integration/connector-catalog/aws-sqs-source-v1alpha1.json b/internal/connector/test/integration/connector-catalog/aws-sqs-source-v1alpha1.json index 8f0e13143..6dc5dfb7a 100644 --- a/internal/connector/test/integration/connector-catalog/aws-sqs-source-v1alpha1.json +++ b/internal/connector/test/integration/connector-catalog/aws-sqs-source-v1alpha1.json @@ -200,7 +200,7 @@ "channels" : { "stable" : { "shard_metadata" : { - "connector_revision" : "5", + "connector_revision" : 5, "connector_type" : "source", "connector_image" : "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", "operators" : [ { @@ -226,13 +226,13 @@ } }, "beta": { - "revision": 1, "shard_metadata": { + "connector_revision" : 4, "connector_image": "quay.io/mock-image:beta", "operators": [ { "type": "camel-k", - "versions": "[2.0.0]" + "version": "[2.0.0]" } ] } diff --git a/internal/connector/test/integration/connector-catalog/log_sink_0.1.json b/internal/connector/test/integration/connector-catalog/log_sink_0.1.json index 9d1dbb28c..efb961ece 100644 --- a/internal/connector/test/integration/connector-catalog/log_sink_0.1.json +++ b/internal/connector/test/integration/connector-catalog/log_sink_0.1.json @@ -151,7 +151,7 @@ "channels" : { "stable" : { "shard_metadata" : { - "connector_revision" : "5", + "connector_revision" : 5, "connector_type" : "sink", "connector_image" : "quay.io/mcs_dev/log-sink:0.0.1", "operators" : [ { diff --git a/internal/connector/test/integration/feature_test.go b/internal/connector/test/integration/feature_test.go index 5387663f7..0c293fe79 100644 --- a/internal/connector/test/integration/feature_test.go +++ b/internal/connector/test/integration/feature_test.go @@ -32,6 +32,9 @@ func TestMain(m *testing.M) { "INSERT INTO connector_channels (channel) VALUES ('old_channel')", "INSERT INTO connector_type_channels (connector_type_id, connector_channel_channel) VALUES ('log_sink_0.1', 'old_channel')", "INSERT INTO connector_type_capabilities (connector_type_id, capability) VALUES ('log_sink_0.1', 'old_capability')", + + "INSERT INTO connector_channels (channel) VALUES ('stable')", + "INSERT INTO connector_shard_metadata (connector_type_id, channel) VALUES ('log_sink_0.1', 'stable')", }, func(c *config.ConnectorsConfig, kc *keycloak.KeycloakConfig, reconcilerConfig *workers.ReconcilerConfig) { c.ConnectorCatalogDirs = []string{"./internal/connector/test/integration/connector-catalog"} diff --git a/internal/connector/test/integration/features/connector-agent-api.feature b/internal/connector/test/integration/features/connector-agent-api.feature index f8d0722d8..a94ebdfde 100644 --- a/internal/connector/test/integration/features/connector-agent-api.feature +++ b/internal/connector/test/integration/features/connector-agent-api.feature @@ -398,7 +398,7 @@ Feature: connector agent API }, "shard_metadata": { "connector_image": "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", - "connector_revision": "5", + "connector_revision": 5, "connector_type": "source", "kamelets": { "adapter": { @@ -496,7 +496,7 @@ Feature: connector agent API }, "shard_metadata": { "connector_image": "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", - "connector_revision": "5", + "connector_revision": 5, "connector_type": "source", "kamelets": { "adapter": { @@ -583,7 +583,7 @@ Feature: connector agent API }, "shard_metadata": { "connector_image": "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", - "connector_revision": "5", + "connector_revision": 5, "connector_type": "source", "kamelets": { "adapter": { @@ -837,7 +837,7 @@ Feature: connector agent API }, "shard_metadata": { "connector_image": "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", - "connector_revision": "5", + "connector_revision": 5, "connector_type": "source", "kamelets": { "adapter": { @@ -965,7 +965,7 @@ Feature: connector agent API "kind": "ConnectorDeploymentAdminView", "metadata": { "created_at": "${response.metadata.created_at}", - "resolved_secrets": true, + "resolved_secrets": false, "resource_version": ${response.metadata.resource_version}, "updated_at": "${response.metadata.updated_at}" }, @@ -978,7 +978,7 @@ Feature: connector agent API "namespace_id": "${connector_namespace_id}", "shard_metadata": { "connector_image": "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", - "connector_revision": "5", + "connector_revision": 5, "connector_type": "source", "kamelets": { "adapter": { @@ -1005,8 +1005,28 @@ Feature: connector agent API } }, "status": { + "conditions": [ + { + "last_transition_time": "2018-01-01T00:00:00Z", + "type": "Ready" + } + ], "operators": { - "assigned": {}, + "assigned": { + "id": "camel-k-1.0.0", + "type": "camel-k", + "version": "1.0.0" + }, + "available": {} + }, + "phase": "ready", + "resource_version": 45, + "shard_metadata": { + "assigned": { + "channel": "stable", + "connector_type_id": "aws-sqs-source-v1alpha1", + "revision": 5 + }, "available": {} } } @@ -1102,14 +1122,14 @@ Feature: connector agent API # Now lets verify connector upgrades due to catalog updates Given I am logged in as "Ricky Bobby" - And I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/upgrades/type" + And I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments?channel_updates=true" And the response code should be 200 And the response should match json: """ { "items": [], - "kind": "", - "page": 0, + "kind": "ConnectorDeploymentAdminViewList", + "page": 1, "size": 0, "total": 0 } @@ -1122,7 +1142,7 @@ Feature: connector agent API { "items": [], "kind": "", - "page": 0, + "page": 1, "size": 0, "total": 0 } @@ -1133,55 +1153,194 @@ Feature: connector agent API """ { "connector_image": "quay.io/mock-image:1.0.0", + "connector_revision": 42, "operators": [ { "type": "camel-k", - "versions": "[2.0.0]" + "version": "[2.0.0]" } ] } """ - Then I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/upgrades/type" + Then I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments?channel_updates=true" And the response code should be 200 And the response should match json: """ { - "items": - [{ - "connector_id": "${connector_id}", - "namespace_id": "${connector_namespace_id}", - "connector_type_id": "aws-sqs-source-v1alpha1", - "channel": "stable", - "shard_metadata": { - "assigned_id": ${response.items[0].shard_metadata.assigned_id}, - "available_id": ${response.items[0].shard_metadata.available_id} + "items": [ + { + "href": "${response.items[0].href}", + "id": "${response.items[0].id}", + "kind": "ConnectorDeploymentAdminView", + "metadata": { + "created_at": "${response.items[0].metadata.created_at}", + "resolved_secrets": false, + "resource_version": ${response.items[0].metadata.resource_version}, + "updated_at": "${response.items[0].metadata.updated_at}" + }, + "spec": { + "cluster_id": "${connector_cluster_id}", + "connector_id": "${connector_id}", + "connector_resource_version": ${response.items[0].spec.connector_resource_version}, + "connector_type_id": "aws-sqs-source-v1alpha1", + "desired_state": "ready", + "namespace_id": "${response.items[0].spec.namespace_id}", + "shard_metadata": { + "connector_image": "quay.io/mock-image:77c0b8763729a9167ddfa19266d83a3512b7aa8124ca53e381d5d05f7d197a24", + "connector_revision": 5, + "connector_type": "source", + "kamelets": { + "adapter": { + "name": "aws-sqs-source", + "prefix": "aws" + }, + "kafka": { + "name": "managed-kafka-sink", + "prefix": "kafka" + }, + "processors": { + "extract_field": "extract-field-action", + "has_header_filter": "has-header-filter-action", + "insert_field": "insert-field-action", + "throttle": "throttle-action" + } + }, + "operators": [ + { + "type": "camel-k", + "version": "[1.0.0,2.0.0)" + } + ] + } + }, + "status": { + "conditions": [ + { + "last_transition_time": "${response.items[0].status.conditions[0].last_transition_time}", + "type": "Ready" + } + ], + "operators": { + "assigned": { + "id": "camel-k-1.0.0", + "type": "camel-k", + "version": "1.0.0" + }, + "available": {} + }, + "phase": "ready", + "resource_version": 45, + "shard_metadata": { + "assigned": { + "channel": "stable", + "connector_type_id": "aws-sqs-source-v1alpha1", + "revision": 5 + }, + "available": { + "channel": "stable", + "connector_type_id": "aws-sqs-source-v1alpha1", + "revision": 42 + } + } } }], - "kind": "", - "page": 0, + "kind": "ConnectorDeploymentAdminViewList", + "page": 1, "size": 1, "total": 1 } """ - And I store the ".items[0].shard_metadata.available_id" selection from the response as ${connector_resource_version} - And I store the ".items" selection from the response as ${upgrade_items} + And I store the ".items[0].status.shard_metadata.available.revision" selection from the response as ${shard_metadata_latest_revision} + And I store the ".items[0].id" selection from the response as ${upgradable_deployment_id} + And I store the ".items" selection from the response as ${upgradable_deployments} # Upgrade by type # Should fail for Carley, who can't write - Then I PUT path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/upgrades/type" with json body: + Given I set the "Content-Type" header to "application/merge-patch+json" + When I PATCH path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments/${upgradable_deployment_id}" with json body: """ - ${upgrade_items} + { + "spec": { + "shard_metadata": { + "connector_revision": ${shard_metadata_latest_revision} + } + } + } """ - And the response code should be 404 + Then the response code should be 404 # Should work for Cal, who can write Given I am logged in as "Cal Naughton Jr." - Then I PUT path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/upgrades/type" with json body: + Given I set the "Content-Type" header to "application/merge-patch+json" + When I PATCH path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments/${upgradable_deployment_id}" with json body: """ - ${upgrade_items} + { + "spec": { + "shard_metadata": { + "connector_revision": ${shard_metadata_latest_revision} + } + } + } + """ + Then the response code should be 202 + And the response should match json: + """ + { + "href": "${response.href}", + "id": "${response.id}", + "kind": "ConnectorDeploymentAdminView", + "metadata": { + "created_at": "${response.metadata.created_at}", + "resolved_secrets": false, + "resource_version": ${response.metadata.resource_version}, + "updated_at": "${response.metadata.updated_at}" + }, + "spec": { + "cluster_id": "${connector_cluster_id}", + "connector_id": "${connector_id}", + "connector_resource_version": ${response.spec.connector_resource_version}, + "connector_type_id": "aws-sqs-source-v1alpha1", + "desired_state": "ready", + "namespace_id": "${response.spec.namespace_id}", + "shard_metadata": { + "connector_image": "quay.io/mock-image:1.0.0", + "connector_revision": 42, + "operators": [ + { + "type": "camel-k", + "version": "[2.0.0]" + } + ] + } + }, + "status": { + "conditions": [ + { + "last_transition_time": "${response.status.conditions[0].last_transition_time}", + "type": "Ready" + } + ], + "operators": { + "assigned": { + "id": "camel-k-1.0.0", + "type": "camel-k", + "version": "1.0.0" + }, + "available": {} + }, + "phase": "ready", + "resource_version": 45, + "shard_metadata": { + "assigned": { + "channel": "stable", + "connector_type_id": "aws-sqs-source-v1alpha1", + "revision": 42 + }, + "available": {} + } + } + } """ - And the response code should be 204 - And the response should match "" # agent should get updated connector type version Given I am logged in as "Shard" @@ -1191,11 +1350,12 @@ Feature: connector agent API And the ".items[0].spec.shard_metadata" selection from the response should match json: """" { - "connector_image": "quay.io/mock-image:1.0.0", + "connector_image": "quay.io/mock-image:1.0.0", + "connector_revision": 42, "operators": [ { "type": "camel-k", - "versions": "[2.0.0]" + "version": "[2.0.0]" } ] } @@ -1203,14 +1363,14 @@ Feature: connector agent API # type upgrade is not available anymore Then I am logged in as "Ricky Bobby" - And I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/upgrades/type" + And I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments?channel_updates=true" And the response code should be 200 And the response should match json: """ { "items": [], - "kind": "", - "page": 0, + "kind": "ConnectorDeploymentAdminViewList", + "page": 1, "size": 0, "total": 0 } @@ -1264,7 +1424,7 @@ Feature: connector agent API } }], "kind": "", - "page": 0, + "page": 1, "size": 1, "total": 1 } @@ -1314,7 +1474,7 @@ Feature: connector agent API { "items": [], "kind": "", - "page": 0, + "page": 1, "size": 0, "total": 0 } @@ -1919,18 +2079,78 @@ Feature: connector agent API } """ - # listing deployments as Admin returns a not found error for the missing connector - When I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments" - Then the response code should be 404 + # listing deployments as Admin with dangling_deployments=true returns the list of dangling deployments + When I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments?dangling_deployments=true" + Then the response code should be 200 And the response should match json: """ { - "id": "7", - "kind": "Error", - "href": "/api/connector_mgmt/v1/errors/7", - "code": "CONNECTOR-MGMT-7", - "reason": "Connector with id='${connector_id}' not found", - "operation_id": "${response.operation_id}" + "items": [ + { + "href": "${response.items[0].href}", + "id": "${response.items[0].id}", + "kind": "ConnectorDeploymentAdminView", + "metadata": { + "created_at": "${response.items[0].metadata.created_at}", + "resolved_secrets": false, + "resource_version": ${response.items[0].metadata.resource_version}, + "updated_at": "${response.items[0].metadata.updated_at}" + }, + "spec": { + "cluster_id": "${connector_cluster_id}", + "connector_id": "${connector_id}", + "connector_resource_version": ${response.items[0].spec.connector_resource_version}, + "connector_type_id": "log_sink_0.1", + "desired_state": "ready", + "namespace_id": "${response.items[0].spec.namespace_id}", + "shard_metadata": { + "connector_image": "quay.io/mcs_dev/log-sink:0.0.1", + "connector_revision": 5, + "connector_type": "sink", + "kamelets": { + "adapter": { + "name": "log-sink", + "prefix": "log" + }, + "kafka": { + "name": "managed-kafka-source", + "prefix": "kafka" + }, + "processors": { + "extract_field": "extract-field-action", + "has_header_filter": "has-header-filter-action", + "insert_field": "insert-field-action", + "throttle": "throttle-action" + } + }, + "operators": [ + { + "type": "camel-connector-operator", + "version": "[1.0.0,2.0.0)" + } + ] + } + }, + "status": { + "operators": { + "assigned": {}, + "available": {} + }, + "shard_metadata": { + "assigned": { + "channel": "stable", + "connector_type_id": "log_sink_0.1", + "revision": 5 + }, + "available": {} + } + } + } + ], + "kind": "ConnectorDeploymentAdminViewList", + "page": 1, + "size": 1, + "total": 1 } """ diff --git a/internal/connector/test/integration/features/connector-api.feature b/internal/connector/test/integration/features/connector-api.feature index 8ea916df6..14c21aa65 100644 --- a/internal/connector/test/integration/features/connector-api.feature +++ b/internal/connector/test/integration/features/connector-api.feature @@ -1151,7 +1151,7 @@ Feature: create a connector "kind":"Error", "href":"/api/connector_mgmt/v1/errors/17", "code":"CONNECTOR-MGMT-17", - "reason":"Unable to list connector type requests: invalid order by clause 'CAST(CHR(32)||(SELECT version()) AS NUMERIC)'", + "reason":"Unable to list connector requests: invalid order by clause 'CAST(CHR(32)||(SELECT version()) AS NUMERIC)'", "operation_id": "${response.operation_id}" } """ diff --git a/openapi/connector_mgmt-private-admin.yaml b/openapi/connector_mgmt-private-admin.yaml index 8e9fe027d..1d8e93427 100644 --- a/openapi/connector_mgmt-private-admin.yaml +++ b/openapi/connector_mgmt-private-admin.yaml @@ -221,6 +221,18 @@ paths: type: string in: path required: true + - name: channel_updates + description: include only deployments that have channel updates + schema: + type: boolean + in: query + required: false + - name: dangling_deployments + description: include only not deleted deployments belonging to a deleted connector + schema: + type: boolean + in: query + required: false - $ref: "connector_mgmt.yaml#/components/parameters/page" - $ref: "connector_mgmt.yaml#/components/parameters/size" - $ref: "connector_mgmt.yaml#/components/parameters/orderBy" @@ -319,6 +331,64 @@ paths: - Bearer: [ ] operationId: getConnectorDeployment summary: Get a connector deployment + patch: + tags: + - Connector Clusters Admin + security: + - Bearer: [ ] + operationId: patchConnectorClusterDeploymentAdmi + summary: Patch a deployment + description: Patch a deployment + requestBody: + description: Data to patch the deployment with + content: + application/merge-patch+json: + schema: + type: object + required: true + responses: + "202": + content: + application/json: + schema: + $ref: "#/components/schemas/ConnectorDeploymentAdminView" + description: The deployment matching the request + "401": + content: + application/json: + schema: + $ref: "connector_mgmt.yaml#/components/schemas/Error" + examples: + 401Example: + $ref: "#/components/examples/401Example" + description: Auth token is invalid + "404": + content: + application/json: + schema: + $ref: "connector_mgmt.yaml#/components/schemas/Error" + examples: + 404Example: + $ref: "#/components/examples/404Example" + description: No matching resource exists + "410": + content: + application/json: + schema: + $ref: "connector_mgmt.yaml#/components/schemas/Error" + examples: + 404Example: + $ref: "#/components/examples/410Example" + description: The requested resource doesn't exist anymore + "500": + content: + application/json: + schema: + $ref: "connector_mgmt.yaml#/components/schemas/Error" + examples: + 500Example: + $ref: "#/components/examples/500Example" + description: Unexpected error occurred /api/connector_mgmt/v1/admin/kafka_connector_namespaces: get: @@ -574,10 +644,21 @@ paths: type: string in: path required: true + - name: channel_updates + description: include only deployments that have channel updates + schema: + type: boolean + in: query + required: false + - name: dangling_deployments + description: include only not deleted deployments belonging to a deleted connector + schema: + type: boolean + in: query + required: false - $ref: "connector_mgmt.yaml#/components/parameters/page" - $ref: "connector_mgmt.yaml#/components/parameters/size" - $ref: "connector_mgmt.yaml#/components/parameters/orderBy" - - $ref: "connector_mgmt.yaml#/components/parameters/search" responses: "200": content: @@ -617,6 +698,7 @@ paths: operationId: getNamespaceDeployments summary: Get a list of available deployments in a namespace + /api/connector_mgmt/v1/admin/kafka_connectors/{connector_id}: parameters: - name: connector_id @@ -722,105 +804,6 @@ paths: operationId: deleteConnector summary: Delete a connector - /api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/type: - parameters: - - name: connector_cluster_id - description: The id of the connector cluster - schema: - type: string - in: path - required: true - - $ref: "connector_mgmt.yaml#/components/parameters/page" - - $ref: "connector_mgmt.yaml#/components/parameters/size" - get: - tags: - - Connector Clusters Admin - responses: - "200": - content: - application/json: - schema: - $ref: "#/components/schemas/ConnectorAvailableTypeUpgradeList" - description: The connectors that have available type upgrades - "401": - content: - application/json: - schema: - $ref: "connector_mgmt.yaml#/components/schemas/Error" - examples: - 401Example: - $ref: "connector_mgmt.yaml#/components/examples/401Example" - description: Auth token is invalid - "404": - content: - application/json: - schema: - $ref: "connector_mgmt.yaml#/components/schemas/Error" - examples: - 404Example: - $ref: "connector_mgmt.yaml#/components/examples/404Example" - description: No matching connector cluster type exists - "500": - content: - application/json: - schema: - $ref: "connector_mgmt.yaml#/components/schemas/Error" - examples: - 500Example: - $ref: "connector_mgmt.yaml#/components/examples/500Example" - description: Unexpected error occurred - security: - - Bearer: [ ] - operationId: getConnectorUpgradesByType - summary: Get a list of available connector type upgrades - - put: - tags: - - Connector Clusters Admin - responses: - "204": - description: Connectors are upgraded - "401": - content: - application/json: - schema: - $ref: "connector_mgmt.yaml#/components/schemas/Error" - examples: - 401Example: - $ref: "connector_mgmt.yaml#/components/examples/401Example" - description: Auth token is invalid - "404": - content: - application/json: - schema: - $ref: "connector_mgmt.yaml#/components/schemas/Error" - examples: - 404Example: - $ref: "connector_mgmt.yaml#/components/examples/404Example" - description: No matching connector cluster exists - "500": - content: - application/json: - schema: - $ref: "connector_mgmt.yaml#/components/schemas/Error" - examples: - 500Example: - $ref: "connector_mgmt.yaml#/components/examples/500Example" - description: Unexpected error occurred - security: - - Bearer: [ ] - operationId: upgradeConnectorsByType - summary: upgrade a connector cluster - requestBody: - description: List of connectors to upgrade - content: - application/json: - schema: - type: array - items: - $ref: "#/components/schemas/ConnectorAvailableTypeUpgrade" - required: true - /api/connector_mgmt/v1/admin/kafka_connector_clusters/{connector_cluster_id}/upgrades/operator: parameters: - name: connector_cluster_id @@ -1004,38 +987,6 @@ paths: components: schemas: - ConnectorAvailableTypeUpgradeList: - allOf: - - $ref: "connector_mgmt.yaml#/components/schemas/List" - - type: object - properties: - items: - type: array - items: - $ref: "#/components/schemas/ConnectorAvailableTypeUpgrade" - - ConnectorAvailableTypeUpgrade: - description: An available type upgrade for a connector - type: object - properties: - connector_id: - type: string - namespace_id: - type: string - connector_type_id: - type: string - channel: - type: string - shard_metadata: - type: object - properties: - assigned_id: - type: integer - format: int64 - available_id: - type: integer - format: int64 - ConnectorAvailableOperatorUpgradeList: allOf: - $ref: "connector_mgmt.yaml#/components/schemas/List" @@ -1059,12 +1010,16 @@ components: channel: type: string operator: - type: object - properties: - assigned_id: - type: string - available_id: - type: string + $ref: "#/components/schemas/ConnectorUpgradeStatus" + + ConnectorUpgradeStatus: + description: Assigned and available update ids + type: object + properties: + assigned_id: + type: string + available_id: + type: string ConnectorNamespaceWithTenantRequest: required: @@ -1128,7 +1083,7 @@ components: spec: $ref: '#/components/schemas/ConnectorDeploymentAdminSpec' status: - $ref: 'connector_mgmt-private.yaml#/components/schemas/ConnectorDeploymentStatus' + $ref: '#/components/schemas/ConnectorDeploymentAdminStatus' ConnectorDeploymentAdminSpec: description: Holds the deployment specification of a connector @@ -1157,6 +1112,49 @@ components: shard_metadata: type: object + ConnectorDeploymentAdminStatus: + description: The status of connector deployment + type: object + properties: + phase: + $ref: 'connector_mgmt.yaml#/components/schemas/ConnectorState' + resource_version: + type: integer + format: int64 + shard_metadata: + description: latest available revision of deployment shared metadata + type: object + properties: + assigned: + $ref: '#/components/schemas/ConnectorShardMetadata' + available: + $ref: '#/components/schemas/ConnectorShardMetadata' + operators: + type: object + properties: + assigned: + $ref: 'connector_mgmt-private.yaml#/components/schemas/ConnectorOperator' + available: + $ref: 'connector_mgmt-private.yaml#/components/schemas/ConnectorOperator' + conditions: + type: array + items: + $ref: 'connector_mgmt-private.yaml#/components/schemas/MetaV1Condition' + + ConnectorShardMetadata: + description: identifies a shard metadata of a connector type. + properties: + channel: + description: the channel of the shard metadata + type: string + connector_type_id: + description: the connector type id this shard metadata refers to + type: string + revision: + description: the revision of the shard metadate + type: integer + format: int64 + ConnectorDeploymentAdminViewList: allOf: - $ref: 'connector_mgmt.yaml#/components/schemas/List' diff --git a/pkg/db/migrations.go b/pkg/db/migrations.go index 683f3cdb5..885e964ff 100644 --- a/pkg/db/migrations.go +++ b/pkg/db/migrations.go @@ -246,6 +246,26 @@ func DropTableColumnsAction(table interface{}, tableName ...string) MigrationAct } } +func DropTableColumnAction(table interface{}, columnName string) MigrationAction { + caller := "" + if _, file, no, ok := runtime.Caller(1); ok { + caller = fmt.Sprintf("[ %s:%d ]", file, no) + } + return func(tx *gorm.DB, apply bool) error { + if apply { + if err := tx.Migrator().DropColumn(table, columnName); err != nil { + return errors.Wrap(err, caller) + } + } else { + if err := tx.Migrator().AddColumn(table, columnName); err != nil { + return errors.Wrap(err, caller) + } + } + return nil + + } +} + func RenameTableColumnAction(table interface{}, oldFieldName string, newFieldName string) MigrationAction { caller := "" if _, file, no, ok := runtime.Caller(1); ok { diff --git a/pkg/handlers/framework.go b/pkg/handlers/framework.go index f6734eeaa..1d9e98338 100644 --- a/pkg/handlers/framework.go +++ b/pkg/handlers/framework.go @@ -4,9 +4,10 @@ import ( "context" "encoding/json" "fmt" - "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/compat" "net/http" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/compat" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/logger" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared" @@ -65,7 +66,7 @@ func Handle(w http.ResponseWriter, r *http.Request, cfg *HandlerConfig, httpStat // Use the following instead if you want to debug the request body: //bytes, err := ioutil.ReadAll(r.Body) //if err != nil { - // handleError(r.Context(), w, errors.MalformedRequest("Unable to read request body: %s", err)) + // errorHandler(r, w, cfg, errors.MalformedRequest("Unable to read request body: %s", err)) // return //} //fmt.Println(string(bytes)) diff --git a/pkg/services/queryparser/query_parser.go b/pkg/services/queryparser/query_parser.go index a11a187ff..3d5de0092 100644 --- a/pkg/services/queryparser/query_parser.go +++ b/pkg/services/queryparser/query_parser.go @@ -2,9 +2,10 @@ package queryparser import ( "fmt" + "strings" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/state_machine" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/stringscanner" - "strings" "github.com/pkg/errors" ) @@ -38,6 +39,7 @@ type DBQuery struct { Query string Values []interface{} ValidColumns []string + ColumnPrefix string } // QueryParser - This object is to be used to parse and validate WHERE clauses (only portion after the `WHERE` is supported) @@ -141,6 +143,9 @@ func (p *queryParser) initStateMachine() (*state_machine.State, checkUnbalancedB if !contains(p.dbqry.ValidColumns, columnName) { return fmt.Errorf("invalid column name: '%s'", token.Value) } + if p.dbqry.ColumnPrefix != "" && !strings.HasPrefix(columnName, p.dbqry.ColumnPrefix+".") { + columnName = p.dbqry.ColumnPrefix + "." + columnName + } p.dbqry.Query += columnName return nil default: @@ -217,11 +222,16 @@ func (p *queryParser) Parse(sql string) (*DBQuery, error) { } func NewQueryParser(columns ...string) QueryParser { + return NewQueryParserWithColumnPrefix("", columns...) +} + +func NewQueryParserWithColumnPrefix(columnsPrefix string, columns ...string) QueryParser { query := DBQuery{} if len(columns) == 0 { query.ValidColumns = validColumns } else { query.ValidColumns = columns } + query.ColumnPrefix = columnsPrefix return &queryParser{dbqry: query} } diff --git a/pkg/services/queryparser/query_parser_test.go b/pkg/services/queryparser/query_parser_test.go index 024693247..6ccd7fff9 100644 --- a/pkg/services/queryparser/query_parser_test.go +++ b/pkg/services/queryparser/query_parser_test.go @@ -1,49 +1,56 @@ package queryparser import ( - . "github.com/onsi/gomega" "testing" + + . "github.com/onsi/gomega" ) func Test_QueryParser(t *testing.T) { - tests := []struct { name string qry string + qryParser QueryParser outQry string outValues []interface{} wantErr bool }{ { - name: "Testing just `=` sign", - qry: "=", - wantErr: true, + name: "Testing just `=` sign", + qry: "=", + qryParser: NewQueryParser(), + wantErr: true, }, { - name: "Testing incomplete query", - qry: "name=", - wantErr: true, + name: "Testing incomplete query", + qry: "name=", + qryParser: NewQueryParser(), + wantErr: true, }, { - name: "Testing incomplete join", - qry: "name='test' and ", - wantErr: true, + name: "Testing incomplete join", + qry: "name='test' and ", + qryParser: NewQueryParser(), + wantErr: true, }, { name: "Testing escaped quote", qry: `name='test\'123'`, + qryParser: NewQueryParser(), outQry: "name = ?", outValues: []interface{}{"test'123"}, wantErr: false, }, { - name: "Testing wrong unescaped quote", - qry: `name='test'123'`, - wantErr: true, + name: "Testing wrong unescaped quote", + qry: `name='test'123'`, + qryParser: NewQueryParser(), + wantErr: true, }, { name: "Complex query with braces", qry: "((cloud_provider = Value and name = value1) and (owner <> value2 or region=b ) ) or owner=c or name=e and region LIKE '%test%'", + qryParser: NewQueryParser(), outQry: "((cloud_provider = ? and name = ?) and (owner <> ? or region = ?)) or owner = ? or name = ? and region LIKE ?", outValues: []interface{}{"Value", "value1", "value2", "b", "c", "e", "%test%"}, wantErr: false, @@ -51,6 +58,7 @@ func Test_QueryParser(t *testing.T) { { name: "Complex query with braces and quoted values with escaped quote", qry: `((cloud_provider = 'Value' and name = 'val\'ue1') and (owner = value2 or region='b' ) ) or owner=c or name=e and region LIKE '%test%'`, + qryParser: NewQueryParser(), outQry: "((cloud_provider = ? and name = ?) and (owner = ? or region = ?)) or owner = ? or name = ? and region LIKE ?", outValues: []interface{}{"Value", "val'ue1", "value2", "b", "c", "e", "%test%"}, wantErr: false, @@ -58,6 +66,7 @@ func Test_QueryParser(t *testing.T) { { name: "Complex query with braces and quoted values with spaces", qry: `((cloud_provider = 'Value' and name = 'val ue1') and (owner = ' value2 ' or region='b' ) ) or owner=c or name=e and region LIKE '%test%'`, + qryParser: NewQueryParser(), outQry: "((cloud_provider = ? and name = ?) and (owner = ? or region = ?)) or owner = ? or name = ? and region LIKE ?", outValues: []interface{}{"Value", "val ue1", " value2 ", "b", "c", "e", "%test%"}, wantErr: false, @@ -65,6 +74,7 @@ func Test_QueryParser(t *testing.T) { { name: "Complex query with braces and empty quoted values", qry: `((cloud_provider = 'Value' and name = '') and (owner = ' value2 ' or region='' ) ) or owner=c or name=e and region LIKE '%test%'`, + qryParser: NewQueryParser(), outQry: "((cloud_provider = ? and name = ?) and (owner = ? or region = ?)) or owner = ? or name = ? and region LIKE ?", outValues: []interface{}{"Value", "", " value2 ", "", "c", "e", "%test%"}, wantErr: false, @@ -82,7 +92,8 @@ func Test_QueryParser(t *testing.T) { "and name = value9 " + "and name = value10 " + "or name = value11", - wantErr: false, + qryParser: NewQueryParser(), + wantErr: false, }, { name: "11 JOINS (too many)", @@ -98,22 +109,34 @@ func Test_QueryParser(t *testing.T) { "and name = value10 " + "or name = value11 " + "and name = value12", - wantErr: true, + qryParser: NewQueryParser(), + wantErr: true, }, { - name: "Complex query with unbalanced braces", - qry: "((cloud_provider = Value and name = value1) and (owner = value2 or region=b ) or owner=c or name=e and region LIKE '%test%'", - wantErr: true, + name: "Complex query with unbalanced braces", + qry: "((cloud_provider = Value and name = value1) and (owner = value2 or region=b ) or owner=c or name=e and region LIKE '%test%'", + qryParser: NewQueryParser(), + wantErr: true, }, { - name: "Bad column name", - qry: "badcolumn=test", - wantErr: true, + name: "Bad column name", + qry: "badcolumn=test", + qryParser: NewQueryParser(), + wantErr: true, }, { - name: "Bad column name in complex query", - qry: "((cloud_provider = Value and name = value1) and (owner = value2 or region=b ) or badcolumn=c or name=e and region LIKE '%test%'", - wantErr: true, + name: "Bad column name in complex query", + qry: "((cloud_provider = Value and name = value1) and (owner = value2 or region=b ) or badcolumn=c or name=e and region LIKE '%test%'", + qryParser: NewQueryParser(), + wantErr: true, + }, + { + name: "Parse with column prefix", + qry: "((cloud_provider = Value and name = value1) and (owner <> value2 or region=b ) ) or owner=c or name=e and region LIKE '%test%'", + qryParser: NewQueryParserWithColumnPrefix("prefix"), + outQry: "((prefix.cloud_provider = ? and prefix.name = ?) and (prefix.owner <> ? or prefix.region = ?)) or prefix.owner = ? or prefix.name = ? and prefix.region LIKE ?", + outValues: []interface{}{"Value", "value1", "value2", "b", "c", "e", "%test%"}, + wantErr: false, }, } @@ -122,7 +145,7 @@ func Test_QueryParser(t *testing.T) { t.Run(tt.name, func(t *testing.T) { RegisterTestingT(t) - qry, err := NewQueryParser().Parse(tt.qry) + qry, err := tt.qryParser.Parse(tt.qry) if err != nil && !tt.wantErr { t.Errorf("QueryParser() error = %v, wantErr = %v", err, tt.wantErr)