Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 40: Refactor/cleanup metadata layer #41

Open
wants to merge 41 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1ceeac1
implement AbstractTestQueries for integration tests
adrdc Mar 15, 2021
f54a8b0
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 15, 2021
856d501
add missing reference to docker client
adrdc Mar 16, 2021
da8de35
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 17, 2021
3930e87
update header in 3 test files
adrdc Mar 17, 2021
dbe84ca
only run integration tests if -Pintegration flag is set
adrdc Mar 18, 2021
4cd341f
use InProcPravega (instead of external docker container)
adrdc Mar 22, 2021
ac70c64
rename test classes
adrdc Mar 22, 2021
60fa0d6
github actions -> jdk11
adrdc Mar 22, 2021
f95d21e
pluggable schema registry. simplified PravegaTableDescriptionSupplier
adrdc Mar 23, 2021
61d0624
working example with confluent
adrdc Mar 24, 2021
7fc93b4
allow list of component streams in multi source stream definition
adrdc Mar 24, 2021
c8d8267
add confluent lib, remove stale/test aws key config
adrdc Mar 24, 2021
c773080
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 24, 2021
a948bc1
add package to unit-test
adrdc Mar 24, 2021
e534d50
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 25, 2021
4b55331
add git rev to class header/borrowed code
adrdc Mar 25, 2021
9341a09
update README
adrdc Mar 25, 2021
8661858
Merge branch 'issue-15-presto-integration-tests' into schema-reg-play…
adrdc Mar 25, 2021
6d63a4f
bad class?
adrdc Mar 25, 2021
3ec71e3
re-add same file
adrdc Mar 25, 2021
534fa5b
add unit test, rename some files
adrdc Mar 25, 2021
7b2a4e0
separate schemas from different tests
adrdc Mar 25, 2021
cd037a9
add unit test for pravega schema registry
adrdc Mar 25, 2021
f9f31c9
unit test for table supplier, test multi source
adrdc Mar 26, 2021
0184eab
use new test util in UT
adrdc Mar 26, 2021
067a3e8
fix issue in integration test. now that we are listing vs. seeding c…
adrdc Mar 26, 2021
e78ccc7
restore formatting
adrdc Mar 26, 2021
c75d005
Merge branch 'main' into schema-reg-play-from-15
adrdc Mar 29, 2021
6905745
add maven location for confluent sr lib
adrdc Mar 29, 2021
e1214f1
Merge branch 'main' into issue-10-schema-registry
adrdc Jun 23, 2021
5bbb17e
remove confluent
adrdc Jun 23, 2021
c4cd4bf
remove spurious changes
adrdc Sep 10, 2021
fbc9508
Merge branch 'main' into issue-40-md-layer-refacto
adrdc Sep 27, 2021
acd5cf4
rename resource files to includce schema
adrdc Sep 27, 2021
5333026
relocate test util files
adrdc Sep 27, 2021
7a9fc48
replicate changes to trino
adrdc Sep 27, 2021
88bb493
change types in trino
adrdc Sep 27, 2021
c928ca7
missed files
adrdc Sep 27, 2021
31ba8be
trino: add missing resource files; use test naming scheme
adrdc Sep 27, 2021
2eede5f
Merge branch 'main' into issue-40-md-layer-refacto
adrdc Nov 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ dependencies {
compile "com.facebook.presto:presto-record-decoder:${prestoVersion}"
compile "com.facebook.presto:presto-spi:${prestoVersion}"
compile "com.facebook.presto:presto-common:${prestoVersion}"

adrdc marked this conversation as resolved.
Show resolved Hide resolved
compile group: 'io.netty', name: 'netty-all', version:"{nettyVersion}"

runtimeOnly "io.airlift:joda-to-java-time-bridge:3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class PravegaConnectorConfig
*/
private File tableDescriptionDir = new File("etc/pravega/");


adrdc marked this conversation as resolved.
Show resolved Hide resolved
@NotNull
public URI getControllerURI()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public PravegaTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
return null;
}

