Skip to content
This repository has been archived by the owner on Jan 16, 2023. It is now read-only.

Commit

Permalink
table => stream, column => field (airbytehq#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Sep 18, 2020
1 parent 786ff09 commit c2b96ee
Show file tree
Hide file tree
Showing 30 changed files with 248 additions and 283 deletions.
16 changes: 8 additions & 8 deletions dataline-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1191,25 +1191,25 @@ components:
description: describes the available schema.
type: object
required:
- tables
- streams
properties:
tables:
streams:
type: array
items:
$ref: "#/components/schemas/SourceSchemaTable"
SourceSchemaTable:
$ref: "#/components/schemas/SourceSchemaStream"
SourceSchemaStream:
type: object
required:
- name
- columns
- fields
properties:
name:
type: string
columns:
fields:
type: array
items:
$ref: "#/components/schemas/SourceSchemaColumn"
SourceSchemaColumn:
$ref: "#/components/schemas/SourceSchemaField"
SourceSchemaField:
type: object
required:
- name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
"schema": {
"description": "describes the available schema.",
"type": "object",
"required": ["tables"],
"required": ["streams"],
"additionalProperties": false,
"properties": {
"tables": {
"streams": {
"type": "array",
"items": {
"$ref": "StandardDataSchema.json#/definitions/table"
"$ref": "StandardDataSchema.json#/definitions/stream"
}
}
}
},
"table": {
"stream": {
"type": "object",
"required": ["name", "columns"],
"required": ["name", "fields"],
"additionalProperties": false,
"properties": {
"name": {
Expand All @@ -29,15 +29,15 @@
"selected": {
"type": "boolean"
},
"columns": {
"fields": {
"type": "array",
"items": {
"$ref": "StandardDataSchema.json#/definitions/column"
"$ref": "StandardDataSchema.json#/definitions/field"
}
}
}
},
"column": {
"field": {
"type": "object",
"required": ["name", "dataType", "selected"],
"additionalProperties": false,
Expand All @@ -49,7 +49,7 @@
"$ref": "DataType.json"
},
"selected": {
"description": "whether or not the column will be replicated.",
"description": "whether or not the field will be replicated.",
"type": "boolean"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public void cleanUp() {
public void testReadFirstTime() throws Exception {
Schema schema = Jsons.deserialize(MoreResources.readResource("simple_postgres_source_schema.json"), Schema.class);

// select all tables and all columns
schema.getTables().forEach(t -> t.setSelected(true));
schema.getTables().forEach(t -> t.getColumns().forEach(c -> c.setSelected(true)));
// select all streams and all fields
schema.getStreams().forEach(t -> t.setSelected(true));
schema.getStreams().forEach(t -> t.getFields().forEach(c -> c.setSelected(true)));

StandardSync syncConfig = new StandardSync().withSyncMode(StandardSync.SyncMode.FULL_REFRESH).withSchema(schema);
SourceConnectionImplementation sourceImpl =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"tables": [
"streams": [
{
"name": "id_and_name",
"selected": false,
"columns": [
"fields": [
{
"name": "name",
"dataType": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,16 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.dataline.commons.json.JsonValidationException;
import io.dataline.commons.json.Jsons;
import io.dataline.config.Column;
import io.dataline.config.DataType;
import io.dataline.config.DestinationConnectionImplementation;
import io.dataline.config.Field;
import io.dataline.config.Schema;
import io.dataline.config.SourceConnectionImplementation;
import io.dataline.config.StandardSync;
import io.dataline.config.StandardSyncSchedule;
import io.dataline.config.Table;
import io.dataline.config.Stream;
import io.dataline.config.persistence.ConfigNotFoundException;
import io.dataline.config.persistence.ConfigRepository;
import io.dataline.integrations.Integrations;
import io.dataline.scheduler.job_factory.SyncJobFactory;
import io.dataline.scheduler.persistence.SchedulerPersistence;
import java.io.IOException;
Expand All @@ -56,51 +50,27 @@

class JobSchedulerTest {

private static final SourceConnectionImplementation SOURCE_CONNECTION_IMPLEMENTATION;
private static final DestinationConnectionImplementation DESTINATION_CONNECTION_IMPLEMENTATION;
private static final StandardSync STANDARD_SYNC;
private static final StandardSyncSchedule STANDARD_SYNC_SCHEDULE;
private static final long JOB_ID = 12L;
private Job previousJob;

static {
final UUID workspaceId = UUID.randomUUID();
final UUID sourceImplementationId = UUID.randomUUID();
final UUID sourceSpecificationId = Integrations.POSTGRES_TAP.getSpecId();

JsonNode implementationJson = Jsons.jsonNode(ImmutableMap.builder()
.put("apiKey", "123-abc")
.put("hostname", "dataline.io")
.build());

SOURCE_CONNECTION_IMPLEMENTATION = new SourceConnectionImplementation()
.withWorkspaceId(workspaceId)
.withSourceSpecificationId(sourceSpecificationId)
.withSourceImplementationId(sourceImplementationId)
.withConfiguration(implementationJson)
.withTombstone(false);

final UUID destinationImplementationId = UUID.randomUUID();
final UUID destinationSpecificationId = Integrations.POSTGRES_TARGET.getSpecId();

DESTINATION_CONNECTION_IMPLEMENTATION = new DestinationConnectionImplementation()
.withWorkspaceId(workspaceId)
.withDestinationSpecificationId(destinationSpecificationId)
.withDestinationImplementationId(destinationImplementationId)
.withConfiguration(implementationJson)
.withTombstone(false);

final Column column = new Column()
final Field field = new Field()
.withDataType(DataType.STRING)
.withName("id")
.withSelected(true);

final Table table = new Table()
final Stream stream = new Stream()
.withName("users")
.withColumns(Lists.newArrayList(column))
.withFields(Lists.newArrayList(field))
.withSelected(true);

final Schema schema = new Schema().withTables(Lists.newArrayList(table));
final Schema schema = new Schema().withStreams(Lists.newArrayList(stream));

final UUID connectionId = UUID.randomUUID();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.dataline.commons.json.Jsons;
import io.dataline.config.Column;
import io.dataline.config.DataType;
import io.dataline.config.DestinationConnectionImplementation;
import io.dataline.config.Field;
import io.dataline.config.JobCheckConnectionConfig;
import io.dataline.config.JobConfig;
import io.dataline.config.JobDiscoverSchemaConfig;
Expand All @@ -46,7 +46,7 @@
import io.dataline.config.Schema;
import io.dataline.config.SourceConnectionImplementation;
import io.dataline.config.StandardSync;
import io.dataline.config.Table;
import io.dataline.config.Stream;
import io.dataline.db.DatabaseHelper;
import io.dataline.integrations.Integrations;
import io.dataline.scheduler.Job;
Expand Down Expand Up @@ -113,18 +113,18 @@ class DefaultSchedulerPersistenceTest {
.withConfiguration(implementationJson)
.withTombstone(false);

final Column column = new Column()
final Field field = new Field()
.withDataType(DataType.STRING)
.withName("id")
.withSelected(true);

final Table table = new Table()
final Stream stream = new Stream()
.withName("users")
.withColumns(Lists.newArrayList(column))
.withFields(Lists.newArrayList(field))
.withSelected(true);

final Schema schema = new Schema()
.withTables(Lists.newArrayList(table));
.withStreams(Lists.newArrayList(stream));

final UUID connectionId = UUID.randomUUID();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,63 +25,63 @@
package io.dataline.server.converters;

import io.dataline.api.model.SourceSchema;
import io.dataline.api.model.SourceSchemaColumn;
import io.dataline.api.model.SourceSchemaTable;
import io.dataline.api.model.SourceSchemaField;
import io.dataline.api.model.SourceSchemaStream;
import io.dataline.commons.enums.Enums;
import io.dataline.config.Column;
import io.dataline.config.DataType;
import io.dataline.config.Field;
import io.dataline.config.Schema;
import io.dataline.config.Table;
import io.dataline.config.Stream;
import java.util.List;
import java.util.stream.Collectors;

public class SchemaConverter {

public static Schema toPersistenceSchema(SourceSchema sourceSchema) {
final List<Table> persistenceTables =
sourceSchema.getTables().stream()
final List<Stream> persistenceStreams =
sourceSchema.getStreams().stream()
.map(
apiTable -> {
final List<Column> persistenceColumns =
apiTable.getColumns().stream()
apiStream -> {
final List<Field> persistenceFields =
apiStream.getFields().stream()
.map(
apiColumn -> new Column()
.withName(apiColumn.getName())
.withDataType(Enums.convertTo(apiColumn.getDataType(), DataType.class))
.withSelected(apiColumn.getSelected()))
apiField -> new Field()
.withName(apiField.getName())
.withDataType(Enums.convertTo(apiField.getDataType(), DataType.class))
.withSelected(apiField.getSelected()))
.collect(Collectors.toList());

return new Table()
.withName(apiTable.getName())
.withColumns(persistenceColumns)
.withSelected(persistenceColumns.stream().anyMatch(Column::getSelected));
return new Stream()
.withName(apiStream.getName())
.withFields(persistenceFields)
.withSelected(persistenceFields.stream().anyMatch(Field::getSelected));
})
.collect(Collectors.toList());

return new Schema().withTables(persistenceTables);
return new Schema().withStreams(persistenceStreams);
}

public static SourceSchema toApiSchema(Schema persistenceSchema) {
final List<SourceSchemaTable> persistenceTables =
persistenceSchema.getTables().stream()
final List<SourceSchemaStream> persistenceStreams =
persistenceSchema.getStreams().stream()
.map(
persistenceTable -> {
final List<SourceSchemaColumn> apiColumns =
persistenceTable.getColumns().stream()
persistenceStream -> {
final List<SourceSchemaField> apiFields =
persistenceStream.getFields().stream()
.map(
persistenceColumn -> new SourceSchemaColumn()
.name(persistenceColumn.getName())
.dataType(Enums.convertTo(persistenceColumn.getDataType(), io.dataline.api.model.DataType.class))
.selected(persistenceColumn.getSelected()))
persistenceField -> new SourceSchemaField()
.name(persistenceField.getName())
.dataType(Enums.convertTo(persistenceField.getDataType(), io.dataline.api.model.DataType.class))
.selected(persistenceField.getSelected()))
.collect(Collectors.toList());

return new SourceSchemaTable()
.name(persistenceTable.getName())
.columns(apiColumns);
return new SourceSchemaStream()
.name(persistenceStream.getName())
.fields(apiFields);
})
.collect(Collectors.toList());

return new SourceSchema().tables(persistenceTables);
return new SourceSchema().streams(persistenceStreams);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public ConnectionRead createConnection(ConnectionCreate connectionCreate)
if (connectionCreate.getSyncSchema() != null) {
standardSync.withSchema(SchemaConverter.toPersistenceSchema(connectionCreate.getSyncSchema()));
} else {
standardSync.withSchema(new Schema().withTables(Collections.emptyList()));
standardSync.withSchema(new Schema().withStreams(Collections.emptyList()));
}

configRepository.writeStandardSync(standardSync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementat

final StandardDiscoverSchemaOutput output = job.getOutput().map(JobOutput::getDiscoverSchema)
// the job should always produce an output, but if does not, we fall back on an empty schema.
.orElse(new StandardDiscoverSchemaOutput().withSchema(new Schema().withTables(Collections.emptyList())));
.orElse(new StandardDiscoverSchemaOutput().withSchema(new Schema().withStreams(Collections.emptyList())));

LOGGER.debug("output = " + output);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,30 @@

import com.google.common.collect.Lists;
import io.dataline.api.model.SourceSchema;
import io.dataline.api.model.SourceSchemaColumn;
import io.dataline.api.model.SourceSchemaTable;
import io.dataline.api.model.SourceSchemaField;
import io.dataline.api.model.SourceSchemaStream;
import io.dataline.commons.enums.Enums;
import io.dataline.config.Column;
import io.dataline.config.DataType;
import io.dataline.config.Field;
import io.dataline.config.Schema;
import io.dataline.config.Table;
import io.dataline.config.Stream;
import org.junit.jupiter.api.Test;

class SchemaConverterTest {

private static final Schema SCHEMA = new Schema()
.withTables(Lists.newArrayList(new Table()
.withStreams(Lists.newArrayList(new Stream()
.withName("users")
.withSelected(true)
.withColumns(Lists.newArrayList(new Column()
.withFields(Lists.newArrayList(new Field()
.withDataType(DataType.STRING)
.withName("id")
.withSelected(true)))));

private static final SourceSchema API_SCHEMA = new SourceSchema()
.tables(Lists.newArrayList(new SourceSchemaTable()
.streams(Lists.newArrayList(new SourceSchemaStream()
.name("users")
.columns(Lists.newArrayList(new SourceSchemaColumn()
.fields(Lists.newArrayList(new SourceSchemaField()
.dataType(io.dataline.api.model.DataType.STRING)
.name("id")
.selected(true)))));
Expand Down
Loading

0 comments on commit c2b96ee

Please sign in to comment.