Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 10: Extend schema registry integration #34

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1ceeac1
implement AbstractTestQueries for integration tests
adrdc Mar 15, 2021
f54a8b0
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 15, 2021
856d501
add missing reference to docker client
adrdc Mar 16, 2021
da8de35
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 17, 2021
3930e87
update header in 3 test files
adrdc Mar 17, 2021
dbe84ca
only run integration tests if -Pintegration flag is set
adrdc Mar 18, 2021
4cd341f
use InProcPravega (instead of external docker container)
adrdc Mar 22, 2021
ac70c64
rename test classes
adrdc Mar 22, 2021
60fa0d6
github actions -> jdk11
adrdc Mar 22, 2021
f95d21e
pluggable schema registry. simplified PravegaTableDescriptionSupplier
adrdc Mar 23, 2021
61d0624
working example with confluent
adrdc Mar 24, 2021
7fc93b4
allow list of component streams in multi source stream definition
adrdc Mar 24, 2021
c8d8267
add confluent lib, remove stale/test aws key config
adrdc Mar 24, 2021
c773080
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 24, 2021
a948bc1
add package to unit-test
adrdc Mar 24, 2021
e534d50
Merge branch 'main' into issue-15-presto-integration-tests
adrdc Mar 25, 2021
4b55331
add git rev to class header/borrowed code
adrdc Mar 25, 2021
9341a09
update README
adrdc Mar 25, 2021
8661858
Merge branch 'issue-15-presto-integration-tests' into schema-reg-play…
adrdc Mar 25, 2021
6d63a4f
bad class?
adrdc Mar 25, 2021
3ec71e3
re-add same file
adrdc Mar 25, 2021
534fa5b
add unit test, rename some files
adrdc Mar 25, 2021
7b2a4e0
separate schemas from different tests
adrdc Mar 25, 2021
cd037a9
add unit test for pravega schema registry
adrdc Mar 25, 2021
f9f31c9
unit test for table supplier, test multi source
adrdc Mar 26, 2021
0184eab
use new test util in UT
adrdc Mar 26, 2021
067a3e8
fix issue in integration test. now that we are listing vs. seeding c…
adrdc Mar 26, 2021
e78ccc7
restore formatting
adrdc Mar 26, 2021
c75d005
Merge branch 'main' into schema-reg-play-from-15
adrdc Mar 29, 2021
6905745
add maven location for confluent sr lib
adrdc Mar 29, 2021
e1214f1
Merge branch 'main' into issue-10-schema-registry
adrdc Jun 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ repositories {
maven {
url = uri('https://repo.maven.apache.org/maven2')
}

maven {
url = uri('https://packages.confluent.io/maven/')
}
}

dependencies {
Expand Down Expand Up @@ -56,6 +60,8 @@ dependencies {
compile "com.facebook.presto:presto-spi:${prestoVersion}"
compile "com.facebook.presto:presto-common:${prestoVersion}"

compile "io.confluent:kafka-schema-registry-client:${confluentVersion}"

compile group: 'io.netty', name: 'netty-all', version:"{nettyVersion}"

runtimeOnly "io.airlift:joda-to-java-time-bridge:3"
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ airliftTestingVersion=0.191
airliftUnitsVersion=1.3
avroVersion=1.8.1
commonsVersion=3.7
confluentVersion=6.1.0
checkstyleToolVersion=8.23
everitJsonSchemaVersion=1.12.1
guavaVersion=26.0-jre
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class PravegaConnectorConfig
*/
private File tableDescriptionDir = new File("etc/pravega/");


private URI confluentSchemaRegistry;

@NotNull
public URI getControllerURI()
{
Expand Down Expand Up @@ -105,4 +108,16 @@ public PravegaConnectorConfig setHideInternalColumns(boolean hideInternalColumns
this.hideInternalColumns = hideInternalColumns;
return this;
}

@Config("pravega.confluentSchemaRegistry")
public PravegaConnectorConfig setConfluentSchemaRegistry(URI confluentSchemaRegistry)
{
this.confluentSchemaRegistry = confluentSchemaRegistry;
return this;
}

public URI getConfluentSchemaRegistry()
{
return confluentSchemaRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public PravegaTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
return null;
}

return new PravegaTableHandle(connectorId,
schemaTableName.getSchemaName(),
return new PravegaTableHandle(schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
table.getObjectName(),
table.getObjectType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ private static ReaderType readerType(PravegaProperties properties)

private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.Builder<ConnectorSplit> splits)
{
pravegaTableHandle.getOjectArgs().orElseThrow(() ->
pravegaTableHandle.getObjectArgs().orElseThrow(() ->
new IllegalArgumentException("no KF defined for " + pravegaTableHandle));

for (String kf : pravegaTableHandle.getOjectArgs().get()) {
for (String kf : pravegaTableHandle.getObjectArgs().get()) {
PravegaSplit split =
new PravegaSplit(connectorId,
ObjectType.KV_TABLE,
Expand All @@ -130,7 +130,7 @@ private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.
splits.add(split);
}

log.info("created " + pravegaTableHandle.getOjectArgs().get().size() + " kv splits");
log.info("created " + pravegaTableHandle.getObjectArgs().get().size() + " kv splits");
}

private void buildStreamSplits(final PravegaProperties properties,
Expand All @@ -139,7 +139,7 @@ private void buildStreamSplits(final PravegaProperties properties,
{
// TODO: Enable begin and end cuts to be configurable: https://github.com/pravega/pravega-sql/issues/24
List<String> sourceStreams = multiSourceStream(pravegaTableHandle)
? pravegaTableHandle.getOjectArgs().orElseThrow(
? pravegaTableHandle.getObjectArgs().orElseThrow(
() -> new IllegalArgumentException("no args for multi source table found"))
: Collections.singletonList(pravegaTableHandle.getObjectName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.pravega.connectors.presto;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -67,6 +66,16 @@ public PravegaStreamDescription(PravegaStreamDescription streamDescription, List
this.event = Optional.of(event);
}

public PravegaStreamDescription(PravegaStreamDescription streamDescription, List<PravegaStreamFieldGroup> event, List<String> objectArgs)
{
this.tableName = streamDescription.tableName;
this.schemaName = streamDescription.schemaName;
this.objectName = streamDescription.objectName;
this.objectType = streamDescription.objectType;
this.objectArgs = Optional.of(objectArgs);
this.event = Optional.of(event);
}

@JsonProperty
public Optional<String> getSchemaName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class PravegaStreamFieldGroup
private final String dataFormat;
private final Optional<String> dataSchema;
private final Optional<List<PravegaStreamFieldDescription>> fields;
private final Optional<String> mapping;
private final Optional<String> mapping; // column prefix

@JsonCreator
public PravegaStreamFieldGroup(
Expand Down
Loading