Skip to content

Commit

Permalink
Propagate array item types from SQL to Avro schema
Browse files Browse the repository at this point in the history
  • Loading branch information
shnapz committed Nov 26, 2024
1 parent 9c72eac commit 1dc7af2
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public static Schema createSchemaByReadingOneRow(
try (Statement statement = connection.createStatement()) {
final ResultSet resultSet = statement.executeQuery(queryBuilderArgs.sqlQueryWithLimitOne());

resultSet.next();

final Schema schema =
createAvroSchema(
resultSet,
Expand Down Expand Up @@ -107,7 +109,7 @@ public static Schema createAvroSchema(
.prop("tableName", tableName)
.prop("connectionUrl", connectionUrl)
.fields();
return createAvroFields(meta, builder, useLogicalTypes).endRecord();
return createAvroFields(resultSet, builder, useLogicalTypes).endRecord();
}

static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException {
Expand All @@ -123,11 +125,13 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep
}

private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final ResultSetMetaData meta,
final SchemaBuilder.FieldAssembler<Schema> builder,
final ResultSet resultSet,
final SchemaBuilder.FieldAssembler<Schema> builder,
final boolean useLogicalTypes)
throws SQLException {

ResultSetMetaData meta = resultSet.getMetaData();

for (int i = 1; i <= meta.getColumnCount(); i++) {

final String columnName;
Expand All @@ -153,9 +157,15 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and();

Integer arrayItemType = null;
if (columnType == ARRAY) {
arrayItemType = resultSet.getArray(i).getBaseType();

Check warning on line 162 in dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java

View check run for this annotation

Codecov / codecov/patch

dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java#L162

Added line #L162 was not covered by tests
}

final SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>> schemaFieldAssembler =
setAvroColumnType(
columnType,
arrayItemType,
meta.getPrecision(i),
columnClassName,
useLogicalTypes,
Expand All @@ -181,6 +191,7 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
private static SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>
setAvroColumnType(
final int columnType,
final Integer arrayItemType,
final int precision,
final String columnClassName,
final boolean useLogicalTypes,
Expand Down Expand Up @@ -225,10 +236,12 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
} else {
return field.bytesType();
}
case ARRAY:
return setAvroColumnType(arrayItemType, null, precision, columnClassName,
useLogicalTypes, field.array().items());

Check warning on line 241 in dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java

View check run for this annotation

Codecov / codecov/patch

dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java#L240-L241

Added lines #L240 - L241 were not covered by tests
case BINARY:
case VARBINARY:
case LONGVARBINARY:
case ARRAY:
case BLOB:
return field.bytesType();
case DOUBLE:
Expand Down
8 changes: 8 additions & 0 deletions e2e/e2e.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#!/usr/bin/env bash

# Print the first command-line argument, if any. It is quoted so the value is
# echoed verbatim (no word splitting or glob expansion), and the `:-` default
# keeps the expansion valid when the script is started without arguments.
echo "${1:-}"

# fail on error
set -o errexit
set -o nounset
set -o pipefail

# Assign first and mark readonly afterwards: `readonly VAR="$(cmd)"` reports the
# exit status of `readonly` itself, which would hide a failing `cd` from errexit.
SCRIPT_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null && pwd)"
PROJECT_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." >/dev/null && pwd)"
readonly SCRIPT_PATH PROJECT_PATH
# When set to 1, run_docker_dbeam adds a bind mount of SCRIPT_PATH to the container.
readonly MOUNT_OUTPUT=0

# This file contains psql views with complex types to validate and troubleshoot dbeam

Expand Down Expand Up @@ -65,9 +68,14 @@ pack() {
}

run_docker_dbeam() {
OUTPUT_MOUNT_EXP=""
if [ "$MOUNT_OUTPUT" -eq 1 ]; then
OUTPUT_MOUNT_EXP=--mount="type=bind,source=$SCRIPT_PATH,target=$SCRIPT_PATH"
fi
time docker run --interactive --rm \
--net="$DOCKER_NETWORK" \
--mount="type=bind,source=$PROJECT_PATH/dbeam-core/target,target=/dbeam" \
$OUTPUT_MOUNT_EXP \
--memory=1G \
--entrypoint=/usr/bin/java \
"$JAVA_DOCKER_IMAGE" \
Expand Down

0 comments on commit 1dc7af2

Please sign in to comment.