Skip to content

Commit

Permalink
Mkhokh/8858 cursor fields for incremental update (airbytehq#14356)
Browse files Browse the repository at this point in the history
* add logic to skip incremental mode for tables which do not have a field types that could not be used as cursor

* added tests for cursor fields incremental updates

* remove not used methods

* fix tests

* fix formatting

* revert RedshiftInsertDestinationAcceptanceTest

* fix ssl clickhouse test

* fix codereview comments

* fix clickhouse strict encrypt test

* fix postgres source after merge from master

* optimize ssl clickhouse test logic

* fix db2 test

* added check for cursor field should be not nullable, fix formatting

* fixed tests, formatting, issue after merge from master

* fix merge issues

* fix test for source-jdbc and remove some code used for local testing

* added description for cockroach db tests

* test changes

* refactoring for cursor field tests after code review comments

* refactoring for cursor field tests after code review comments

* fix code review comments

* remove unused imports

* fix formatting

* fix compilation error

* fix postgres and postgress ssl test

* Fixed bucket naming for S3

* remove CHAR, NCHAR from possible cursor types

* remove SYMBOL from possible cursor types

* removed redundant configs

* fixed mysql-strict-encrypt tests

* fixed mongodb-v2 tests

* minor format changes

* bump version

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

Co-authored-by: vmaltsev <[email protected]>
Co-authored-by: subodh <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
4 people authored Aug 18, 2022
1 parent 11583d3 commit 5776dcc
Show file tree
Hide file tree
Showing 62 changed files with 392 additions and 140 deletions.
20 changes: 10 additions & 10 deletions airbyte-config/init/src/main/resources/seed/source_definitions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
- name: ClickHouse
sourceDefinitionId: bad83517-5e54-4a3d-9b53-63e85fbd4d7c
dockerRepository: airbyte/source-clickhouse
dockerImageTag: 0.1.11
dockerImageTag: 0.1.12
documentationUrl: https://docs.airbyte.io/integrations/sources/clickhouse
icon: cliskhouse.svg
sourceType: database
Expand All @@ -176,7 +176,7 @@
- name: Cockroachdb
sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
dockerRepository: airbyte/source-cockroachdb
dockerImageTag: 0.1.15
dockerImageTag: 0.1.16
documentationUrl: https://docs.airbyte.io/integrations/sources/cockroachdb
icon: cockroachdb.svg
sourceType: database
Expand Down Expand Up @@ -449,7 +449,7 @@
- name: IBM Db2
sourceDefinitionId: 447e0381-3780-4b46-bb62-00a4e3c8b8e2
dockerRepository: airbyte/source-db2
dockerImageTag: 0.1.13
dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/sources/db2
icon: db2.svg
sourceType: database
Expand Down Expand Up @@ -599,7 +599,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.15
dockerImageTag: 0.4.16
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down Expand Up @@ -631,7 +631,7 @@
- name: MongoDb
sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.15
dockerImageTag: 0.1.16
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand All @@ -647,7 +647,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.3
dockerImageTag: 0.6.4
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down Expand Up @@ -686,7 +686,7 @@
- name: Oracle DB
sourceDefinitionId: b39a7370-74c3-45a6-ac3a-380d48520a83
dockerRepository: airbyte/source-oracle
dockerImageTag: 0.3.19
dockerImageTag: 0.3.20
documentationUrl: https://docs.airbyte.io/integrations/sources/oracle
icon: oracle.svg
sourceType: database
Expand Down Expand Up @@ -797,7 +797,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.2
dockerImageTag: 1.0.3
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down Expand Up @@ -845,7 +845,7 @@
- name: Redshift
sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.3.11
dockerImageTag: 0.3.12
documentationUrl: https://docs.airbyte.io/integrations/sources/redshift
icon: redshift.svg
sourceType: database
Expand Down Expand Up @@ -948,7 +948,7 @@
- name: Snowflake
sourceDefinitionId: e2d65910-8c8b-40a1-ae7d-ee2416b2bfa2
dockerRepository: airbyte/source-snowflake
dockerImageTag: 0.1.17
dockerImageTag: 0.1.18
documentationUrl: https://docs.airbyte.io/integrations/sources/snowflake
icon: snowflake.svg
sourceType: database
Expand Down
20 changes: 10 additions & 10 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-clickhouse:0.1.11"
- dockerImage: "airbyte/source-clickhouse:0.1.12"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/clickhouse"
connectionSpecification:
Expand Down Expand Up @@ -1390,7 +1390,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-cockroachdb:0.1.15"
- dockerImage: "airbyte/source-cockroachdb:0.1.16"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/cockroachdb"
connectionSpecification:
Expand Down Expand Up @@ -4016,7 +4016,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-db2:0.1.13"
- dockerImage: "airbyte/source-db2:0.1.14"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/db2"
connectionSpecification:
Expand Down Expand Up @@ -5271,7 +5271,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.15"
- dockerImage: "airbyte/source-mssql:0.4.16"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -5882,7 +5882,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mongodb-v2:0.1.15"
- dockerImage: "airbyte/source-mongodb-v2:0.1.16"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
changelogUrl: "https://docs.airbyte.io/integrations/sources/mongodb-v2"
Expand Down Expand Up @@ -6046,7 +6046,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.3"
- dockerImage: "airbyte/source-mysql:0.6.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -6708,7 +6708,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-oracle:0.3.19"
- dockerImage: "airbyte/source-oracle:0.3.20"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/oracle"
connectionSpecification:
Expand Down Expand Up @@ -7545,7 +7545,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.2"
- dockerImage: "airbyte/source-postgres:1.0.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -8145,7 +8145,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-redshift:0.3.11"
- dockerImage: "airbyte/source-redshift:0.3.12"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Expand Down Expand Up @@ -9319,7 +9319,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-snowflake:0.1.17"
- dockerImage: "airbyte/source-snowflake:0.1.18"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,12 @@ void setStatementField(final PreparedStatement preparedStatement,
*/
String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) throws SQLException;

/**
* This method will verify that filed could be used as cursor for incremental sync
*
* @param type - table field type that should be checked
* @return true is field type can be used as cursor field for incremental sync
*/
boolean isCursorType(final SourceType type);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ public final class JdbcConstants {
public static final String JDBC_COLUMN_DATA_TYPE = "DATA_TYPE";
public static final String JDBC_COLUMN_TYPE_NAME = "TYPE_NAME";
public static final String JDBC_COLUMN_SIZE = "COLUMN_SIZE";
public static final String JDBC_IS_NULLABLE = "IS_NULLABLE";

public static final String INTERNAL_SCHEMA_NAME = "schemaName";
public static final String INTERNAL_TABLE_NAME = "tableName";
public static final String INTERNAL_COLUMN_NAME = "columnName";
public static final String INTERNAL_COLUMN_TYPE = "columnType";
public static final String INTERNAL_COLUMN_TYPE_NAME = "columnTypeName";
public static final String INTERNAL_COLUMN_SIZE = "columnSize";
public static final String INTERNAL_IS_NULLABLE = "isNullable";

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;
import static io.airbyte.db.jdbc.JdbcUtils.ALLOWED_CURSOR_TYPES;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -101,6 +102,11 @@ public JDBCType getFieldType(final JsonNode field) {
}
}

@Override
public boolean isCursorType(JDBCType type) {
return ALLOWED_CURSOR_TYPES.contains(type);
}

@Override
public JsonSchemaType getJsonType(final JDBCType jdbcType) {
return switch (jdbcType) {
Expand Down
20 changes: 20 additions & 0 deletions airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,29 @@

package io.airbyte.db.jdbc;

import static java.sql.JDBCType.BIGINT;
import static java.sql.JDBCType.DATE;
import static java.sql.JDBCType.DECIMAL;
import static java.sql.JDBCType.DOUBLE;
import static java.sql.JDBCType.FLOAT;
import static java.sql.JDBCType.INTEGER;
import static java.sql.JDBCType.LONGVARCHAR;
import static java.sql.JDBCType.NUMERIC;
import static java.sql.JDBCType.NVARCHAR;
import static java.sql.JDBCType.REAL;
import static java.sql.JDBCType.SMALLINT;
import static java.sql.JDBCType.TIME;
import static java.sql.JDBCType.TIMESTAMP;
import static java.sql.JDBCType.TINYINT;
import static java.sql.JDBCType.VARCHAR;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import java.sql.JDBCType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.jooq.JSONFormat;

public class JdbcUtils {
Expand All @@ -34,6 +52,8 @@ public class JdbcUtils {
public static final String USERNAME_KEY = "username";
public static final String MODE_KEY = "mode";
public static final String AMPERSAND = "&";
public static final Set<JDBCType> ALLOWED_CURSOR_TYPES = Set.of(TIMESTAMP, TIME, DATE, TINYINT, SMALLINT, INTEGER,
BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, NVARCHAR, VARCHAR, LONGVARCHAR);
private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();

private static final JSONFormat defaultJSONFormat = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-clickhouse-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-clickhouse-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public String createTableQuery(final String tableName,

@BeforeAll
static void init() {
CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s Array(UInt32)) ENGINE = MergeTree ORDER BY tuple();";
INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES([12, 13, 0, 1]);)";
CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s Nullable(VARCHAR(20))) ENGINE = MergeTree ORDER BY tuple();";
INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES('Hello world :)');";

container = new GenericContainer<>(new ImageFromDockerfile("clickhouse-test")
.withFileFromClasspath("Dockerfile", "docker/Dockerfile")
.withFileFromClasspath("clickhouse_certs.sh", "docker/clickhouse_certs.sh"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-clickhouse

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.name=airbyte/source-clickhouse
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.wait.strategy.Wait;
Expand Down Expand Up @@ -54,6 +55,14 @@ public String createTableQuery(final String tableName, final String columnClause
+ primaryKeyClause);
}

@BeforeAll
static void init() {
CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s Array(UInt32)) ENGINE = MergeTree ORDER BY tuple();";
INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES([12, 13, 0, 1]);)";
CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s Nullable(VARCHAR(20))) ENGINE = MergeTree ORDER BY tuple();";
INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES('Hello world :)');";
}

@Override
@AfterEach
public void tearDown() throws SQLException {
Expand Down
Loading

0 comments on commit 5776dcc

Please sign in to comment.