
[Discussion]: AWS Redshift Data API Ballerina Connector #7446

Open

chathushkaayash opened this issue Dec 5, 2024 · 11 comments

@chathushkaayash commented Dec 5, 2024

Summary

This proposal aims to develop a Ballerina connector for the AWS Redshift Data API. The connector will offer a simplified API for executing queries, enhancing Ballerina–Redshift integration by addressing performance bottlenecks such as the long-lived open connections commonly associated with JDBC-based APIs.

Goals

  • Performance: Improve query execution efficiency by using the Redshift Data API.
  • Ease of Use: Provide an intuitive interface for database operations.
  • Flexibility: Support parameterized queries, custom types, and streams.
  • Security: Authenticate using AWS credentials.

Motivation

While a JDBC connector for Redshift is already available, its performance is limited compared to the Data API. This connector addresses those performance bottlenecks, enabling faster response times and simplifying integration with Redshift databases.

Description

Client API Design

Client Definition

public type Client distinct client object {

    # Initializes the AWS Redshift Data API client.
    # + connectionConfig - Configurations related to the Redshift Data API
    # + return - Possible error when creating the client
    public isolated function init(ConnectionConfig connectionConfig) returns error?;

    # Executes the SQL query.
    #
    # + sqlQuery - The SQL query such as `DELETE FROM User WHERE city=${cityName}`
    # + databaseConfig - The database configurations such as the clusterId, databaseName, and databaseUser
    # + return - The metadata of the execution, the results of the query, or an error
    remote isolated function execute(sql:ParameterizedQuery sqlQuery, DatabaseConfig databaseConfig,
            typedesc<record {}> rowType = <>)
        returns stream<rowType, sql:Error?>|sql:ExecutionResult|sql:Error;

    # Executes multiple SQL queries.
    #
    # + sqlQueries - The SQL queries such as `DELETE FROM User WHERE city=${cityName}`
    # + databaseConfig - The database configurations such as the clusterId, databaseName, and databaseUser
    # + return - An array containing the execution metadata or the query results for each statement, or an error
    remote isolated function batchExecute(sql:ParameterizedQuery[] sqlQueries, DatabaseConfig databaseConfig,
            typedesc<record {}[]> rowTypes = <>)
        returns (stream<rowTypes, sql:Error?>|sql:ExecutionResult)[]|sql:Error;

    # Closes the client.
    #
    # + return - Possible error when closing the client
    remote isolated function close() returns error?;
};

ConnectionConfig

# Represents the client configurations for the AWS Redshift Data API.
# + region - The AWS region with which the connector should communicate
# + authConfig - The authentication configurations for the Redshift Data API
# + clientOptions - Additional configurations related to the HTTP client
public type ConnectionConfig record {|
    Region region;
    AuthConfig authConfig;
    HttpClientOptions clientOptions?;
|};

Region

# An Amazon Web Services region that hosts a set of Amazon services.
public enum Region {
    AF_SOUTH_1 = "af-south-1",
    AP_EAST_1 = "ap-east-1",
    AP_NORTHEAST_1 = "ap-northeast-1",
    AP_NORTHEAST_2 = "ap-northeast-2",
    AP_NORTHEAST_3 = "ap-northeast-3",
    AP_SOUTH_1 = "ap-south-1",
    AP_SOUTH_2 = "ap-south-2",
    AP_SOUTHEAST_1 = "ap-southeast-1",
    AP_SOUTHEAST_2 = "ap-southeast-2",
    AP_SOUTHEAST_3 = "ap-southeast-3",
    AP_SOUTHEAST_4 = "ap-southeast-4",
    AWS_CN_GLOBAL = "aws-cn-global",
    AWS_GLOBAL = "aws-global",
    AWS_ISO_GLOBAL = "aws-iso-global",
    AWS_ISO_B_GLOBAL = "aws-iso-b-global",
    AWS_US_GOV_GLOBAL = "aws-us-gov-global",
    CA_WEST_1 = "ca-west-1",
    CA_CENTRAL_1 = "ca-central-1",
    CN_NORTH_1 = "cn-north-1",
    CN_NORTHWEST_1 = "cn-northwest-1",
    EU_CENTRAL_1 = "eu-central-1",
    EU_CENTRAL_2 = "eu-central-2",
    EU_ISOE_WEST_1 = "eu-isoe-west-1",
    EU_NORTH_1 = "eu-north-1",
    EU_SOUTH_1 = "eu-south-1",
    EU_SOUTH_2 = "eu-south-2",
    EU_WEST_1 = "eu-west-1",
    EU_WEST_2 = "eu-west-2",
    EU_WEST_3 = "eu-west-3",
    IL_CENTRAL_1 = "il-central-1",
    ME_CENTRAL_1 = "me-central-1",
    ME_SOUTH_1 = "me-south-1",
    SA_EAST_1 = "sa-east-1",
    US_EAST_1 = "us-east-1",
    US_EAST_2 = "us-east-2",
    US_GOV_EAST_1 = "us-gov-east-1",
    US_GOV_WEST_1 = "us-gov-west-1",
    US_ISOB_EAST_1 = "us-isob-east-1",
    US_ISO_EAST_1 = "us-iso-east-1",
    US_ISO_WEST_1 = "us-iso-west-1",
    US_WEST_1 = "us-west-1",
    US_WEST_2 = "us-west-2"
}

AuthConfig

# Auth configurations for the AWS Redshift Data API
# + awsAccessKeyId - The AWS access key ID
# + awsSecretAccessKey - The AWS secret access key
# + sessionToken - The session token if the credentials are temporary
public type AuthConfig record {|
    string awsAccessKeyId;
    string awsSecretAccessKey;
    string sessionToken?;
|};

HttpClientOptions

# HTTP client configurations
# + maxConcurrency - The maximum number of concurrent requests that can be made to the database
# + connectionTimeout - The maximum time to wait for a connection to be established in `seconds`
# + readTimeout - The maximum time to wait for a read operation to complete in `seconds`
# + writeTimeout - The maximum time to wait for a write operation to complete in `seconds`
public type HttpClientOptions record {|
    int maxConcurrency = 50;
    int connectionTimeout = 30;
    int readTimeout = 30;
    int writeTimeout = 30;
|};

DatabaseConfig

# Database configurations
# + clusterId - The cluster identifier
# + databaseName - The name of the database
# + databaseUser - The database user
public type DatabaseConfig record {|
    string clusterId;
    string databaseName;
    string databaseUser;
|};

Sample Usage

import ballerina/io;
import ballerina/sql;
import ballerinax/redshiftdata;


redshiftdata:AuthConfig authConfig = {
    awsAccessKeyId: "<AWS_ACCESS_KEY_ID>",
    awsSecretAccessKey: "<AWS_SECRET_ACCESS_KEY>"
};


redshiftdata:ConnectionConfig connectionConfig = {
    region: redshiftdata:US_EAST_2,
    authConfig: authConfig
};


redshiftdata:DatabaseConfig databaseConfig = {
    clusterId: "<CLUSTER_ID>",
    databaseName: "<DATABASE_NAME>",
    databaseUser: "<DATABASE_USER>"
};


type User record {|
    string name;
    string email;
    string city;
|};


public function main() returns error? {
    redshiftdata:Client redshift = check new (connectionConfig);
    sql:ParameterizedQuery query = `SELECT * FROM User`;
    stream<User, error?> result = check redshift->execute(query, databaseConfig);

    check result.forEach(function(User user) {
        io:println(user);
    });

}

Dependencies

  • AWS Redshift Data API Java SDK
@ThisaruGuruge (Member) commented Dec 6, 2024

Do we need to provide the database config each time we send a request? I'd prefer one config per client. (That's how this would usually be used.)

We can get the database configs at the init method and use them internally when we send a request.
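A minimal sketch of that shape, reusing the ConnectionConfig and DatabaseConfig records from the proposal (the signatures here are illustrative, not final):

    import ballerina/sql;

    public type Client distinct client object {

        # Initializes a client bound to a single database.
        # + connectionConfig - Configurations related to the Redshift Data API
        # + databaseConfig - The database this client is bound to
        # + return - Possible error when creating the client
        public isolated function init(ConnectionConfig connectionConfig, DatabaseConfig databaseConfig) returns error?;

        # Executes the SQL query against the database provided at init.
        # + sqlQuery - The SQL query to execute
        # + return - The metadata of the execution or an error
        remote isolated function execute(sql:ParameterizedQuery sqlQuery) returns sql:ExecutionResult|sql:Error;
    };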

@ayeshLK (Member) commented Dec 6, 2024

Do we need to provide the database config each time we send a request? I'd prefer one config per client. (That's how this would usually be used.)

We can get the database configs at the init method and use them internally when we send a request.

Agreed. But the Redshift Data API is a bit flexible in that context: a user can query a different database with each request. That is why we thought of providing that flexibility in the Ballerina API.

But IMO we can improve it like this: if a user wants to use a single database instance for all queries, they can provide a DatabaseConfig during client initialization, and when executing a query we can use that as the db config. If the user does not provide a DB config during client initialization, they can provide the db config with the client action.

I think we can print a warning at runtime if the user provides both options, and we can define a priority order in our internal implementation for which db config to use.

@ThisaruGuruge WDYT ?
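As a sketch of that improvement, the internal resolution could look like this (resolveDatabaseConfig is a hypothetical helper; the priority shown, request-level over client-level, is one possible choice):

    import ballerina/log;

    // Hypothetical internal helper: a request-level config takes priority over
    // the client-level one, with a runtime warning when both are provided.
    isolated function resolveDatabaseConfig(DatabaseConfig? requestConfig, DatabaseConfig? clientConfig)
            returns DatabaseConfig|error {
        if requestConfig is DatabaseConfig {
            if clientConfig is DatabaseConfig {
                log:printWarn("Both client-level and request-level database configs are provided; " +
                        "using the request-level config");
            }
            return requestConfig;
        }
        if clientConfig is DatabaseConfig {
            return clientConfig;
        }
        return error("No database config provided at client init or with the request");
    }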

@chathushkaayash chathushkaayash changed the title [Proposal]: AWS Redshift Data API Ballerina Connector [Discussion]: AWS Redshift Data API Ballerina Connector Dec 6, 2024
@ThisaruGuruge (Member)

IMO, we should follow the other DB connectors, where we create a single client for a given connection. Even though the Data API is flexible, I don't think the 80% case would be to connect to multiple DBs in a single application. Even if we have multiple DBs in a single application, the more intuitive approach would be to keep one client per DB (even though we won't be keeping open connections like the other connectors).

@shafreenAnfar @daneshk thoughts?

@shafreenAnfar (Contributor)

What does the API look like in Java, Go, and C#?

@chathushkaayash (Author) commented Dec 9, 2024

What does the API look like in Java, Go, and C#?

Java

In the Java SDK, executing a query and retrieving the results involves invoking three distinct API operations:

  • executeStatement
    Executes a SQL query or command on an Amazon Redshift database. This method is used to start a database operation, such as querying, inserting, or updating data. Returns a unique statement ID for tracking the query.

      ExecuteStatementRequest statementRequest = ExecuteStatementRequest.builder()
                  .clusterIdentifier(databaseConfig.clusterId)
                  .database(databaseConfig.databaseName)
                  .dbUser(databaseConfig.databaseUser)
                  .sql(sqlStatement)
                  .build();
    
      ExecuteStatementResponse response = getAsyncDataClient().executeStatement(statementRequest).join();
  • describeStatement
    Retrieves the status and metadata of a previously executed statement using its statement ID. It provides details about the execution progress.

      DescribeStatementRequest describeRequest = DescribeStatementRequest.builder().id(statementId).build();
    
      DescribeStatementResponse describeResponse = getAsyncDataClient().describeStatement(describeRequest).join();
  • getStatementResult
    Fetches the result set of a successfully executed statement. This method retrieves the data returned by a query.

      GetStatementResultRequest resultRequest = GetStatementResultRequest.builder()
                      .id(statementId)
                      .build();
    
      GetStatementResultResponse resultResponse = getAsyncDataClient().getStatementResult(resultRequest).join();

    For more details, refer to the AWS SDK for Java documentation.

Go

  • ExecuteStatement

      execstmt_req, execstmt_err := redshiftclient.ExecuteStatement(&redshiftdataapiservice.ExecuteStatementInput{
          ClusterIdentifier: aws.String(redshift_cluster_id),
          DbUser:            aws.String(redshift_user),
          Database:          aws.String(redshift_database),
          Sql:               aws.String(query),
      })
  • DescribeStatement

      descstmt_req, descstmt_err := redshiftclient.DescribeStatement(&redshiftdataapiservice.DescribeStatementInput{
          Id: execstmt_req.Id,
      })
  • GetStatementResult

      getresult_req, getresult_err := redshiftclient.GetStatementResult(&redshiftdataapiservice.GetStatementResultInput{
          Id: execstmt_req.Id,
      })

For more details, refer to the AWS SDK for Go documentation.

C#

There is no SDK available for C#.

@daneshk (Member) commented Dec 10, 2024

I am +1 for sticking with the current SQL connector APIs, with a few minor adjustments. Here are my suggestions:

  • Instead of having a single execute API for all CRUD operations, let's implement a query API for select queries. We can use execute for data manipulation queries such as insert, delete, and update. Additionally, we could introduce a queryRow API to fetch a single value instead of a stream of result sets.
  • We should restrict the batchExecute API to data manipulation operations only, as it is typically not used to query result sets.
  • Let's create a separate API to execute stored procedures, similar to how it's done in SQL connectors.

As we make HTTP calls underneath, we need to think through how we map HTTP calls to our APIs.
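A rough sketch of the API surface these suggestions imply, mirroring the client actions of the ballerina/sql module (the shapes here are illustrative, not final):

    import ballerina/sql;

    public type Client distinct client object {

        # Executes a SELECT query and returns the results as a stream.
        remote isolated function query(sql:ParameterizedQuery sqlQuery, typedesc<record {}> rowType = <>)
            returns stream<rowType, sql:Error?>;

        # Executes a SELECT query that is expected to return at most a single row or value.
        remote isolated function queryRow(sql:ParameterizedQuery sqlQuery, typedesc<anydata> returnType = <>)
            returns returnType|sql:Error;

        # Executes a data manipulation statement (INSERT, UPDATE, DELETE).
        remote isolated function execute(sql:ParameterizedQuery sqlQuery)
            returns sql:ExecutionResult|sql:Error;

        # Executes data manipulation statements as a batch.
        remote isolated function batchExecute(sql:ParameterizedQuery[] sqlQueries)
            returns sql:ExecutionResult[]|sql:Error;

        # Calls a stored procedure.
        remote isolated function call(sql:ParameterizedCallQuery sqlQuery, typedesc<record {}>[] rowTypes = [])
            returns sql:ProcedureCallResult|sql:Error;
    };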

@chathushkaayash (Author)

Design Review Notes

  • Conduct research on the async and sync Redshift Java clients. The research should cover the following aspects:

    • Underlying transport protocol used (HTTP/1.1, HTTP/2, etc.)
    • Whether the connection uses SSL
    • Performance impact of using the async vs. sync client
  • The Ballerina API should try to mimic the existing Java API, as this would reduce the overhead for developers coming from another background (Java or Go) to quickly adapt to the Ballerina Redshift connector.

  • Rather than passing multiple parameters to client API methods (like query, db config, timeout, etc.), define a single record type that contains all the parameters and use it as an included record parameter in the API (see the sketch after this list).

  • Update the client API definition with the init method.

  • Define a separate API to retrieve results for queries executed using batchExecute.

  • Finalize the polling mechanism when checking the state of the query and expose the polling interval in ConnectionConfig.
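As a sketch of the included-record-parameter note above (the ExecuteStatementConfig record and its fields are illustrative):

    # Illustrative request-level options for executing a statement.
    public type ExecuteStatementConfig record {|
        DatabaseConfig databaseConfig?;
        decimal timeout = 30;
    |};

    public type Client distinct client object {
        // With an included record parameter (*ExecuteStatementConfig), callers can pass
        // the options as named arguments without constructing the record explicitly,
        // e.g. redshift->executeStatement(query, timeout = 60).
        remote isolated function executeStatement(sql:ParameterizedQuery sqlStatement,
                *ExecuteStatementConfig config) returns string|error;
    };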

@chathushkaayash (Author)

Synchronous and Asynchronous Client Comparison

Redshift Data API Java SDK: Synchronous and Asynchronous Client Comparison

@chathushkaayash (Author) commented Dec 16, 2024

Client API Design

Client Definition

public type Client distinct client object {
    # Initializes the AWS Redshift Data API client.
    #
    # + connectionConfig - Configurations related to the Redshift Data API
    # + return - The `redshiftdata:Client` or `redshiftdata:Error` if the initialization fails
    public isolated function init(*ConnectionConfig connectionConfig) returns Error?;

    # Executes the SQL query.
    #
    # + sqlStatement - The SQL statement to be executed
    # + databaseConfig - The database configurations; overrides the client-level configuration
    # + return - The statementId that can be used to retrieve the results or an error
    remote isolated function executeStatement(sql:ParameterizedQuery sqlStatement, DatabaseConfig? databaseConfig = ())
    returns string|Error;

    # Executes the SQL queries in a batch.
    #
    # + sqlStatements - The SQL statements to be executed
    # + databaseConfig - The database configurations; overrides the client-level configuration
    # + return - The statementIds that can be used to retrieve the results or an error
    remote isolated function batchExecuteStatement(sql:ParameterizedQuery[] sqlStatements, DatabaseConfig? databaseConfig = ())
    returns string[]|Error;

    # Retrieves the results of a previously executed SQL statement.
    #
    # + statementId - The identifier of the SQL statement
    # + timeout - The timeout in seconds to retrieve the results.
    # + rowTypes - The typedesc of the record to which the result needs to be returned
    # + return - Stream of records in the type of rowTypes or possible error
    remote isolated function getQueryResult(string statementId, decimal? timeout = (),
            typedesc<record {}> rowTypes = <>)
    returns stream<rowTypes, sql:Error?>|Error;

    # Retrieves the execution result of a previously executed SQL statement.
    #
    # + statementId - The identifier of the SQL statement
    # + timeout - The timeout in seconds to retrieve the results.
    # + return - Metadata of the query execution as an sql:ExecutionResult or an error
    remote isolated function getExecutionResult(string statementId, decimal? timeout = ())
    returns sql:ExecutionResult|Error;
};

ConnectionConfig

# Client configurations for the AWS Redshift Data API.
#
# + region - The AWS region with which the connector should communicate
# + authConfig - The authentication configurations for the Redshift Data API
# + databaseConfig - The database configurations. This can be overridden in the individual
# `executeStatement` and `batchExecuteStatement` requests.
# + timeout - The timeout to be used to get the query results and execution results, in seconds
# + pollingInterval - The polling interval to be used to get the query results and execution results, in seconds
public type ConnectionConfig record {|
    Region region;
    AuthConfig authConfig;
    DatabaseConfig databaseConfig;
    decimal timeout = 30;
    decimal pollingInterval = 5;
|};
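For illustration, the timeout and pollingInterval would drive an internal wait loop roughly like the following sketch (describeStatementStatus is a hypothetical wrapper over the SDK's DescribeStatement operation):

    import ballerina/lang.runtime;

    isolated function waitForCompletion(string statementId, decimal timeout, decimal pollingInterval)
            returns Error? {
        decimal waited = 0;
        while waited < timeout {
            // Hypothetical wrapper around the SDK's DescribeStatement operation.
            string status = check describeStatementStatus(statementId);
            if status == "FINISHED" {
                return;
            }
            if status == "FAILED" || status == "ABORTED" {
                return error Error(string `statement ${statementId} ended with status ${status}`);
            }
            // Wait for the configured interval before polling again.
            runtime:sleep(pollingInterval);
            waited += pollingInterval;
        }
        return error Error(string `timed out waiting for statement ${statementId}`);
    }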

Region

# An Amazon Web Services region that hosts a set of Amazon services.
public enum Region {
    AF_SOUTH_1 = "af-south-1",
    AP_EAST_1 = "ap-east-1"
    // more regions
}

AuthConfig

# Auth configurations for the AWS Redshift Data API
#
# + awsAccessKeyId - The AWS access key ID
# + awsSecretAccessKey - The AWS secret access key
# + sessionToken - The session token if the credentials are temporary
public type AuthConfig record {|
    string awsAccessKeyId;
    string awsSecretAccessKey;
    string sessionToken?;
|};

DatabaseConfig

# Database configurations
#
# + clusterId - The cluster identifier
# + databaseName - The name of the database
# + databaseUser - The database user
public type DatabaseConfig record {|
    string clusterId;
    string databaseName;
    string databaseUser;
|};

Errors

public type Error distinct error;

Sample Usage

import ballerina/io;
import ballerina/sql;
import ballerinax/redshiftdata;


redshiftdata:AuthConfig authConfig = {
    awsAccessKeyId: "<AWS_ACCESS_KEY_ID>",
    awsSecretAccessKey: "<AWS_SECRET_ACCESS_KEY>"
};


redshiftdata:DatabaseConfig databaseConfig = {
    clusterId: "<CLUSTER_ID>",
    databaseName: "<DATABASE_NAME>",
    databaseUser: "<DATABASE_USER>"
};


redshiftdata:ConnectionConfig connectionConfig = {
    region: "af-south-1",
    authConfig: authConfig,
    databaseConfig: databaseConfig
};


type User record {|
    string name;
    string email;
    string city;
|};


public function main() returns error? {


    redshiftdata:Client redshift = check new (connectionConfig);


    // Insert records
    User[] users = [
        {name: "John Doe", email: "[email protected]", city: "New York"},
        {name: "Jane Doe", email: "[email protected]", city: "California"}
    ];


    sql:ParameterizedQuery[] insertQueries = from User user in users
        select `INSERT INTO users (name, email, city) VALUES (${user.name}, ${user.email}, ${user.city})`;


    string[] insertRequestIds = check redshift->batchExecuteStatement(insertQueries);


    // Retrieve the execution result of the first insert statement
    sql:ExecutionResult insertResults = check redshift->getExecutionResult(insertRequestIds[0]);


    // Retrieve records
    string retrieveRequestId = check redshift->executeStatement(`SELECT * FROM users`, databaseConfig);


    // Retrieve the query result
    stream<User, sql:Error?> result = check redshift->getQueryResult(retrieveRequestId, 40);


    check result.forEach(function(User user) {
        io:println(user);
    });


}

Redshift Data API Java SDK: Synchronous vs. Asynchronous Client Comparison

Key Details from the Comparison

| Aspect | Sync Client | Async Client |
| --- | --- | --- |
| Nature of Operations | Each method call blocks the calling thread until the operation completes. | Asynchronous, returning a CompletableFuture for each operation. |
| Performance | Throughput: 4703.4/sec (5% improvement) | Throughput: 4441.0/sec |
| Protocol | TLS 1.2 | TLS 1.3 |
  • Test Scenarios: Retrieval of users from the database using the following APIs:
    • executeStatement
    • describeStatement
    • getStatementResult

Decision

Given the slightly better performance and simpler handling provided by the synchronous client, I decided to proceed with the synchronous client.

Thread Handling in Asynchronous Client

  • The async client uses AmazonAsyncHttpClient, leveraging asynchronous processing with CompletableFuture and a series of pipeline stages for request execution and response handling.
  • Each HTTP request is processed in a non-blocking manner, with different pipeline stages (signing the request, applying headers, executing the HTTP call) running in separate threads. The client uses dedicated threads for each stage of the pipeline, which allows concurrent request handling without blocking the main thread, ensuring efficient resource usage and responsiveness. This architecture lets the client handle multiple requests simultaneously.

close() Method

  • The close() method in the SdkAutoCloseable interface is part of the AWS SDK's resource management mechanism.
  • Purpose: To release resources (e.g., HTTP connections) held by SDK client objects when no longer needed.
  • Managed by SDK: The close() method is already overridden and managed by the Redshift Data SDK.

Decision

This method will not be exposed in the Client.


JDBC Client-inspired vs. Java SDK-inspired Version

When deciding between a JDBC client-inspired version and a Java SDK-inspired version, I opted for the SDK-inspired approach. Since a JDBC connector already exists, creating another client with a similar approach seemed redundant. However, I aimed to design the SDK-inspired version in a way that makes it adaptable and user-friendly.

In terms of design, my focus was on simplicity and usability, while ensuring the client retains the advanced capabilities available when using the Java SDK, such as connecting to different databases.

Decision

Implement the Java SDK-inspired Version


Global DatabaseConfigs (Per Client) vs. Local DatabaseConfigs (Per Request)

When deciding between global DatabaseConfigs and local DatabaseConfigs, I prioritized what would be most convenient for the user. Since it's common to use a single database per client, I chose to implement global configurations. However, if a different database is needed, the global configuration can be overridden by providing a new database config to the API, leveraging the flexibility offered by the SDK.

Decision

Define Database Configs Per Client
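Under this decision, usage would look roughly like the following (defaultDatabaseConfig and reportingDatabaseConfig are illustrative DatabaseConfig values):

    public function sample() returns error? {
        // Client bound to a default database at initialization.
        redshiftdata:Client redshift = check new (
            region = redshiftdata:US_EAST_2,
            authConfig = authConfig,
            databaseConfig = defaultDatabaseConfig
        );

        // Uses the client-level database config.
        string usersStatementId = check redshift->executeStatement(`SELECT * FROM users`);

        // Overrides the client-level config for this request only.
        string ordersStatementId = check redshift->executeStatement(`SELECT * FROM orders`, reportingDatabaseConfig);
    }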

@ayeshLK (Member) commented Dec 17, 2024

@chathushkaayash in the ConnectionConfig record, do we need to specify the timeout and pollingInterval, given that they are required only for two operations? Rather than doing that, we can define a new record and use it as an input parameter for the operations that require those configurations.

public type ExecutionConfig record {|
    decimal timeout = 30;
    decimal pollingInterval = 5;
|};

public type Client distinct client object {
    // other methods

    remote isolated function getQueryResult(string statementId, *ExecutionConfig executionConfig,
            typedesc<record {}> rowTypes = <>)
    returns stream<rowTypes, sql:Error?>|Error;

    remote isolated function getExecutionResult(string statementId, *ExecutionConfig executionConfig)
    returns sql:ExecutionResult|Error;
};

@chathushkaayash (Author)

Batch Statement Execution in Redshift Data API

When executing batch statements using the batchExecuteStatement() API in Ballerina, we encounter the following workflow in the native code.

  1. Batch Execution:

    • We pass an array of SQL statements to batchExecuteStatement() and receive a Statement ID (e.g., a0f0b47f-47b9-4619-a56f-f8d88b1d01dd).
  2. Retrieving Sub-Statement Data:

    • Using describeStatement() with the main Statement ID, we can retrieve data related to each sub-statement.

    • Example of SubStatement Data:

      SubStatementData(
          CreatedAt=2024-12-19T04:32:02.996Z,
          Duration=18456087,
          HasResultSet=true,
          Id=a0f0b47f-47b9-4619-a56f-f8d88b1d01dd:1,
          QueryString=SELECT * FROM Users,
          RedshiftQueryId=1070443,
          ResultRows=23,
          ResultSize=600,
          Status=FINISHED,
          UpdatedAt=2024-12-19T04:32:03.583Z
      )
      
    • For a batch execution:

      • Main Statement ID: e69143bb-b6a8-40c2-a919-89abc954a1d0
      • Batch Execute Sub-Statement IDs: ["e69143bb-b6a8-40c2-a919-89abc954a1d0:1", "e69143bb-b6a8-40c2-a919-89abc954a1d0:2"]
  3. Fetching Results:

    • Using getStatementResult(), we can retrieve the result for each sub-statement ID individually, such as a0f0b47f-47b9-4619-a56f-f8d88b1d01dd:1 or a0f0b47f-47b9-4619-a56f-f8d88b1d01dd:2 (see the sketch after this list).
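A sketch of how the connector could surface this, assuming getQueryResult accepts sub-statement IDs, which follow the <batchStatementId>:<index> convention shown above:

    import ballerina/io;
    import ballerina/sql;

    public function fetchBatchResults(Client redshift, string batchStatementId, int statementCount)
            returns error? {
        foreach int i in 1 ... statementCount {
            // Sub-statement IDs are the batch statement ID with a 1-based ":<index>" suffix.
            string subStatementId = string `${batchStatementId}:${i}`;
            stream<record {}, sql:Error?> result = check redshift->getQueryResult(subStatementId);
            check result.forEach(function(record {} row) {
                io:println(row);
            });
        }
    }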
