diff --git a/build.gradle b/build.gradle index bd33990..9a26e81 100644 --- a/build.gradle +++ b/build.gradle @@ -24,6 +24,10 @@ repositories { maven { url = uri('https://repo.maven.apache.org/maven2') } + + maven { + url = uri('https://packages.confluent.io/maven/') + } } dependencies { @@ -56,6 +60,8 @@ dependencies { compile "com.facebook.presto:presto-spi:${prestoVersion}" compile "com.facebook.presto:presto-common:${prestoVersion}" + compile "io.confluent:kafka-schema-registry-client:${confluentVersion}" + compile group: 'io.netty', name: 'netty-all', version:"{nettyVersion}" runtimeOnly "io.airlift:joda-to-java-time-bridge:3" diff --git a/gradle.properties b/gradle.properties index 4db7ff3..cdb197a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,6 +18,7 @@ airliftTestingVersion=0.191 airliftUnitsVersion=1.3 avroVersion=1.8.1 commonsVersion=3.7 +confluentVersion=6.1.0 checkstyleToolVersion=8.23 everitJsonSchemaVersion=1.12.1 guavaVersion=26.0-jre diff --git a/src/main/java/io/pravega/connectors/presto/PravegaConnectorConfig.java b/src/main/java/io/pravega/connectors/presto/PravegaConnectorConfig.java index e209adc..e4b086c 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaConnectorConfig.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaConnectorConfig.java @@ -50,6 +50,9 @@ public class PravegaConnectorConfig */ private File tableDescriptionDir = new File("etc/pravega/"); + + private URI confluentSchemaRegistry; + @NotNull public URI getControllerURI() { @@ -105,4 +108,16 @@ public PravegaConnectorConfig setHideInternalColumns(boolean hideInternalColumns this.hideInternalColumns = hideInternalColumns; return this; } + + @Config("pravega.confluentSchemaRegistry") + public PravegaConnectorConfig setConfluentSchemaRegistry(URI confluentSchemaRegistry) + { + this.confluentSchemaRegistry = confluentSchemaRegistry; + return this; + } + + public URI getConfluentSchemaRegistry() + { + return confluentSchemaRegistry; + } } diff --git a/src/main/java/io/pravega/connectors/presto/PravegaMetadata.java b/src/main/java/io/pravega/connectors/presto/PravegaMetadata.java index 93744d1..21bbddd 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaMetadata.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaMetadata.java @@ -104,8 +104,7 @@ public PravegaTableHandle getTableHandle(ConnectorSession session, SchemaTableNa return null; } - return new PravegaTableHandle(connectorId, - schemaTableName.getSchemaName(), + return new PravegaTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName(), table.getObjectName(), table.getObjectType(), diff --git a/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java b/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java index 4f4a7d0..81f4630 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaSplitManager.java @@ -116,10 +116,10 @@ private static ReaderType readerType(PravegaProperties properties) private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList.Builder splits) { - pravegaTableHandle.getOjectArgs().orElseThrow(() -> + pravegaTableHandle.getObjectArgs().orElseThrow(() -> new IllegalArgumentException("no KF defined for " + pravegaTableHandle)); - for (String kf : pravegaTableHandle.getOjectArgs().get()) { + for (String kf : pravegaTableHandle.getObjectArgs().get()) { PravegaSplit split = new PravegaSplit(connectorId, ObjectType.KV_TABLE, @@ -130,7 +130,7 @@ private void buildKVSplits(PravegaTableHandle pravegaTableHandle, ImmutableList. splits.add(split); } - log.info("created " + pravegaTableHandle.getOjectArgs().get().size() + " kv splits"); + log.info("created " + pravegaTableHandle.getObjectArgs().get().size() + " kv splits"); } private void buildStreamSplits(final PravegaProperties properties, @@ -139,7 +139,7 @@ private void buildStreamSplits(final PravegaProperties properties, { // TODO: Enable begin and end cuts to be configurable: https://github.com/pravega/pravega-sql/issues/24 List sourceStreams = multiSourceStream(pravegaTableHandle) - ? pravegaTableHandle.getOjectArgs().orElseThrow( + ? pravegaTableHandle.getObjectArgs().orElseThrow( () -> new IllegalArgumentException("no args for multi source table found")) : Collections.singletonList(pravegaTableHandle.getObjectName()); diff --git a/src/main/java/io/pravega/connectors/presto/PravegaStreamDescription.java b/src/main/java/io/pravega/connectors/presto/PravegaStreamDescription.java index 77454d3..c98f94b 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaStreamDescription.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaStreamDescription.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.pravega.connectors.presto; import com.fasterxml.jackson.annotation.JsonCreator; @@ -67,6 +66,16 @@ public PravegaStreamDescription(PravegaStreamDescription streamDescription, List this.event = Optional.of(event); } + public PravegaStreamDescription(PravegaStreamDescription streamDescription, List event, List objectArgs) + { + this.tableName = streamDescription.tableName; + this.schemaName = streamDescription.schemaName; + this.objectName = streamDescription.objectName; + this.objectType = streamDescription.objectType; + this.objectArgs = Optional.of(objectArgs); + this.event = Optional.of(event); + } + @JsonProperty public Optional getSchemaName() { diff --git a/src/main/java/io/pravega/connectors/presto/PravegaStreamFieldGroup.java b/src/main/java/io/pravega/connectors/presto/PravegaStreamFieldGroup.java index 1164df4..b211214 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaStreamFieldGroup.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaStreamFieldGroup.java @@ -33,7 +33,7 @@ public class PravegaStreamFieldGroup private final String dataFormat; private final Optional dataSchema; private final Optional> fields; - private final Optional mapping; + private final Optional mapping; // column prefix @JsonCreator public PravegaStreamFieldGroup( diff --git a/src/main/java/io/pravega/connectors/presto/PravegaTableDescriptionSupplier.java b/src/main/java/io/pravega/connectors/presto/PravegaTableDescriptionSupplier.java index b638f33..a1c764a 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaTableDescriptionSupplier.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaTableDescriptionSupplier.java @@ -21,50 +21,21 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Streams; -import io.pravega.client.ClientConfig; -import io.pravega.client.admin.StreamManager; -import io.pravega.client.stream.Stream; -import io.pravega.schemaregistry.client.SchemaRegistryClient; -import io.pravega.schemaregistry.client.SchemaRegistryClientConfig; -import io.pravega.schemaregistry.client.SchemaRegistryClientFactory; -import io.pravega.schemaregistry.contract.data.GroupProperties; -import io.pravega.schemaregistry.contract.data.SchemaWithVersion; -import io.pravega.schemaregistry.contract.data.SerializationFormat; + +import io.pravega.connectors.presto.schemamanagement.CompositeSchemaRegistry; import javax.inject.Inject; -import java.io.File; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; +import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static io.pravega.connectors.presto.ProtobufCommon.encodeSchema; -import static io.pravega.connectors.presto.util.PravegaNameUtils.groupId; -import static io.pravega.connectors.presto.util.PravegaNameUtils.kvFieldMapping; -import static io.pravega.connectors.presto.util.PravegaNameUtils.kvTable; import static io.pravega.connectors.presto.util.PravegaNameUtils.multiSourceStream; -import static io.pravega.connectors.presto.util.PravegaNameUtils.temp_streamNameToTableName; -import static io.pravega.connectors.presto.util.PravegaNameUtils.temp_tableNameToStreamName; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KEY; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KV_KEY; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KV_VALUE; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.INLINE_SUFFIX; -import static io.pravega.connectors.presto.util.PravegaSchemaUtils.readSchema; -import static io.pravega.connectors.presto.util.PravegaStreamDescUtils.mapFieldsFromSchema; -import static java.nio.file.Files.readAllBytes; -import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; // pravega scope is a namespace for streams. stream is unique within scope. @@ -83,28 +54,28 @@ public class PravegaTableDescriptionSupplier { private static final Logger log = Logger.get(PravegaTableDescriptionSupplier.class); - private final PravegaConnectorConfig pravegaConnectorConfig; + private final CompositeSchemaRegistry schemaRegistry; private Cache schemaCache; private Cache> tableCache; + // whether we have listed tables from this schema or not + private final HashMap tableListMap = new HashMap<>(); + private JsonCodec streamDescriptionCodec; - // "inline" means that event was written using schema registry wrapped client and schema encoding id - // is within the raw event data in pravega @Inject PravegaTableDescriptionSupplier(PravegaConnectorConfig pravegaConnectorConfig, JsonCodec streamDescriptionCodec) { requireNonNull(pravegaConnectorConfig, "pravegaConfig is null"); - this.pravegaConnectorConfig = pravegaConnectorConfig; this.streamDescriptionCodec = streamDescriptionCodec; // there will be many successive calls to listSchemas + listTables in short time period // do not reach out to pravega each time as it is unlikely things would have changed // enhancement issue - can we determine if there are changes/removals and selectively update? - // https://github.com/StreamingDataPlatform/pravega-sql/issues/101 + // https://github.com/pravega/presto-connector/issues/30 this.schemaCache = CacheBuilder.newBuilder() .expireAfterWrite(pravegaConnectorConfig.getTableCacheExpireSecs(), TimeUnit.SECONDS) .build(); @@ -112,16 +83,17 @@ public class PravegaTableDescriptionSupplier this.tableCache = CacheBuilder.newBuilder() .expireAfterWrite(pravegaConnectorConfig.getTableCacheExpireSecs(), TimeUnit.SECONDS) .build(); + + this.schemaRegistry = new CompositeSchemaRegistry(pravegaConnectorConfig, streamDescriptionCodec); } @VisibleForTesting - public PravegaTableDescriptionSupplier(PravegaConnectorConfig pravegaConnectorConfig, - Cache schemaCache, - Cache> tableCache) + public PravegaTableDescriptionSupplier(CompositeSchemaRegistry schemaRegistry) { - this.pravegaConnectorConfig = pravegaConnectorConfig; - this.schemaCache = schemaCache; - this.tableCache = tableCache; + this.schemaRegistry = schemaRegistry; + + this.schemaCache = CacheBuilder.newBuilder().build(); + this.tableCache = CacheBuilder.newBuilder().build(); } public List listSchemas() @@ -130,94 +102,72 @@ public List listSchemas() // they are inserted to cache at same time so will all be same state final List schemas = schemaCache.asMap().keySet().stream().collect(Collectors.toList()); if (schemas.isEmpty()) { - listLocalSchemas().forEach(schema -> schemaCache.put(schema, new Object())); - - try (StreamManager streamManager = - StreamManager.create(ClientConfig.builder().controllerURI( - pravegaConnectorConfig.getControllerURI()).build())) { - Streams.stream(streamManager.listScopes()).filter(s -> !internalObject(s)).forEach(schema -> { - schemas.add(schema); - schemaCache.put(schema, new Object()); - }); - } + schemaRegistry.listSchemas().forEach(schema -> { + schemas.add(schema); + schemaCache.put(schema, new Object()); + }); } else { - log.info("serving listSchemas() from cache"); + log.debug("serving listSchemas() from cache"); } return schemas; } public List listTables(Optional schema) { - List schemas = schema.isPresent() ? Collections.singletonList(schema.get()) : listSchemas(); + List schemas = schema.map(Collections::singletonList).orElseGet(this::listSchemas); - StreamManager streamManager = null; + List tableList = new ArrayList<>(); - try { - List tableList = new ArrayList<>(); + for (String s : schemas) { + List tableListForSchema = + tableCache.asMap().keySet().stream() + .filter(streamDesc -> streamDesc.getSchemaTableName().getSchemaName().startsWith(s)) + .collect(Collectors.toList()); - for (String s : schemas) { - List tableListForSchema = - tableCache.asMap().keySet().stream() - .filter(streamDesc -> streamDesc.getSchemaTableName().getSchemaName().startsWith(s)) - .collect(Collectors.toList()); + if (tableListForSchema.isEmpty() || tableListMap.get(s) == null) { - // not all tables inserted to cache at same time - if (tableListForSchema.isEmpty()) { - if (streamManager == null) { - streamManager = StreamManager.create( - ClientConfig.builder().controllerURI(pravegaConnectorConfig.getControllerURI()).build()); - } + List compositeStreams = new ArrayList<>(); - List compositeStreams = new ArrayList<>(); + schemaRegistry.listTables(s).forEach(table -> { - // local takes precedence. list before pravega. ifAbsent used later to not clobber. - listLocalTables(s).forEach(table -> { - PravegaTableName pravegaTableName = new PravegaTableName(s, table); + // we hide component streams (components of multi-source streams) from view + boolean hidden = + compositeStreams.stream().anyMatch(p -> p.matcher(table.getTableName()).matches()); - // don't clobber existing entry - if (tableCache.getIfPresent(pravegaTableName) == null || - !tableCache.getIfPresent(pravegaTableName).isPresent()) { - tableCache.put(pravegaTableName, Optional.empty()); - } + PravegaTableName pravegaTableName = new PravegaTableName(s, table.getTableName(), hidden); - // load .json def to get stream name in order to determine type - PravegaStreamDescription localTable = getLocalTable(pravegaTableName.getSchemaTableName()); - if (multiSourceStream(localTable)) { - compositeStreams.add(Pattern.compile(localTable.getObjectName())); - } - }); - - // (underlying streams used by kv table are seen as internal and thus are skipped) - Streams.stream(streamManager.listStreams(s)) - .filter(stream -> !internalStream(stream)) - .forEach(stream -> { - boolean hidden = - compositeStreams.stream().anyMatch(p -> p.matcher(stream.getStreamName()).matches()); - // ifAbsent - don't clobber table description if we have it - PravegaTableName tableName = new PravegaTableName(s, temp_streamNameToTableName(stream.getStreamName()), hidden); - if (tableCache.getIfPresent(tableName) == null || - !tableCache.getIfPresent(tableName).isPresent()) { - tableCache.put(tableName, Optional.empty()); - } + // don't clobber existing entry + if (tableCache.getIfPresent(pravegaTableName) == null || + !tableCache.getIfPresent(pravegaTableName).isPresent()) { + tableCache.put(pravegaTableName, Optional.empty()); + } + + if (multiSourceStream(table)) { + // if component streams specified look for exact match when hiding + if (table.getObjectArgs().isPresent()) { + table.getObjectArgs().get().forEach(arg -> { + compositeStreams.add(Pattern.compile("^" + arg + "$")); }); - } - else { - log.info("serving listTables(%s) from cache", s); - } - - tableList.addAll(tableCache.asMap().keySet().stream() - .filter(pravegaStreamDescription -> - pravegaStreamDescription.getSchemaTableName().getSchemaName().startsWith(s)) - .collect(Collectors.toList())); + } + else { + // regex, fuzzy match + compositeStreams.add(Pattern.compile(table.getObjectName())); + } + } + }); + tableListMap.put(s, true); // we have now listed tables from the schema } - return tableList; - } - finally { - if (streamManager != null) { - streamManager.close(); + else { + log.debug("serving listTables(%s) from cache", s); } + + tableList.addAll(tableCache.asMap().keySet().stream() + .filter(pravegaStreamDescription -> + pravegaStreamDescription.getSchemaTableName().getSchemaName().startsWith(s)) + .collect(Collectors.toList())); } + return tableList; } public PravegaStreamDescription getTable(SchemaTableName schemaTableName) @@ -225,278 +175,63 @@ public PravegaStreamDescription getTable(SchemaTableName schemaTableName) PravegaTableName pravegaTableName = new PravegaTableName(schemaTableName); Optional cachedTable = tableCache.getIfPresent(pravegaTableName); if (cachedTable != null && cachedTable.isPresent()) { - log.info("serving getTable(%s) from cache", schemaTableName); + log.debug("serving getTable(%s) from cache", schemaTableName); return cachedTable.get(); } - PravegaStreamDescription table = getLocalTable(schemaTableName); - if (table != null) { - log.info("found local schema for '%s'", schemaTableName); - - // kv this is list of key family (defined in local schema file) - // for multi source stream this is list of composite streams (empty here, to be filled in later) - Optional> objectArgs = table.getObjectArgs(); - - // field definitions can come from 1 of 4 places - // (1) defined in local .json schema ("event/fields") - // (2) uri in "dataSchema" field - // (3) lookup from a source stream (if multi source stream) - // (4) lookup directly in schema registry (if kv table) - - Optional> fieldGroups = Optional.empty(); + PravegaStreamDescription table = schemaRegistry.getTable(schemaTableName); + if (table == null) { + return null; + } - if (fieldsDefined(table)) { - // case (1) - no-op - log.info("fields defined in schema file %s", schemaTableName); - fieldGroups = Optional.of(new LinkedList<>(table.getEvent().get())); - } - else if (table.getEvent().isPresent() && - table.getEvent().get().get(0).getDataSchema().isPresent()) { - fieldGroups = Optional.of(new LinkedList<>()); - - // case (2) uri containing schema - List finalFieldGroups = fieldGroups.get(); - for (int i = 0; i < table.getEvent().get().size(); i++) { - PravegaStreamFieldGroup event = table.getEvent().get().get(i); - String colPrefix = event.getMapping().orElse( - table.getEvent().get().size() > 1 ? kvFieldMapping(i) : ""); - Optional dataSchema = Optional.of(readSchema(event.getDataSchema().get())); - PravegaStreamFieldGroup fieldGroup = - new PravegaStreamFieldGroup(event.getDataFormat(), - Optional.empty(), - dataSchema, - Optional.of( - mapFieldsFromSchema(colPrefix, event.getDataFormat(), dataSchema.get()))); - finalFieldGroups.add(fieldGroup); - } - } - else if (kvTable(table)) { - fieldGroups = fieldGroupsFromSchemaRegistry(schemaTableName); + if (multiSourceStream(table)) { + // if component streams not already specified, look them up from pravega based on regex + List objectArgs = table.getObjectArgs().isPresent() + ? table.getObjectArgs().get() + : multiSourceStreamComponents(schemaTableName, table.getObjectName()); + if (objectArgs.isEmpty()) { + throw new IllegalArgumentException("could not get component streams for " + schemaTableName); } - if (multiSourceStream(table)) { - // stream name will be some regex. - // find all the possible source streams. - Pattern pattern = Pattern.compile(table.getObjectName()); - - List sourceTableNames = - listTables(Optional.of(schemaTableName.getSchemaName())).stream() - .filter(t -> pattern.matcher(t.getSchemaTableName().getTableName()).matches()) - .collect(Collectors.toList()); - - objectArgs = Optional.of(sourceTableNames.stream() - .map(PravegaTableName::getSchemaTableName) - .map(SchemaTableName::getTableName) - .collect(Collectors.toList())); - - if (!fieldGroups.isPresent()) { - // case (3) schema not already defined, look one up - // lookup actual schema from any of them - implies all sources are the same - PravegaStreamDescription sourceTable = sourceTableNames.isEmpty() - ? null - : getTable(sourceTableNames.get(0).getSchemaTableName()); - if (sourceTable == null) { - throw new IllegalArgumentException("no stream found for multi source"); - } - fieldGroups = Optional.of(new LinkedList<>()); - fieldGroups.get().add(new PravegaStreamFieldGroup( - sourceTable.getEvent().get().get(0).getDataFormat(), - Optional.empty(), - sourceTable.getEvent().get().get(0).getDataSchema(), - Optional.of(sourceTable.getEvent().get().get(0).getFields()))); - } + List fieldGroups = table.getEvent().orElse(new ArrayList<>(1)); + if (fieldGroups.isEmpty()) { + fieldGroups = schemaRegistry.getSchema(new SchemaTableName(schemaTableName.getSchemaName(), objectArgs.get(0))); } - fieldGroups.orElseThrow(() -> - new IllegalArgumentException("unable to determine schema for " + schemaTableName)); - - // our final table definition. use schema that we looked up, and set all source stream names here - table = new PravegaStreamDescription(schemaTableName.getTableName(), - Optional.of(schemaTableName.getSchemaName()), - table.getObjectName(), - Optional.of(table.getObjectType()), - objectArgs, - fieldGroups); - - tableCache.put(pravegaTableName, Optional.of(table)); - return table; + table = new PravegaStreamDescription(table, fieldGroups, objectArgs); + } + else if (!fieldsDefined(table)) { + table = new PravegaStreamDescription(table, schemaRegistry.getSchema(schemaTableName)); } - Optional> fieldGroups = fieldGroupsFromSchemaRegistry(schemaTableName); - - table = new PravegaStreamDescription( - schemaTableName.getTableName(), - Optional.of(schemaTableName.getSchemaName()), - temp_tableNameToStreamName(schemaTableName.getTableName()), - Optional.of(ObjectType.STREAM), - Optional.empty() /* args */, - fieldGroups); tableCache.put(pravegaTableName, Optional.of(table)); return table; } - /** - * construct PravegaStreamFieldGroup by looking up schema in schema registry - * - * @param schemaTableName - * @return - */ - private Optional> fieldGroupsFromSchemaRegistry(final SchemaTableName schemaTableName) - { - log.info("look up description of '%s' from pravega", schemaTableName); - String groupName = groupId(schemaTableName.getSchemaName(), temp_tableNameToStreamName(schemaTableName.getTableName())); - - SchemaRegistryClientConfig registryConfig = - SchemaRegistryClientConfig.builder() - .schemaRegistryUri(pravegaConnectorConfig.getSchemaRegistryURI()).build(); - SchemaRegistryClient registryClient = SchemaRegistryClientFactory.withDefaultNamespace(registryConfig); - - List fieldGroups = new ArrayList<>(2); - - GroupProperties properties = - registryClient.getGroupProperties(groupName); - - List schemas = registryClient.getSchemas(groupName); - if (schemas.size() == 0 || schemas.size() > 2) { - throw new IllegalStateException(schemaTableName + " has " + schemas.size() + " registered schemas. expecting either 1 or 2"); - } - - // kv table will have > 1 schema. key+value likely different types - boolean kv = schemas.size() > 1; - - for (int i = 0; i < schemas.size(); i++) { - // colPrefix used for display so can differentiate between fields from key or value - String colPrefix = kv ? kvFieldMapping(i) : ""; - - SerializationFormat format = schemas.get(i).getSchemaInfo().getSerializationFormat(); - fieldGroups.add(new PravegaStreamFieldGroup( - dataFormat(properties.getProperties(), format, kv, i), - Optional.of(colPrefix), - dataSchema(format, schemas.get(i)), - Optional.of(mapFieldsFromSchema(colPrefix, format, schemas.get(i))))); - } - - return Optional.of(fieldGroups); - } - - private static boolean fieldsDefined(PravegaStreamDescription table) - { - // event is optional, fields within event is also optional - // for kv table - 0 or 2 schemas. so fine to just check for 1. - return table.getEvent().isPresent() && (table.getEvent().get().get(0).getFields() != null); - } - - private List listLocalSchemas() + private List multiSourceStreamComponents(SchemaTableName schemaTableName, String sourcePattern) { - return localSchemaStream() - .map(file -> file.getName().split("\\.")[0]) - .collect(Collectors.toList()); - } + Pattern pattern = Pattern.compile(sourcePattern); - // scope.stream -> schema.table - private List listLocalTables(String schema) - { - return localSchemaStream() - .filter(file -> file.getName().endsWith(".json")) - .filter(file -> file.getName().startsWith(schema)) - .filter(file -> file.getName().split("\\.").length == 3) - .map(file -> file.getName().split("\\.")[1]) + return listTables(Optional.of(schemaTableName.getSchemaName())).stream() + .map(PravegaTableName::getSchemaTableName) + .map(SchemaTableName::getTableName) + .filter(tableName -> pattern.matcher(tableName).matches()) .collect(Collectors.toList()); } - private PravegaStreamDescription getLocalTable(SchemaTableName schemaTableName) + private static boolean fieldsDefined(PravegaStreamDescription table) { - try { - File file = new File(pravegaConnectorConfig.getTableDescriptionDir(), - String.format("%s.%s.json", schemaTableName.getSchemaName(), schemaTableName.getTableName())); - if (!file.exists()) { - return null; - } - return streamDescriptionCodec.fromJson(readAllBytes(file.toPath())); - } - catch (IOException e) { - log.error("%s", e); - throw new UncheckedIOException(e); + if (!table.getEvent().isPresent() || + table.getEvent().get().isEmpty()) { + return false; } - catch (RuntimeException e) { - log.error("%s", e); - throw e; - } - } - private java.util.stream.Stream localSchemaStream() - { - return listFiles(pravegaConnectorConfig.getTableDescriptionDir()).stream() - .filter(file -> file.isFile() && file.getName().endsWith(".json")); - } - - private static List listFiles(File dir) - { - if ((dir != null) && dir.isDirectory()) { - File[] files = dir.listFiles(); - if (files != null) { - log.debug("Considering files: %s", asList(files)); - return ImmutableList.copyOf(files); + for (PravegaStreamFieldGroup fieldGroup : table.getEvent().get()) { + if (fieldGroup.getFields() == null) { + return false; } - } - return ImmutableList.of(); - } - - private static String dataFormat(ImmutableMap groupProperties, - SerializationFormat format, - boolean kvTable, - int kvIdx) - { - /* - TODO: auto-detect https://github.com/pravega/pravega-sql/issues/58 - - (1) no schema registry. - (2) Register and evolve schemas in registry but do not use registry client while writing data - (3) Register schemas in the registry and use registry client to encode schema Id with payload - "inline" is for #3. for e.g. "avro" -> "avro-inline". PravegaRecordSetProvider is interested in this - hopefully this can all go away (see linked issue 58 above) - - but for now the following is our convention - if "inline" exists in our properties, all data uses SR - else if it is a kv table key+value may be different. both, neither, or either may use SR - look for "inlinekey" / "inlinevalue" - */ - - String key = GROUP_PROPERTIES_INLINE_KEY; - - if (kvTable && !groupProperties.containsKey(key)) { - key = kvIdx == 0 ? GROUP_PROPERTIES_INLINE_KV_KEY : GROUP_PROPERTIES_INLINE_KV_VALUE; } - - String finalFormat = format == SerializationFormat.Custom - ? format.getFullTypeName().toLowerCase(Locale.ENGLISH) - : format.name().toLowerCase(Locale.ENGLISH); - return finalFormat + (groupProperties.containsKey(key) ? INLINE_SUFFIX : ""); - } - - private static Optional dataSchema(SerializationFormat format, SchemaWithVersion schemaWithVersion) - { - // it is intentional that nothing is returned for Custom - // pass schema to row decoders. refer to PravegaRecordSetProvider - switch (format) { - case Protobuf: - return Optional.of(encodeSchema(schemaWithVersion)); - case Avro: - return Optional.of(new String(schemaWithVersion.getSchemaInfo().getSchemaData().array(), StandardCharsets.UTF_8)); - default: - return Optional.empty(); - } - } - - private static boolean internalStream(Stream stream) - { - return internalObject(stream.getStreamName()); - } - - private static boolean internalObject(String object) - { - return object.startsWith("_") /* pravega internal */ || - object.endsWith("-SC") /* application internal - stream cuts */; + return true; } } diff --git a/src/main/java/io/pravega/connectors/presto/PravegaTableHandle.java b/src/main/java/io/pravega/connectors/presto/PravegaTableHandle.java index 4cc23fb..c7456c6 100644 --- a/src/main/java/io/pravega/connectors/presto/PravegaTableHandle.java +++ b/src/main/java/io/pravega/connectors/presto/PravegaTableHandle.java @@ -34,11 +34,6 @@ public final class PravegaTableHandle implements ConnectorTableHandle { - /** - * connector id - */ - private final String connectorId; - /** * The schema name for this table. Is set through configuration and read */ @@ -56,6 +51,11 @@ public final class PravegaTableHandle */ private final String objectName; + /** + * optional + * for ObjectType.STREAM, this is list of composite streams in a multi source stream + * for ObjectType.KV_TABLE this is list of key families + */ private final Optional> objectArgs; private final List schema; @@ -64,7 +64,6 @@ public final class PravegaTableHandle @JsonCreator public PravegaTableHandle( - @JsonProperty("connectorId") String connectorId, @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, @JsonProperty("objectName") String objectName, @@ -73,7 +72,6 @@ public PravegaTableHandle( @JsonProperty("schema") List schema, @JsonProperty("schemaRegistryGroupId") String schemaRegistryGroupId) { - this.connectorId = requireNonNull(connectorId, "connectorId is null"); this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); this.objectName = requireNonNull(objectName, "objectName is null"); @@ -83,10 +81,20 @@ public PravegaTableHandle( this.schemaRegistryGroupId = requireNonNull(schemaRegistryGroupId, "schemaRegistryGroupId is null"); } - @JsonProperty - public String getConnectorId() + public PravegaTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("objectName") String objectName, + @JsonProperty("objectType") ObjectType objectType, + @JsonProperty("objectArgs") Optional> objectArgs) { - return connectorId; + this.schemaName = requireNonNull(schemaName, "schemaName is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + this.objectName = requireNonNull(objectName, "objectName is null"); + this.objectType = requireNonNull(objectType, "objectType is null"); + this.objectArgs = objectArgs; + this.schema = null; + this.schemaRegistryGroupId = null; } @JsonProperty @@ -114,7 +122,7 @@ public String getObjectName() } @JsonProperty - public Optional> getOjectArgs() + public Optional> getObjectArgs() { return objectArgs; } @@ -139,7 +147,7 @@ public SchemaTableName toSchemaTableName() @Override public int hashCode() { - return Objects.hash(connectorId, schemaName, tableName, objectName, objectType, schema); + return Objects.hash(schemaName, tableName, objectName, objectType, schema); } @Override @@ -153,8 +161,7 @@ public boolean equals(Object obj) } PravegaTableHandle other = (PravegaTableHandle) obj; - return Objects.equals(this.connectorId, other.connectorId) - && Objects.equals(this.schemaName, other.schemaName) + return Objects.equals(this.schemaName, other.schemaName) && Objects.equals(this.tableName, other.tableName) && Objects.equals(this.objectName, other.objectName) && Objects.equals(this.objectType, other.objectType) @@ -165,7 +172,6 @@ public boolean equals(Object obj) public String toString() { return toStringHelper(this) - .add("connectorId", connectorId) .add("schemaName", schemaName) .add("tableName", tableName) .add("objectName", objectName) diff --git a/src/main/java/io/pravega/connectors/presto/schemamanagement/CompositeSchemaRegistry.java b/src/main/java/io/pravega/connectors/presto/schemamanagement/CompositeSchemaRegistry.java new file mode 100644 index 0000000..82322c4 --- /dev/null +++ b/src/main/java/io/pravega/connectors/presto/schemamanagement/CompositeSchemaRegistry.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.annotations.VisibleForTesting; +import io.pravega.connectors.presto.PravegaConnectorConfig; +import io.pravega.connectors.presto.PravegaStreamDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; +import io.pravega.connectors.presto.PravegaTableHandle; + +import java.util.ArrayList; +import java.util.List; + +public class CompositeSchemaRegistry + implements SchemaSupplier, SchemaRegistry { + private final List schemaSuppliers; + + private final List schemaRegistries; + + public CompositeSchemaRegistry(PravegaConnectorConfig config, JsonCodec streamDescriptionCodec) { + schemaSuppliers = new ArrayList<>(); + schemaRegistries = new ArrayList<>(); + + // local will override, always add first + if (config.getTableDescriptionDir() != null && + config.getTableDescriptionDir().exists() && + config.getTableDescriptionDir().isDirectory()) { + LocalSchemaRegistry schemaRegistry = + new LocalSchemaRegistry(config.getTableDescriptionDir(), streamDescriptionCodec); + schemaSuppliers.add(schemaRegistry); + schemaRegistries.add(schemaRegistry); + } + + if (config.getSchemaRegistryURI() != null) { + PravegaSchemaRegistry schemaRegistry = + new PravegaSchemaRegistry(config.getControllerURI(), config.getSchemaRegistryURI()); + schemaSuppliers.add(schemaRegistry); + schemaRegistries.add(schemaRegistry); + } + + if (config.getConfluentSchemaRegistry() != null) { + ConfluentSchemaRegistry schemaRegistry = + new ConfluentSchemaRegistry(config.getConfluentSchemaRegistry()); + schemaRegistries.add(schemaRegistry); + } + } + + @VisibleForTesting + public CompositeSchemaRegistry(List schemaSuppliers, List schemaRegistries) + { + this.schemaSuppliers = schemaSuppliers; + this.schemaRegistries = schemaRegistries; + } + + @Override + public List listSchemas() + { + final List schemas = new ArrayList<>(); + schemaSuppliers.forEach(p -> schemas.addAll(p.listSchemas())); + return schemas; + } + + @Override + public List listTables(String schema) + { + final List tables = new ArrayList<>(); + schemaSuppliers.forEach(p -> tables.addAll(p.listTables(schema))); + return tables; + } + + @Override + public List getSchema(SchemaTableName schemaTableName) { + for (SchemaRegistry schemaRegistry : schemaRegistries) { + List schema = schemaRegistry.getSchema(schemaTableName); + if (schema != null) { + return schema; + } + } + return null; + } + + @Override + public PravegaStreamDescription getTable(SchemaTableName schemaTableName) + { + for (SchemaRegistry schemaRegistry : schemaRegistries) { + PravegaStreamDescription streamDescription = schemaRegistry.getTable(schemaTableName); + if (streamDescription != null) { + return streamDescription; + } + } + return null; + } +} diff --git a/src/main/java/io/pravega/connectors/presto/schemamanagement/ConfluentSchemaRegistry.java b/src/main/java/io/pravega/connectors/presto/schemamanagement/ConfluentSchemaRegistry.java new file mode 100644 index 0000000..c8b55bc --- /dev/null +++ b/src/main/java/io/pravega/connectors/presto/schemamanagement/ConfluentSchemaRegistry.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import com.facebook.presto.spi.SchemaTableName; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaMetadata; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.pravega.connectors.presto.ObjectType; +import io.pravega.connectors.presto.PravegaStreamDescription; +import io.pravega.connectors.presto.PravegaStreamFieldDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static io.pravega.connectors.presto.util.PravegaNameUtils.temp_tableNameToStreamName; +import static io.pravega.connectors.presto.util.PravegaSchemaUtils.AVRO; +import static io.pravega.connectors.presto.util.PravegaStreamDescUtils.mapFieldsFromSchema; + +public class ConfluentSchemaRegistry + implements SchemaRegistry +{ + private final SchemaRegistryClient schemaRegistryClient; + + public ConfluentSchemaRegistry(URI registryURI) + { + this.schemaRegistryClient = new CachedSchemaRegistryClient(registryURI.toASCIIString(), Integer.MAX_VALUE); + } + + @Override + public List getSchema(SchemaTableName schemaTableName) + { + try { + SchemaMetadata metadata = schemaRegistryClient.getLatestSchemaMetadata(format(schemaTableName)); + if (!metadata.getSchemaType().equalsIgnoreCase(AVRO)) { + throw new UnsupportedOperationException("schema type '" + metadata.getSchemaType() + "' is not supported"); + } + + List fields = + mapFieldsFromSchema("", AVRO, metadata.getSchema()); + + return Collections.singletonList( + new PravegaStreamFieldGroup(AVRO, + Optional.empty(), + Optional.of(metadata.getSchema()), + Optional.of(fields))); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + catch (RestClientException e) { + if (e.getStatus() == 404) { + return null; + } + throw new RuntimeException(e); + } + } + + @Override + public PravegaStreamDescription getTable(SchemaTableName schemaTableName) + { + List schema = getSchema(schemaTableName); + if (schema == null) { + return null; + } + + return new PravegaStreamDescription( + schemaTableName.getTableName(), + Optional.of(schemaTableName.getSchemaName()), + temp_tableNameToStreamName(schemaTableName.getTableName()), + Optional.of(ObjectType.STREAM), + Optional.empty() /* args */, + Optional.of(schema)); + } + + static String format(SchemaTableName schemaTableName) + { + return String.format("%s-%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } +} \ No newline at end of file diff --git a/src/main/java/io/pravega/connectors/presto/schemamanagement/LocalSchemaRegistry.java b/src/main/java/io/pravega/connectors/presto/schemamanagement/LocalSchemaRegistry.java new file mode 100644 index 0000000..2012f65 --- /dev/null +++ b/src/main/java/io/pravega/connectors/presto/schemamanagement/LocalSchemaRegistry.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; +import io.pravega.connectors.presto.ObjectType; +import io.pravega.connectors.presto.PravegaStreamDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; +import io.pravega.connectors.presto.PravegaTableHandle; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static io.pravega.connectors.presto.util.PravegaNameUtils.kvFieldMapping; +import static io.pravega.connectors.presto.util.PravegaStreamDescUtils.resolveAllSchemas; +import static java.nio.file.Files.readAllBytes; + +public class LocalSchemaRegistry + implements SchemaSupplier, SchemaRegistry +{ + private final File localTableDir; + + private final JsonCodec streamDescriptionCodec; + + public LocalSchemaRegistry(File localTableDir, JsonCodec streamDescriptionCodec) + { + this.localTableDir = localTableDir; + this.streamDescriptionCodec = streamDescriptionCodec; + } + + @Override + public List listSchemas() + { + // file name format: {schema}.{table}.json + return localSchemaStream() + // ensures matches full file name format + .filter(file -> file.getName().split("\\.").length == 3) + .map(file -> file.getName().split("\\.")[0]) + .distinct() + .collect(Collectors.toList()); + } + + @Override + public List listTables(String schema) + { + final List tables = new ArrayList<>(); + + // file name format: {schema}.{table}.json + localSchemaStream() + .filter(file -> file.getName().startsWith(schema)) + // ensures matches full file name format + .filter(file -> file.getName().split("\\.").length == 3) + // {table} from file name + .map(file -> file.getName().split("\\.")[1]) + .map(file -> getLocalTable(new SchemaTableName(schema, file))) + .forEach(table -> { + tables.add(new PravegaTableHandle(table.getSchemaName().get(), + table.getTableName(), + table.getObjectName(), + table.getObjectType(), + table.getObjectArgs())); + }); + + return tables; + } + + @Override + public List getSchema(SchemaTableName schemaTableName) { + PravegaStreamDescription streamDescription = getLocalTable(schemaTableName); + return streamDescription == null ? null : streamDescription.getEvent().orElse(null); + } + + @Override + public PravegaStreamDescription getTable(SchemaTableName schemaTableName) + { + // reads table definition from local file + // if table definition has pointers to schema, read it and populate fields + // (for e.g. local schema file or url to schema) + PravegaStreamDescription table = getLocalTable(schemaTableName); + + // either not found or no fields, nothing to do. will be resolved later + if (table == null || !table.getEvent().isPresent()) { + return table; + } + + // fields already defined + if (table.getEvent().get().stream().noneMatch( + schema -> schema.getDataSchema().isPresent())) { + return table; + } + + // at least 1 schema for a fieldGroup must be resolved. read schema from local file or url + List finalSchemas = + resolveAllSchemas(localTableDir, table.getEvent().get(), (i) -> columnPrefix(table, i)); + + return new PravegaStreamDescription(table, finalSchemas); + } + + static String columnPrefix(PravegaStreamDescription table, int schemaIndex) { + // if kv table, returns something like key/{fieldName}, value/{fieldName} + return table.getObjectType() == ObjectType.KV_TABLE ? kvFieldMapping(schemaIndex) : ""; + } + + private java.util.stream.Stream localSchemaStream() + { + return listFiles(localTableDir).stream() + .filter(file -> file.isFile() && file.getName().endsWith(".json")); + } + + private static List listFiles(File dir) + { + if ((dir != null) && dir.isDirectory()) { + File[] files = dir.listFiles(); + if (files != null) { + return ImmutableList.copyOf(files); + } + } + return ImmutableList.of(); + } + + private PravegaStreamDescription getLocalTable(SchemaTableName schemaTableName) + { + try { + File file = new File(localTableDir, String.format("%s.%s.json", + schemaTableName.getSchemaName(), schemaTableName.getTableName())); + return !file.exists() ? + null + : streamDescriptionCodec.fromJson(readAllBytes(file.toPath())); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/src/main/java/io/pravega/connectors/presto/schemamanagement/PravegaSchemaRegistry.java b/src/main/java/io/pravega/connectors/presto/schemamanagement/PravegaSchemaRegistry.java new file mode 100644 index 0000000..fece340 --- /dev/null +++ b/src/main/java/io/pravega/connectors/presto/schemamanagement/PravegaSchemaRegistry.java @@ -0,0 +1,204 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Streams; +import io.pravega.client.ClientConfig; +import io.pravega.client.admin.StreamManager; +import io.pravega.connectors.presto.ObjectType; +import io.pravega.connectors.presto.PravegaStreamDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; +import io.pravega.connectors.presto.PravegaTableHandle; +import io.pravega.schemaregistry.client.SchemaRegistryClient; +import io.pravega.schemaregistry.client.SchemaRegistryClientConfig; +import io.pravega.schemaregistry.client.SchemaRegistryClientFactory; +import io.pravega.schemaregistry.client.exceptions.RegistryExceptions; +import io.pravega.schemaregistry.contract.data.GroupProperties; +import io.pravega.schemaregistry.contract.data.SchemaWithVersion; +import io.pravega.schemaregistry.contract.data.SerializationFormat; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Optional; + +import static io.pravega.connectors.presto.ProtobufCommon.encodeSchema; +import static io.pravega.connectors.presto.util.PravegaNameUtils.groupId; +import static io.pravega.connectors.presto.util.PravegaNameUtils.internalObject; +import static io.pravega.connectors.presto.util.PravegaNameUtils.internalStream; +import static io.pravega.connectors.presto.util.PravegaNameUtils.kvFieldMapping; +import static io.pravega.connectors.presto.util.PravegaNameUtils.temp_streamNameToTableName; +import static io.pravega.connectors.presto.util.PravegaNameUtils.temp_tableNameToStreamName; +import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KEY; +import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KV_KEY; +import static io.pravega.connectors.presto.util.PravegaSchemaUtils.GROUP_PROPERTIES_INLINE_KV_VALUE; +import static io.pravega.connectors.presto.util.PravegaSchemaUtils.INLINE_SUFFIX; +import static io.pravega.connectors.presto.util.PravegaStreamDescUtils.mapFieldsFromSchema; + +public class PravegaSchemaRegistry + implements SchemaSupplier, SchemaRegistry +{ + private final SchemaRegistryClient registryClient; + + private final StreamManager streamManager; + + public PravegaSchemaRegistry(URI controllerURI, URI schemaRegistryURI) + { + SchemaRegistryClientConfig registryConfig = + SchemaRegistryClientConfig.builder().schemaRegistryUri(schemaRegistryURI).build(); + this.registryClient = SchemaRegistryClientFactory.withDefaultNamespace(registryConfig); + this.streamManager = StreamManager.create(ClientConfig.builder().controllerURI(controllerURI).build()); + } + + @VisibleForTesting + public PravegaSchemaRegistry(SchemaRegistryClient registryClient, StreamManager streamManager) + { + this.registryClient = registryClient; + this.streamManager = streamManager; + } + + @Override + public List listSchemas() + { + List schemas = new ArrayList<>(); + Streams.stream(streamManager.listScopes()).filter(s -> !internalObject(s)).forEach(schemas::add); + return schemas; + } + + @Override + public List listTables(String schema) + { + // (underlying streams used by kv table are seen as internal and thus are skipped) + List tables = new ArrayList<>(); + Streams.stream(streamManager.listStreams(schema)) + .filter(stream -> !internalStream(stream)) + .forEach(stream -> { + tables.add(new PravegaTableHandle(schema, + temp_streamNameToTableName(stream.getStreamName()), + stream.getStreamName(), + ObjectType.STREAM, + Optional.empty())); + }); + return tables; + } + + @Override + public List getSchema(SchemaTableName schemaTableName) { + String groupName = groupId(schemaTableName.getSchemaName(), + temp_tableNameToStreamName(schemaTableName.getTableName())); + + GroupProperties properties; + List schemas; + + try { + properties = registryClient.getGroupProperties(groupName); + schemas = registryClient.getSchemas(groupName); + } + catch (RegistryExceptions.ResourceNotFoundException e) { + return null; + } + + if (schemas.size() == 0 || schemas.size() > 2) { + throw new IllegalStateException(schemaTableName + " has " + schemas.size() + " registered schemas. expecting either 1 or 2"); + } + + // kv table will have > 1 schema. key+value likely different types + boolean kv = schemas.size() > 1; + List fieldGroups = new ArrayList<>(2); + for (int i = 0; i < schemas.size(); i++) { + // colPrefix used for display so can differentiate between fields from key or value + String colPrefix = kv ? kvFieldMapping(i) : ""; + + SerializationFormat format = schemas.get(i).getSchemaInfo().getSerializationFormat(); + fieldGroups.add(new PravegaStreamFieldGroup( + dataFormat(properties.getProperties(), format, kv, i), + Optional.of(colPrefix), + dataSchema(format, schemas.get(i)), + Optional.of(mapFieldsFromSchema(colPrefix, format, schemas.get(i))))); + } + + return fieldGroups; + } + + @Override + public PravegaStreamDescription getTable(SchemaTableName schemaTableName) + { + List schema = getSchema(schemaTableName); + if (schema == null) { + return null; + } + + return new PravegaStreamDescription( + schemaTableName.getTableName(), + Optional.of(schemaTableName.getSchemaName()), + temp_tableNameToStreamName(schemaTableName.getTableName()), + Optional.of(ObjectType.STREAM), + Optional.empty() /* args */, + Optional.of(schema)); + } + + private static String dataFormat(ImmutableMap groupProperties, + SerializationFormat format, + boolean kvTable, + int kvIdx) + { + /* + TODO: auto-detect https://github.com/pravega/presto-connector/issues/20 + + (1) no schema registry. + (2) Register and evolve schemas in registry but do not use registry client while writing data + (3) Register schemas in the registry and use registry client to encode schema Id with payload + "inline" is for #3. for e.g. "avro" -> "avro-inline". PravegaRecordSetProvider is interested in this + + hopefully this can all go away (see linked issue 58 above) + + but for now the following is our convention + if "inline" exists in our properties, all data uses SR + else if it is a kv table key+value may be different. both, neither, or either may use SR + look for "inlinekey" / "inlinevalue" + */ + + String key = GROUP_PROPERTIES_INLINE_KEY; + + if (kvTable && !groupProperties.containsKey(key)) { + key = kvIdx == 0 ? GROUP_PROPERTIES_INLINE_KV_KEY : GROUP_PROPERTIES_INLINE_KV_VALUE; + } + + String finalFormat = format == SerializationFormat.Custom + ? format.getFullTypeName().toLowerCase(Locale.ENGLISH) + : format.name().toLowerCase(Locale.ENGLISH); + return finalFormat + (groupProperties.containsKey(key) ? INLINE_SUFFIX : ""); + } + + public static Optional dataSchema(SerializationFormat format, SchemaWithVersion schemaWithVersion) + { + // it is intentional that nothing is returned for Custom + // pass schema to row decoders. refer to PravegaRecordSetProvider + switch (format) { + case Protobuf: + return Optional.of(encodeSchema(schemaWithVersion)); + case Avro: + return Optional.of(new String(schemaWithVersion.getSchemaInfo().getSchemaData().array(), StandardCharsets.UTF_8)); + default: + return Optional.empty(); + } + } +} diff --git a/src/main/java/io/pravega/connectors/presto/schemamanagement/SchemaRegistry.java b/src/main/java/io/pravega/connectors/presto/schemamanagement/SchemaRegistry.java new file mode 100644 index 0000000..0a5c28c --- /dev/null +++ b/src/main/java/io/pravega/connectors/presto/schemamanagement/SchemaRegistry.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import com.facebook.presto.spi.SchemaTableName; +import io.pravega.connectors.presto.PravegaStreamDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; + +import java.util.List; + +/** + * return schema for the given schema.table + */ +public interface SchemaRegistry +{ + PravegaStreamDescription getTable(SchemaTableName schemaTableName); + + List getSchema(SchemaTableName schemaTableName); +} diff --git a/src/main/java/io/pravega/connectors/presto/schemamanagement/SchemaSupplier.java b/src/main/java/io/pravega/connectors/presto/schemamanagement/SchemaSupplier.java new file mode 100644 index 0000000..c73bcea --- /dev/null +++ b/src/main/java/io/pravega/connectors/presto/schemamanagement/SchemaSupplier.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import io.pravega.connectors.presto.PravegaTableHandle; + +import java.util.List; + +/** + * a source of schemas, and tables + * this may be reading files on local disk or listing scopes+streams from pravega + */ +public interface SchemaSupplier +{ + List listSchemas(); + + List listTables(String schema); +} diff --git a/src/main/java/io/pravega/connectors/presto/util/PravegaNameUtils.java b/src/main/java/io/pravega/connectors/presto/util/PravegaNameUtils.java index c4ba35b..2ea7a5c 100644 --- a/src/main/java/io/pravega/connectors/presto/util/PravegaNameUtils.java +++ b/src/main/java/io/pravega/connectors/presto/util/PravegaNameUtils.java @@ -16,6 +16,7 @@ package io.pravega.connectors.presto.util; +import io.pravega.client.stream.Stream; import io.pravega.connectors.presto.ObjectType; import io.pravega.connectors.presto.PravegaStreamDescription; import io.pravega.connectors.presto.PravegaTableHandle; @@ -51,20 +52,24 @@ public static String groupId(String scope, String stream) return scope + "." + stream; } - // test stream name - if not valid pravega stream name assume it is regex for multi source public static boolean multiSourceStream(PravegaStreamDescription object) { + // if stream name is a regex, or if there are object args + // (objectArgs for stream are comma sep list of component streams) return object.getObjectType() == ObjectType.STREAM && - multiSourceStream(object.getObjectName()); + (multiSourceStream(object.getObjectName()) || object.getObjectArgs().isPresent()); } public static boolean multiSourceStream(PravegaTableHandle object) { + // if stream name is a regex, or if there are object args + // (objectArgs for stream are comma sep list of component streams) return object.getObjectType() == ObjectType.STREAM && - multiSourceStream(object.getObjectName()); + (multiSourceStream(object.getObjectName()) || object.getObjectArgs().isPresent()); } - private static boolean multiSourceStream(String stream) + // test stream name - if not valid pravega stream name assume it is regex for multi source + public static boolean multiSourceStream(String stream) { try { // test pattern for stream names pravega will allow @@ -108,4 +113,15 @@ public static String streamCutName(String stream) { return stream + STREAM_CUT_PREFIX; } + + public static boolean internalStream(Stream stream) + { + return internalObject(stream.getStreamName()); + } + + public static boolean internalObject(String object) + { + return object.startsWith("_") /* pravega internal */ || + object.endsWith("-SC") /* application internal - stream cuts */; + } } diff --git a/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java b/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java index 1938c6b..4fbfb71 100644 --- a/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java +++ b/src/main/java/io/pravega/connectors/presto/util/PravegaSchemaUtils.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.PrestoException; import com.google.common.io.CharStreams; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -54,7 +55,7 @@ private PravegaSchemaUtils() public static final String NESTED_RECORD_SEPARATOR = "/"; - public static String readSchema(String dataSchemaLocation) + public static String readSchema(File schemaDir, String dataSchemaLocation) { InputStream inputStream = null; try { @@ -65,7 +66,7 @@ public static String readSchema(String dataSchemaLocation) catch (MalformedURLException e) { // try again before failing log.warn("invalid URL: " + dataSchemaLocation); - inputStream = new FileInputStream(dataSchemaLocation); + inputStream = new FileInputStream(new File(schemaDir, dataSchemaLocation)); } } else { diff --git a/src/main/java/io/pravega/connectors/presto/util/PravegaStreamDescUtils.java b/src/main/java/io/pravega/connectors/presto/util/PravegaStreamDescUtils.java index adfed8c..96c435c 100644 --- a/src/main/java/io/pravega/connectors/presto/util/PravegaStreamDescUtils.java +++ b/src/main/java/io/pravega/connectors/presto/util/PravegaStreamDescUtils.java @@ -18,6 +18,7 @@ import com.facebook.presto.common.type.Type; import com.google.protobuf.Descriptors; import io.pravega.connectors.presto.PravegaStreamFieldDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; import io.pravega.connectors.presto.ProtobufCommon; import io.pravega.schemaregistry.contract.data.SchemaWithVersion; import io.pravega.schemaregistry.contract.data.SerializationFormat; @@ -28,10 +29,12 @@ import org.everit.json.schema.Schema; import org.everit.json.schema.StringSchema; +import java.io.File; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.BooleanType.BOOLEAN; @@ -40,6 +43,7 @@ import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; import static io.pravega.connectors.presto.util.PravegaSchemaUtils.AVRO; import static io.pravega.connectors.presto.util.PravegaSchemaUtils.NESTED_RECORD_SEPARATOR; +import static io.pravega.connectors.presto.util.PravegaSchemaUtils.readSchema; import static org.apache.avro.Schema.Type.RECORD; /** @@ -343,4 +347,39 @@ private static String nestedPrefixFor(String prefix, String name) ? name : prefix + NESTED_RECORD_SEPARATOR + name; } + + /** + * PravegaStreamFieldGroup may contain pointer to schema (local file, or url) + * for these, read the schema and build the field definitions + * + * @param schemaDir directory where we can find the schema + * @param fieldGroups fieldGroups to look through + * @param columnPrefix function to return columnPrefix to be used for the fields in the group + * @return list of PravegaStreamFieldGroup with all schemas resolved + */ + public static List resolveAllSchemas(File schemaDir, + List fieldGroups, + Function columnPrefix) + { + // fields already defined + if (fieldGroups.stream().noneMatch( + schema -> schema.getDataSchema().isPresent())) { + return fieldGroups; + } + + // at least 1 schema for a fieldGroup must be resolved. read schema from local file or url + List finalSchemas = new ArrayList<>(fieldGroups.size()); + for (int i = 0; i < fieldGroups.size(); i++) { + PravegaStreamFieldGroup fieldGroup = fieldGroups.get(i); + if (fieldGroup.getDataSchema().isPresent()) { + String dataSchema = readSchema(schemaDir, fieldGroup.getDataSchema().get()); + List fields = + mapFieldsFromSchema(columnPrefix.apply(i), fieldGroup.getDataFormat(), dataSchema); + finalSchemas.add(new PravegaStreamFieldGroup(fieldGroup, dataSchema, fields)); + } else { + finalSchemas.add(fieldGroup); + } + } + return finalSchemas; + } } diff --git a/src/test/java/io/pravega/connectors/presto/PravegaTableDescriptionSupplierTest.java b/src/test/java/io/pravega/connectors/presto/PravegaTableDescriptionSupplierTest.java new file mode 100644 index 0000000..2c869d1 --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/PravegaTableDescriptionSupplierTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto; + +import com.facebook.presto.spi.SchemaTableName; +import io.pravega.connectors.presto.util.SchemaRegistryUtil; +import org.testng.annotations.Test; + +import java.util.List; + +import static io.pravega.connectors.presto.util.TestSchemas.EMPLOYEE_AVSC; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +@Test +public class PravegaTableDescriptionSupplierTest +{ + private static final String SCHEMA = "ut"; + + @Test + public void testTableDoesNotExist() + { + PravegaTableDescriptionSupplier tableSupplier = + new PravegaTableDescriptionSupplier(new SchemaRegistryUtil().getSchemaRegistry()); + assertNull(tableSupplier.getTable(new SchemaTableName(SCHEMA, "stream1"))); + } + + @Test + public void testMultiSourceStreamRegex() + { + SchemaRegistryUtil schemaRegistryUtil = new SchemaRegistryUtil(); + schemaRegistryUtil.addLocalSchema(SCHEMA); + + PravegaTableDescriptionSupplier tableSupplier = + new PravegaTableDescriptionSupplier(schemaRegistryUtil.getSchemaRegistry()); + + + schemaRegistryUtil.addSchema(SCHEMA); + schemaRegistryUtil.addTable(new SchemaTableName(SCHEMA, "stream1"), EMPLOYEE_AVSC); + schemaRegistryUtil.addTable(new SchemaTableName(SCHEMA, "stream2"), EMPLOYEE_AVSC); + schemaRegistryUtil.addTable(new SchemaTableName(SCHEMA, "stream3"), EMPLOYEE_AVSC); + + PravegaStreamDescription table = + tableSupplier.getTable(new SchemaTableName(SCHEMA, "multiregex")); + + assertNotNull(table); + assertTrue(table.getObjectArgs().isPresent()); + + List components = table.getObjectArgs().get(); + assertEquals(components.size(), 3); + assertEquals(components.stream().sorted().toArray(), new String[]{"stream1", "stream2", "stream3"}); + } + + @Test + public void testMultiSourceStreamExplicit() + { + // same setup as regex. but multi source def. only has 2 component streams. + SchemaRegistryUtil schemaRegistryUtil = new SchemaRegistryUtil(); + schemaRegistryUtil.addLocalSchema(SCHEMA); + + PravegaTableDescriptionSupplier tableSupplier = + new PravegaTableDescriptionSupplier(schemaRegistryUtil.getSchemaRegistry()); + + schemaRegistryUtil.addSchema(SCHEMA); + schemaRegistryUtil.addTable(new SchemaTableName(SCHEMA, "stream1"), EMPLOYEE_AVSC); + schemaRegistryUtil.addTable(new SchemaTableName(SCHEMA, "stream2"), EMPLOYEE_AVSC); + schemaRegistryUtil.addTable(new SchemaTableName(SCHEMA, "stream3"), EMPLOYEE_AVSC); + + PravegaStreamDescription table = + tableSupplier.getTable(new SchemaTableName(SCHEMA, "multiexplicit")); + + assertNotNull(table); + assertTrue(table.getObjectArgs().isPresent()); + + List components = table.getObjectArgs().get(); + assertEquals(components.size(), 2); + assertEquals(components.stream().sorted().toArray(), new String[]{"stream1", "stream3"}); + } +} diff --git a/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueTest.java b/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueTest.java index 1091887..99307e3 100644 --- a/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueTest.java +++ b/src/test/java/io/pravega/connectors/presto/integration/PravegaKeyValueTest.java @@ -26,7 +26,7 @@ import java.util.List; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; -import static io.pravega.connectors.presto.integration.PravegaTestUtils.getKvStreamDesc; +import static io.pravega.connectors.presto.util.PravegaTestUtils.getKvStreamDesc; import static org.testng.Assert.assertEquals; @Test diff --git a/src/test/java/io/pravega/connectors/presto/integration/PravegaQueryRunner.java b/src/test/java/io/pravega/connectors/presto/integration/PravegaQueryRunner.java index 51ce818..ecf62d6 100644 --- a/src/test/java/io/pravega/connectors/presto/integration/PravegaQueryRunner.java +++ b/src/test/java/io/pravega/connectors/presto/integration/PravegaQueryRunner.java @@ -15,37 +15,38 @@ */ package io.pravega.connectors.presto.integration; -import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.log.Logger; import com.facebook.airlift.log.Logging; import com.facebook.presto.Session; import com.facebook.presto.common.QualifiedObjectName; -import com.facebook.presto.metadata.Metadata; -import com.facebook.presto.spi.SchemaTableName; + import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.DistributedQueryRunner; import com.facebook.presto.tests.TestingPrestoClient; import com.facebook.presto.tpch.TpchPlugin; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; + import com.google.common.collect.ImmutableMap; import io.airlift.tpch.TpchTable; import io.pravega.client.admin.StreamManager; + import io.pravega.connectors.presto.PravegaPlugin; -import io.pravega.connectors.presto.PravegaStreamDescription; import io.pravega.connectors.presto.PravegaTableDescriptionSupplier; -import io.pravega.connectors.presto.PravegaTableName; +import io.pravega.connectors.presto.schemamanagement.CompositeSchemaRegistry; +import io.pravega.connectors.presto.schemamanagement.LocalSchemaRegistry; +import io.pravega.connectors.presto.schemamanagement.SchemaRegistry; +import io.pravega.connectors.presto.schemamanagement.SchemaSupplier; +import io.pravega.connectors.presto.util.PravegaTestUtils; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.Optional; import static com.facebook.airlift.testing.Closeables.closeAllSuppress; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.airlift.units.Duration.nanosSince; -import static io.pravega.connectors.presto.integration.PravegaTestUtils.getKvStreamDesc; -import static io.pravega.connectors.presto.integration.PravegaTestUtils.getStreamDesc; + import static java.util.Locale.ENGLISH; import static java.util.concurrent.TimeUnit.SECONDS; @@ -71,8 +72,7 @@ public static DistributedQueryRunner createQueryRunner(URI controller, Iterable< queryRunner.installPlugin(new TpchPlugin()); queryRunner.createCatalog("tpch", "tpch"); - PravegaTableDescriptionSupplier tableDescriptionSupplier = - createSchemas(queryRunner.getCoordinator().getMetadata(), tpchTables, keyValueTables); + PravegaTableDescriptionSupplier tableDescriptionSupplier = createTableDescriptionSupplier(tpchTables, keyValueTables); installPlugin(controller, queryRunner, tableDescriptionSupplier); @@ -120,31 +120,24 @@ private static void loadPravegaKVTable(URI controller, StreamManager streamManag log.info("Imported %s in %s", 0, table, nanosSince(start).convertToMostSuccinctTimeUnit()); } - private static PravegaTableDescriptionSupplier createSchemas(Metadata metadata, Iterable> tables, Iterable keyValueTables) + private static PravegaTableDescriptionSupplier createTableDescriptionSupplier(Iterable> tpchTables, Iterable keyValueTables) { - JsonCodec streamDescCodec = new CodecSupplier<>(PravegaStreamDescription.class, metadata).get(); + List schemaSuppliers = new ArrayList<>(); + List schemaRegistries = new ArrayList<>(); - Cache schemaCache = CacheBuilder.newBuilder().build(); - Cache> tableCache = CacheBuilder.newBuilder().build(); - - for (TpchTable table : tables) { - SchemaTableName schemaTableName = new SchemaTableName(TPCH_SCHEMA, table.getTableName()); - PravegaTableName pravegaTableName = new PravegaTableName(schemaTableName); - - schemaCache.put(schemaTableName.getSchemaName(), new Object()); - tableCache.put(pravegaTableName, Optional.of(getStreamDesc(streamDescCodec, "tpch", table.getTableName()))); + if (tpchTables.iterator().hasNext()) { + LocalSchemaRegistry tpch = PravegaTestUtils.localSchemaRegistry("tpch"); + schemaSuppliers.add(tpch); + schemaRegistries.add(tpch); } - for (String table : keyValueTables) { - SchemaTableName schemaTableName = new SchemaTableName(KV_SCHEMA, table); - PravegaTableName pravegaTableName = new PravegaTableName(schemaTableName); - - schemaCache.put(schemaTableName.getSchemaName(), new Object()); - tableCache.put(pravegaTableName, Optional.of(getKvStreamDesc(table))); + if (keyValueTables.iterator().hasNext()) { + LocalSchemaRegistry kv = PravegaTestUtils.localSchemaRegistry("kv"); + schemaSuppliers.add(kv); + schemaRegistries.add(kv); } - // all schemas + tables will be served from these provided caches - return new PravegaTableDescriptionSupplier(null, schemaCache, tableCache); + return new PravegaTableDescriptionSupplier(new CompositeSchemaRegistry(schemaSuppliers, schemaRegistries)); } public static Session createSession() diff --git a/src/test/java/io/pravega/connectors/presto/schemamanagement/LocalSchemaRegistryTest.java b/src/test/java/io/pravega/connectors/presto/schemamanagement/LocalSchemaRegistryTest.java new file mode 100644 index 0000000..9580c29 --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/schemamanagement/LocalSchemaRegistryTest.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.SchemaTableName; +import io.pravega.connectors.presto.PravegaStreamDescription; +import io.pravega.connectors.presto.PravegaStreamFieldDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; +import io.pravega.connectors.presto.PravegaTableHandle; +import io.pravega.connectors.presto.util.PravegaTestUtils; +import org.testng.annotations.Test; + +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +@Test +public class LocalSchemaRegistryTest +{ + // uses resources/tpch for table description dir + + @Test + public void testListSchemas() + { + LocalSchemaRegistry schemaRegistry = PravegaTestUtils.localSchemaRegistry("tpch"); + + List schemas = schemaRegistry.listSchemas(); + assertEquals(schemas.size(), 1); + assertEquals(schemas.get(0), "tpch"); + } + + @Test + public void testListTables() + { + LocalSchemaRegistry schemaRegistry = PravegaTestUtils.localSchemaRegistry("tpch"); + + List tables = schemaRegistry.listTables("tpch"); + assertEquals(tables.size(), 8); + + PravegaTableHandle customerTableHandle = + tables.stream().filter(h -> h.getTableName().equals("customer")).findFirst().get(); + assertEquals(customerTableHandle.getSchemaName(), "tpch"); + assertEquals(customerTableHandle.getTableName(), "customer"); + assertEquals(customerTableHandle.getObjectName(), "customer"); + } + + @Test + public void testGetSchema() + { + LocalSchemaRegistry schemaRegistry = PravegaTestUtils.localSchemaRegistry("tpch"); + + List schema = + schemaRegistry.getSchema(new SchemaTableName("tpch", "customer")); + + assertNotNull(schema); + assertEquals(1, schema.size()); + + validateCustomerSchema(schema.get(0)); + } + + @Test + public void testGetTable() + { + LocalSchemaRegistry schemaRegistry = PravegaTestUtils.localSchemaRegistry("tpch"); + + PravegaStreamDescription table = + schemaRegistry.getTable(new SchemaTableName("tpch", "customer")); + assertNotNull(table); + + assertTrue(table.getEvent().isPresent()); + assertEquals(1, table.getEvent().get().size()); + + validateCustomerSchema(table.getEvent().get().get(0)); + } + + @Test + public void testTableDoesNotExist() + { + LocalSchemaRegistry schemaRegistry = PravegaTestUtils.localSchemaRegistry("tpch"); + assertNull(schemaRegistry.getTable(new SchemaTableName("tpch", "abcxyz123"))); + } + + private void validateCustomerSchema(PravegaStreamFieldGroup fieldGroup) + { + // spot check a fiew fields + + assertEquals(fieldGroup.getDataFormat(), "json"); + assertEquals(fieldGroup.getFields().size(), 8); + + PravegaStreamFieldDescription field = fieldGroup.getFields().get(0); + assertEquals(field.getName(), "custkey"); + assertEquals(field.getMapping(), "custkey"); + assertTrue(field.getType() instanceof BigintType); + + field = fieldGroup.getFields().get(6); + assertEquals(field.getName(), "mktsegment"); + assertEquals(field.getMapping(), "mktsegment"); + assertTrue(field.getType() instanceof VarcharType); + } +} diff --git a/src/test/java/io/pravega/connectors/presto/schemamanagement/PravegaSchemaRegistryTest.java b/src/test/java/io/pravega/connectors/presto/schemamanagement/PravegaSchemaRegistryTest.java new file mode 100644 index 0000000..f5ea023 --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/schemamanagement/PravegaSchemaRegistryTest.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.schemamanagement; + +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.SchemaTableName; +import io.pravega.connectors.presto.PravegaStreamDescription; +import io.pravega.connectors.presto.PravegaStreamFieldDescription; +import io.pravega.connectors.presto.PravegaStreamFieldGroup; +import io.pravega.connectors.presto.PravegaTableHandle; +import io.pravega.connectors.presto.util.SchemaRegistryUtil; +import org.testng.annotations.Test; + +import java.util.List; + +import static io.pravega.connectors.presto.util.TestSchemas.EMPLOYEE_AVSC; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +@Test +public class PravegaSchemaRegistryTest +{ + @Test + public void testListSchemas() + { + SchemaRegistryUtil schemaRegistryUtil = new SchemaRegistryUtil(); + + schemaRegistryUtil.addSchema("schema1"); + schemaRegistryUtil.addSchema("schema2"); + + List schemas = schemaRegistryUtil.getSchemaRegistry().listSchemas(); + assertEquals(schemas.size(), 2); + assertEquals("schema1", schemas.get(0)); + assertEquals("schema2", schemas.get(1)); + } + + @Test + public void testListTables() + { + SchemaRegistryUtil schemaRegistryUtil = new SchemaRegistryUtil(); + + schemaRegistryUtil.addTable("schema1", "stream1"); + schemaRegistryUtil.addTable("schema2", "stream2"); + // stream starting with '_' is internal/hidden + schemaRegistryUtil.addTable("schema2", "_markStream2"); + + List tables = schemaRegistryUtil.getSchemaRegistry().listTables("schema2"); + assertEquals(tables.size(), 1); + assertEquals("stream2", tables.get(0).getObjectName()); + } + + @Test + public void testGetTable() + { + SchemaRegistryUtil schemaRegistryUtil = new SchemaRegistryUtil(); + + SchemaTableName schemaTableName = new SchemaTableName("hr", "employee"); + schemaRegistryUtil.addAvroSchema(schemaTableName, EMPLOYEE_AVSC); + + SchemaRegistry schemaRegistry = schemaRegistryUtil.getSchemaRegistry(); + + PravegaStreamDescription table = schemaRegistry.getTable(schemaTableName); + + assertNotNull(table); + assertTrue(table.getEvent().isPresent()); + assertEquals(table.getEvent().get().size(), 1); + + PravegaStreamFieldGroup fieldGroup = table.getEvent().get().get(0); + assertEquals(fieldGroup.getFields().size(), 2); + + PravegaStreamFieldDescription field = fieldGroup.getFields().get(0); + assertEquals(field.getName(), "first"); + assertTrue(field.getType() instanceof VarcharType); + + field = fieldGroup.getFields().get(1); + assertEquals(field.getName(), "last"); + assertTrue(field.getType() instanceof VarcharType); + } +} diff --git a/src/test/java/io/pravega/connectors/presto/integration/CodecSupplier.java b/src/test/java/io/pravega/connectors/presto/util/CodecSupplier.java similarity index 87% rename from src/test/java/io/pravega/connectors/presto/integration/CodecSupplier.java rename to src/test/java/io/pravega/connectors/presto/util/CodecSupplier.java index 2334f1e..884188e 100644 --- a/src/test/java/io/pravega/connectors/presto/integration/CodecSupplier.java +++ b/src/test/java/io/pravega/connectors/presto/util/CodecSupplier.java @@ -15,13 +15,13 @@ * (rev a8968160e1840ac67a5f63def27d31c0ef0acde7) * https://github.com/prestodb/presto/blob/0.247/presto-kafka/src/test/java/com/facebook/presto/kafka/util/CodecSupplier.java */ -package io.pravega.connectors.presto.integration; +package io.pravega.connectors.presto.util; import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.JsonCodecFactory; import com.facebook.airlift.json.ObjectMapperProvider; import com.facebook.presto.common.type.Type; -import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.metadata.FunctionAndTypeManager; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; import com.google.common.collect.ImmutableMap; @@ -33,14 +33,14 @@ public final class CodecSupplier implements Supplier> { - private final Metadata metadata; + private final FunctionAndTypeManager typeManager; private final JsonCodecFactory codecFactory; private final Class clazz; - public CodecSupplier(Class clazz, Metadata metadata) + public CodecSupplier(Class clazz, FunctionAndTypeManager typeManager) { this.clazz = clazz; - this.metadata = metadata; + this.typeManager = typeManager; ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider(); objectMapperProvider.setJsonDeserializers(ImmutableMap.of(Type.class, new TypeDeserializer())); this.codecFactory = new JsonCodecFactory(objectMapperProvider); @@ -65,7 +65,7 @@ public TypeDeserializer() @Override protected Type _deserialize(String value, DeserializationContext context) { - Type type = metadata.getType(parseTypeSignature(value)); + Type type = typeManager.getType(parseTypeSignature(value)); if (type == null) { throw new IllegalArgumentException(String.valueOf("Unknown type " + value)); } diff --git a/src/test/java/io/pravega/connectors/presto/util/MockSchemaRegistryClient.java b/src/test/java/io/pravega/connectors/presto/util/MockSchemaRegistryClient.java new file mode 100644 index 0000000..9a47ce8 --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/util/MockSchemaRegistryClient.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.util; + +import io.pravega.schemaregistry.client.SchemaRegistryClient; +import io.pravega.schemaregistry.client.exceptions.RegistryExceptions; +import io.pravega.schemaregistry.contract.data.CodecType; +import io.pravega.schemaregistry.contract.data.Compatibility; +import io.pravega.schemaregistry.contract.data.EncodingId; +import io.pravega.schemaregistry.contract.data.EncodingInfo; +import io.pravega.schemaregistry.contract.data.GroupHistoryRecord; +import io.pravega.schemaregistry.contract.data.GroupProperties; +import io.pravega.schemaregistry.contract.data.SchemaInfo; +import io.pravega.schemaregistry.contract.data.SchemaWithVersion; +import io.pravega.schemaregistry.contract.data.VersionInfo; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * this class has a very limited use in unit testing, so many methods not implemented + */ +public class MockSchemaRegistryClient + implements SchemaRegistryClient +{ + private final Map groups = new HashMap<>(); + + private final Map> schemas = new HashMap<>(); + + @Override + public boolean addGroup(String s, GroupProperties groupProperties) throws RegistryExceptions.BadArgumentException, RegistryExceptions.UnauthorizedException + { + return groups.putIfAbsent(s, groupProperties) == null; + } + + @Override + public void removeGroup(String s) throws RegistryExceptions.UnauthorizedException + { + groups.remove(s); + } + + @Override + public Iterator> listGroups() throws RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public GroupProperties getGroupProperties(String s) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + GroupProperties groupProperties = groups.get(s); + if (groupProperties == null) { + throw new RegistryExceptions.ResourceNotFoundException(s); + } + return groupProperties; + } + + @Override + public boolean updateCompatibility(String s, Compatibility compatibility, @Nullable Compatibility compatibility1) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public List getSchemas(String s) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + return schemas.get(s); + } + + @Override + public VersionInfo addSchema(String s, SchemaInfo schemaInfo) throws RegistryExceptions.SchemaValidationFailedException, RegistryExceptions.SerializationMismatchException, RegistryExceptions.MalformedSchemaException, RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + List list = schemas.computeIfAbsent(s, k -> new ArrayList<>()); + VersionInfo versionInfo = new VersionInfo("type", "avro", list.size() + 1, list.size() + 1); + list.add(new SchemaWithVersion(schemaInfo, versionInfo)); + return versionInfo; + } + + @Override + public void deleteSchemaVersion(String s, VersionInfo versionInfo) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public SchemaInfo getSchemaForVersion(String s, VersionInfo versionInfo) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public EncodingInfo getEncodingInfo(String s, EncodingId encodingId) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public EncodingId getEncodingId(String s, VersionInfo versionInfo, String s1) throws RegistryExceptions.CodecTypeNotRegisteredException, RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public SchemaWithVersion getLatestSchemaVersion(String s, @Nullable String s1) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public VersionInfo getVersionForSchema(String s, SchemaInfo schemaInfo) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public List getSchemaVersions(String s, @Nullable String s1) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean validateSchema(String s, SchemaInfo schemaInfo) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean canReadUsing(String s, SchemaInfo schemaInfo) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public List getCodecTypes(String s) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public void addCodecType(String s, CodecType codecType) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public List getGroupHistory(String s) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public Map getSchemaReferences(SchemaInfo schemaInfo) throws RegistryExceptions.ResourceNotFoundException, RegistryExceptions.UnauthorizedException + { + throw new UnsupportedOperationException(); + } + + @Override + public String getNamespace() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws Exception + { + + } +} diff --git a/src/test/java/io/pravega/connectors/presto/util/MockStreamManager.java b/src/test/java/io/pravega/connectors/presto/util/MockStreamManager.java new file mode 100644 index 0000000..dae528a --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/util/MockStreamManager.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.util; + +import io.pravega.client.admin.StreamInfo; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.DeleteScopeFailedException; +import io.pravega.client.stream.Stream; +import io.pravega.client.stream.StreamConfiguration; +import io.pravega.client.stream.StreamCut; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * this class has a very limited use in unit testing, so many methods not implemented + */ +public class MockStreamManager + implements StreamManager { + + private final List scopes = new ArrayList<>(); + + private final List streams = new ArrayList<>(); + + @Override + public boolean createStream(String s, String s1, StreamConfiguration streamConfiguration) + { + Stream stream = Stream.of(s, s1); + return !streams.contains(stream) && streams.add(stream); + } + + @Override + public boolean updateStream(String s, String s1, StreamConfiguration streamConfiguration) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean truncateStream(String s, String s1, StreamCut streamCut) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean sealStream(String s, String s1) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean deleteStream(String s, String s1) + { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator listScopes() + { + return scopes.iterator(); + } + + @Override + public boolean createScope(String s) + { + return !scopes.contains(s) && scopes.add(s); + } + + @Override + public boolean checkScopeExists(String s) + { + return scopes.contains(s); + } + + @Override + public Iterator listStreams(String s) + { + return streams.stream().filter(stream -> stream.getScope().equals(s)).iterator(); + } + + @Override + public boolean checkStreamExists(String s, String s1) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean deleteScope(String s) + { + return scopes.remove(s); + } + + @Override + public boolean deleteScope(String s, boolean b) throws DeleteScopeFailedException + { + throw new UnsupportedOperationException(); + } + + @Override + public StreamInfo getStreamInfo(String s, String s1) + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + { + + } +} diff --git a/src/test/java/io/pravega/connectors/presto/integration/PravegaTestUtils.java b/src/test/java/io/pravega/connectors/presto/util/PravegaTestUtils.java similarity index 63% rename from src/test/java/io/pravega/connectors/presto/integration/PravegaTestUtils.java rename to src/test/java/io/pravega/connectors/presto/util/PravegaTestUtils.java index 2452608..1209a2c 100644 --- a/src/test/java/io/pravega/connectors/presto/integration/PravegaTestUtils.java +++ b/src/test/java/io/pravega/connectors/presto/util/PravegaTestUtils.java @@ -13,54 +13,41 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.pravega.connectors.presto.integration; +package io.pravega.connectors.presto.util; import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.common.QualifiedObjectName; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.tests.TestingPrestoClient; -import com.google.common.io.ByteStreams; -import com.google.common.io.CharStreams; import io.pravega.client.admin.StreamManager; import io.pravega.connectors.presto.PravegaStreamDescription; -import io.pravega.connectors.presto.PravegaStreamFieldDescription; -import io.pravega.connectors.presto.PravegaStreamFieldGroup; +import io.pravega.connectors.presto.integration.PravegaKeyValueLoader; +import io.pravega.connectors.presto.integration.PravegaLoader; +import io.pravega.connectors.presto.schemamanagement.LocalSchemaRegistry; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecordBuilder; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.UncheckedIOException; import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import static io.pravega.connectors.presto.util.PravegaStreamDescUtils.mapFieldsFromSchema; import static java.lang.String.format; public final class PravegaTestUtils { private PravegaTestUtils() {} - public static PravegaStreamDescription getStreamDesc(JsonCodec streamDescriptionCodec, String directory, String table) + public static LocalSchemaRegistry localSchemaRegistry(String dir) { - try (InputStream inputStream = PravegaTestUtils.class.getResourceAsStream(String.format("/%s/%s.json", directory, table))) { - return streamDescriptionCodec.fromJson(ByteStreams.toByteArray(inputStream)); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - public static String readSchema(String directory, String schema) - { - try (InputStreamReader reader = new InputStreamReader(PravegaTestUtils.class.getResourceAsStream(String.format("/%s/%s", directory, schema)))) { - return CharStreams.toString(reader); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } + JsonCodec streamDescCodec = new CodecSupplier<>( + PravegaStreamDescription.class, + FunctionAndTypeManager.createTestFunctionAndTypeManager()).get(); + return new LocalSchemaRegistry(new File("src/test/resources/" + dir).getAbsoluteFile(), streamDescCodec); } public static void loadTpchStream(URI controller, StreamManager streamManager, TestingPrestoClient prestoClient, String schema, String stream, QualifiedObjectName tpchTableName) @@ -86,7 +73,7 @@ public static void loadKeyValueTable(URI controller, StreamManager streamManager avroSchema(tableDesc, 0), avroSchema(tableDesc, 1))) { try (InputStream inputStream = PravegaTestUtils.class.getResourceAsStream(String.format("/kv/%s.records", table)); - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { // each line in file is a record, key + value // '|' character separates key fields from values fields // fields separated by ',' @@ -139,34 +126,16 @@ static void setAvroValue(GenericRecordBuilder builder, Schema.Field field, Strin public static PravegaStreamDescription getKvStreamDesc(String table) { - JsonCodec jsonCodec = JsonCodec.jsonCodec(PravegaStreamDescription.class); - - PravegaStreamDescription streamDescription = getStreamDesc(jsonCodec, "kv", table); - streamDescription.getEvent().orElseThrow(IllegalArgumentException::new); - - PravegaStreamFieldGroup keyEvent = streamDescription.getEvent().get().get(0); - keyEvent.getDataSchema().orElseThrow(IllegalArgumentException::new); - String keySchema = PravegaTestUtils.readSchema("kv", keyEvent.getDataSchema().get()); - - PravegaStreamFieldGroup valueEvent = streamDescription.getEvent().get().get(1); - valueEvent.getDataSchema().orElseThrow(IllegalArgumentException::new); - String valueSchema = PravegaTestUtils.readSchema("kv", valueEvent.getDataSchema().get()); - - List keyFields = - mapFieldsFromSchema("key", keyEvent.getDataFormat(), keySchema); - - List valueFields = - mapFieldsFromSchema("value", valueEvent.getDataFormat(), valueSchema); - - List newFieldGroups = new ArrayList<>(2); - newFieldGroups.add(new PravegaStreamFieldGroup(keyEvent, keySchema, keyFields)); - newFieldGroups.add(new PravegaStreamFieldGroup(valueEvent, valueSchema, valueFields)); - - return new PravegaStreamDescription(streamDescription, newFieldGroups); + return localSchemaRegistry("kv").getTable(new SchemaTableName("kv", table)); } public static Schema avroSchema(PravegaStreamDescription streamDescription, int event) { return new Schema.Parser().parse(streamDescription.getEvent().get().get(event).getDataSchema().get()); } + + public static Schema avroSchema(String avroSchemaString) + { + return new Schema.Parser().parse(avroSchemaString); + } } diff --git a/src/test/java/io/pravega/connectors/presto/util/SchemaRegistryUtil.java b/src/test/java/io/pravega/connectors/presto/util/SchemaRegistryUtil.java new file mode 100644 index 0000000..b4882c1 --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/util/SchemaRegistryUtil.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.util; + +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableMap; +import io.pravega.client.admin.StreamManager; +import io.pravega.connectors.presto.schemamanagement.CompositeSchemaRegistry; +import io.pravega.connectors.presto.schemamanagement.LocalSchemaRegistry; +import io.pravega.connectors.presto.schemamanagement.PravegaSchemaRegistry; +import io.pravega.connectors.presto.schemamanagement.SchemaRegistry; +import io.pravega.connectors.presto.schemamanagement.SchemaSupplier; +import io.pravega.schemaregistry.client.SchemaRegistryClient; +import io.pravega.schemaregistry.contract.data.Compatibility; +import io.pravega.schemaregistry.contract.data.GroupProperties; +import io.pravega.schemaregistry.contract.data.SerializationFormat; +import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema; + +import java.util.ArrayList; +import java.util.List; + +import static io.pravega.connectors.presto.util.PravegaTestUtils.avroSchema; + +/** + * build CompositeSchemaRegistry for use in unit tests + */ +public class SchemaRegistryUtil +{ + private final StreamManager streamManager; + private final SchemaRegistryClient schemaRegistryClient; + private final PravegaSchemaRegistry pravegaSchemaRegistry; + + private final List localSchemaRegistryList; + + public SchemaRegistryUtil() + { + this.streamManager = new MockStreamManager(); + this.schemaRegistryClient = new MockSchemaRegistryClient(); + + this.pravegaSchemaRegistry = new PravegaSchemaRegistry(schemaRegistryClient, streamManager); + + this.localSchemaRegistryList = new ArrayList<>(); + } + + public CompositeSchemaRegistry getSchemaRegistry() + { + List schemaSuppliers = new ArrayList<>(); + List schemaRegistries = new ArrayList<>(); + + localSchemaRegistryList.forEach(lsr -> { + schemaSuppliers.add(lsr); + schemaRegistries.add(lsr); + }); + + schemaSuppliers.add(pravegaSchemaRegistry); + schemaRegistries.add(pravegaSchemaRegistry); + + return new CompositeSchemaRegistry(schemaSuppliers, schemaRegistries); + } + + public void addLocalSchema(String dir) + { + localSchemaRegistryList.add(PravegaTestUtils.localSchemaRegistry(dir)); + } + + public boolean addSchema(String schema) + { + return streamManager.createScope(schema); + } + + public boolean addTable(SchemaTableName schemaTableName) + { + return streamManager.createStream(schemaTableName.getSchemaName(), + schemaTableName.getTableName(), + null); + } + + public boolean addTable(String schema, String stream) + { + return addTable(new SchemaTableName(schema, stream)); + } + + public boolean addTable(SchemaTableName schemaTableName, String schema) + { + if (!addTable(schemaTableName)) { + return false; + } + addAvroSchema(schemaTableName, schema); + return true; + } + + public void addAvroSchema(SchemaTableName schemaTableName, String schema) + { + schemaRegistryClient.addGroup(groupId(schemaTableName), groupProperties(false)); + schemaRegistryClient.addSchema(groupId(schemaTableName), AvroSchema.of(avroSchema(schema)).getSchemaInfo()); + } + + private static GroupProperties groupProperties(boolean inline) + { + return new GroupProperties( + SerializationFormat.Avro, + Compatibility.allowAny(), + false, + ImmutableMap.builder().put(inline ? "inline" : "", "").build()); + } + + private static String groupId(SchemaTableName schemaTableName) + { + return PravegaNameUtils.groupId(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } +} diff --git a/src/test/java/io/pravega/connectors/presto/util/TestSchemas.java b/src/test/java/io/pravega/connectors/presto/util/TestSchemas.java new file mode 100644 index 0000000..116cbcd --- /dev/null +++ b/src/test/java/io/pravega/connectors/presto/util/TestSchemas.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) Pravega Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pravega.connectors.presto.util; + +public class TestSchemas +{ + private TestSchemas() {} + + public static final String EMPLOYEE_AVSC = + "{\"namespace\": \"io.pravega.avro\",\"type\": \"record\",\"name\": \"Employee\",\"fields\": [{\"name\": \"first\", \"type\": \"string\"},{\"name\": \"last\", \"type\": \"string\"}]}"; +} diff --git a/src/test/resources/kv/employee.json b/src/test/resources/kv/kv.employee.json similarity index 100% rename from src/test/resources/kv/employee.json rename to src/test/resources/kv/kv.employee.json diff --git a/src/test/resources/tpch/customer.json b/src/test/resources/tpch/tpch.customer.json similarity index 100% rename from src/test/resources/tpch/customer.json rename to src/test/resources/tpch/tpch.customer.json diff --git a/src/test/resources/tpch/lineitem.json b/src/test/resources/tpch/tpch.lineitem.json similarity index 100% rename from src/test/resources/tpch/lineitem.json rename to src/test/resources/tpch/tpch.lineitem.json diff --git a/src/test/resources/tpch/nation.json b/src/test/resources/tpch/tpch.nation.json similarity index 100% rename from src/test/resources/tpch/nation.json rename to src/test/resources/tpch/tpch.nation.json diff --git a/src/test/resources/tpch/orders.json b/src/test/resources/tpch/tpch.orders.json similarity index 100% rename from src/test/resources/tpch/orders.json rename to src/test/resources/tpch/tpch.orders.json diff --git a/src/test/resources/tpch/part.json b/src/test/resources/tpch/tpch.part.json similarity index 100% rename from src/test/resources/tpch/part.json rename to src/test/resources/tpch/tpch.part.json diff --git a/src/test/resources/tpch/partsupp.json b/src/test/resources/tpch/tpch.partsupp.json similarity index 100% rename from src/test/resources/tpch/partsupp.json rename to src/test/resources/tpch/tpch.partsupp.json diff --git a/src/test/resources/tpch/region.json b/src/test/resources/tpch/tpch.region.json similarity index 100% rename from src/test/resources/tpch/region.json rename to src/test/resources/tpch/tpch.region.json diff --git a/src/test/resources/tpch/supplier.json b/src/test/resources/tpch/tpch.supplier.json similarity index 100% rename from src/test/resources/tpch/supplier.json rename to src/test/resources/tpch/tpch.supplier.json diff --git a/src/test/resources/ut/ut.multiexplicit.json b/src/test/resources/ut/ut.multiexplicit.json new file mode 100644 index 0000000..bf6b50c --- /dev/null +++ b/src/test/resources/ut/ut.multiexplicit.json @@ -0,0 +1,6 @@ +{ + "schemaName": "ut", + "tableName": "multiexplicit", + "objectName": "multiexplicit", + "objectArgs": ["stream1", "stream3"] +} \ No newline at end of file diff --git a/src/test/resources/ut/ut.multiregex.json b/src/test/resources/ut/ut.multiregex.json new file mode 100644 index 0000000..911ee32 --- /dev/null +++ b/src/test/resources/ut/ut.multiregex.json @@ -0,0 +1,5 @@ +{ + "schemaName": "ut", + "tableName": "multiregex", + "objectName": "stream[0-9]" +} \ No newline at end of file diff --git a/src/test/unit-test.xml b/src/test/unit-test.xml index 1737cd5..eca6ae0 100644 --- a/src/test/unit-test.xml +++ b/src/test/unit-test.xml @@ -4,6 +4,7 @@ +