return new PravegaTableHandle(connectorId,
schemaTableName.getSchemaName(),
return new PravegaTableHandle(schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
table.getObjectName(),
table.getObjectType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ private static ReaderType readerType(PravegaProperties properties)

private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.Builder<ConnectorSplit> splits)
{
pravegaTableHandle.getOjectArgs().orElseThrow(() ->
pravegaTableHandle.getObjectArgs().orElseThrow(() ->
new IllegalArgumentException("no KF defined for " + pravegaTableHandle));

for (String kf : pravegaTableHandle.getOjectArgs().get()) {
for (String kf : pravegaTableHandle.getObjectArgs().get()) {
PravegaSplit split =
new PravegaSplit(connectorId,
ObjectType.KV_TABLE,
Expand All @@ -130,7 +130,7 @@ private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.
splits.add(split);
}

log.info("created " + pravegaTableHandle.getOjectArgs().get().size() + " kv splits");
log.info("created " + pravegaTableHandle.getObjectArgs().get().size() + " kv splits");
}

private void buildStreamSplits(final PravegaProperties properties,
Expand All @@ -139,7 +139,7 @@ private void buildStreamSplits(final PravegaProperties properties,
{
// TODO: Enable begin and end cuts to be configurable: https://github.com/pravega/pravega-sql/issues/24
List<String> sourceStreams = multiSourceStream(pravegaTableHandle)
? pravegaTableHandle.getOjectArgs().orElseThrow(
? pravegaTableHandle.getObjectArgs().orElseThrow(
() -> new IllegalArgumentException("no args for multi source table found"))
: Collections.singletonList(pravegaTableHandle.getObjectName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.pravega.connectors.presto;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -67,6 +66,16 @@ public PravegaStreamDescription(PravegaStreamDescription streamDescription, List
this.event = Optional.of(event);
}

public PravegaStreamDescription(PravegaStreamDescription streamDescription, List<PravegaStreamFieldGroup> event, List<String> objectArgs)
{
this.tableName = streamDescription.tableName;
this.schemaName = streamDescription.schemaName;
this.objectName = streamDescription.objectName;
this.objectType = streamDescription.objectType;
this.objectArgs = Optional.of(objectArgs);
this.event = Optional.of(event);
}

@JsonProperty
public Optional<String> getSchemaName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class PravegaStreamFieldGroup
private final String dataFormat;
private final Optional<String> dataSchema;
private final Optional<List<PravegaStreamFieldDescription>> fields;
private final Optional<String> mapping;
private final Optional<String> mapping; // column prefix

@JsonCreator
public PravegaStreamFieldGroup(
Expand Down

Large diffs are not rendered by default.

36 changes: 21 additions & 15 deletions src/main/java/io/pravega/connectors/presto/PravegaTableHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@
public final class PravegaTableHandle
implements ConnectorTableHandle
{
/**
* connector id
*/
private final String connectorId;

/**
* The schema name for this table. Is set through configuration and read
*/
Expand All @@ -56,6 +51,11 @@ public final class PravegaTableHandle
*/
private final String objectName;

/**
* optional
* for ObjectType.STREAM, this is list of composite streams in a multi source stream
* for ObjectType.KV_TABLE this is list of key families
*/
private final Optional<List<String>> objectArgs;

private final List<PravegaObjectSchema> schema;
Expand All @@ -64,7 +64,6 @@ public final class PravegaTableHandle

@JsonCreator
public PravegaTableHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("objectName") String objectName,
Expand All @@ -73,7 +72,6 @@ public PravegaTableHandle(
@JsonProperty("schema") List<PravegaObjectSchema> schema,
@JsonProperty("schemaRegistryGroupId") String schemaRegistryGroupId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.objectName = requireNonNull(objectName, "objectName is null");
Expand All @@ -83,10 +81,20 @@ public PravegaTableHandle(
this.schemaRegistryGroupId = requireNonNull(schemaRegistryGroupId, "schemaRegistryGroupId is null");
}

@JsonProperty
public String getConnectorId()
public PravegaTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("objectName") String objectName,
@JsonProperty("objectType") ObjectType objectType,
@JsonProperty("objectArgs") Optional<List<String>> objectArgs)
{
return connectorId;
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.objectName = requireNonNull(objectName, "objectName is null");
this.objectType = requireNonNull(objectType, "objectType is null");
this.objectArgs = objectArgs;
this.schema = null;
this.schemaRegistryGroupId = null;
}

@JsonProperty
Expand Down Expand Up @@ -114,7 +122,7 @@ public String getObjectName()
}

@JsonProperty
public Optional<List<String>> getOjectArgs()
public Optional<List<String>> getObjectArgs()
{
return objectArgs;
}
Expand All @@ -139,7 +147,7 @@ public SchemaTableName toSchemaTableName()
@Override
public int hashCode()
{
return Objects.hash(connectorId, schemaName, tableName, objectName, objectType, schema);
return Objects.hash(schemaName, tableName, objectName, objectType, schema);
}

@Override
Expand All @@ -153,8 +161,7 @@ public boolean equals(Object obj)
}

PravegaTableHandle other = (PravegaTableHandle) obj;
return Objects.equals(this.connectorId, other.connectorId)
&& Objects.equals(this.schemaName, other.schemaName)
return Objects.equals(this.schemaName, other.schemaName)
&& Objects.equals(this.tableName, other.tableName)
&& Objects.equals(this.objectName, other.objectName)
&& Objects.equals(this.objectType, other.objectType)
Expand All @@ -165,7 +172,6 @@ public boolean equals(Object obj)
public String toString()
{
return toStringHelper(this)
.add("connectorId", connectorId)
.add("schemaName", schemaName)
.add("tableName", tableName)
.add("objectName", objectName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) Pravega Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.pravega.connectors.presto.schemamanagement;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.annotations.VisibleForTesting;
import io.pravega.connectors.presto.PravegaConnectorConfig;
import io.pravega.connectors.presto.PravegaStreamDescription;
import io.pravega.connectors.presto.PravegaStreamFieldGroup;
import io.pravega.connectors.presto.PravegaTableHandle;

import java.util.ArrayList;
import java.util.List;

public class CompositeSchemaRegistry
implements SchemaSupplier, SchemaRegistry {
private final List<SchemaSupplier> schemaSuppliers;

private final List<SchemaRegistry> schemaRegistries;

public CompositeSchemaRegistry(PravegaConnectorConfig config, JsonCodec<PravegaStreamDescription> streamDescriptionCodec) {
schemaSuppliers = new ArrayList<>();
schemaRegistries = new ArrayList<>();

// local will override, always add first
if (config.getTableDescriptionDir() != null &&
config.getTableDescriptionDir().exists() &&
config.getTableDescriptionDir().isDirectory()) {
LocalSchemaRegistry schemaRegistry =
new LocalSchemaRegistry(config.getTableDescriptionDir(), streamDescriptionCodec);
schemaSuppliers.add(schemaRegistry);
schemaRegistries.add(schemaRegistry);
}

if (config.getSchemaRegistryURI() != null) {
PravegaSchemaRegistry schemaRegistry =
new PravegaSchemaRegistry(config.getControllerURI(), config.getSchemaRegistryURI());
schemaSuppliers.add(schemaRegistry);
schemaRegistries.add(schemaRegistry);
}
}

@VisibleForTesting
public CompositeSchemaRegistry(List<SchemaSupplier> schemaSuppliers, List<SchemaRegistry> schemaRegistries)
{
this.schemaSuppliers = schemaSuppliers;
this.schemaRegistries = schemaRegistries;
}

@Override
public List<String> listSchemas()
{
final List<String> schemas = new ArrayList<>();
schemaSuppliers.forEach(p -> schemas.addAll(p.listSchemas()));
return schemas;
}

@Override
public List<PravegaTableHandle> listTables(String schema)
{
final List<PravegaTableHandle> tables = new ArrayList<>();
schemaSuppliers.forEach(p -> tables.addAll(p.listTables(schema)));
return tables;
}

@Override
public List<PravegaStreamFieldGroup> getSchema(SchemaTableName schemaTableName) {
for (SchemaRegistry schemaRegistry : schemaRegistries) {
List<PravegaStreamFieldGroup> schema = schemaRegistry.getSchema(schemaTableName);
if (schema != null) {
return schema;
}
}
return null;
}

@Override
public PravegaStreamDescription getTable(SchemaTableName schemaTableName)
{
for (SchemaRegistry schemaRegistry : schemaRegistries) {
PravegaStreamDescription streamDescription = schemaRegistry.getTable(schemaTableName);
if (streamDescription != null) {
return streamDescription;
}
}
return null;
}
}
Loading