Skip to content
This repository has been archived by the owner on Feb 28, 2023. It is now read-only.

Commit

Permalink
Updating presto kinesis plugin to work with 0.157 (#3)
Browse files Browse the repository at this point in the history
Make compatible with presto 0.157
  • Loading branch information
ankitdixit authored and Shubham Tagra committed Mar 23, 2017
1 parent dd935be commit 91a103a
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 195 deletions.
28 changes: 7 additions & 21 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
<properties>
<dep.aws-sdk.version>1.11.12</dep.aws-sdk.version>
<dep.kinesis-client.version>1.6.3</dep.kinesis-client.version>
<presto.version>0.149</presto.version>
<presto.version>0.157</presto.version>
<jodatime.version>2.8.1</jodatime.version>
<opencsv.version>2.3</opencsv.version>
<jmxutils.version>1.19</jmxutils.version>
<com.google.version>4.0</com.google.version>
<com.fasterxml.version>2.4.4</com.fasterxml.version>
<com.fasterxml.version>2.8.1</com.fasterxml.version>
<findbugs.version>2.0.3</findbugs.version>
<slice.version>0.22</slice.version>
<airlift.version>0.128</airlift.version>
<airlift.version>0.139</airlift.version>
<javax.version>1</javax.version>
<guava.version>18.0</guava.version>
<validation.version>1.1.0.Final</validation.version>
Expand Down Expand Up @@ -68,28 +68,26 @@
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>${presto.version}</version>
<scope>provided</scope>

</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<version>${presto.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
<version>${jmxutils.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
<version>${airlift.version}</version>
<scope>provided</scope>

<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
Expand All @@ -114,14 +112,13 @@
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
<version>${airlift.version}</version>
<scope>provided</scope>

<exclusions>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
</exclusions>

</dependency>

<dependency>
Expand All @@ -134,83 +131,72 @@
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<version>${airlift.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
<version>${airlift.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<version>${slice.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<version>${airlift.version}</version>
<version>1.0</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>${com.google.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>${com.google.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${com.fasterxml.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${com.fasterxml.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>${javax.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>${validation.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
<version>${findbugs.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,30 @@
*/
package com.qubole.presto.kinesis;

import static java.util.Objects.requireNonNull;

import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.NodeManager;
import com.google.inject.Key;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.airlift.log.Logger;

import java.util.Map;
import java.util.Optional;

import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.type.TypeManager;
import java.util.function.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.airlift.log.Logger;

import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

/**
*
Expand All @@ -50,6 +49,7 @@ public class KinesisConnectorFactory
public static final String connectorName = "kinesis";
private static final Logger log = Logger.get(KinesisConnectorFactory.class);

private final ClassLoader classLoader;
private TypeManager typeManager;
private NodeManager nodeManager;
private Optional<Supplier<Map<SchemaTableName, KinesisStreamDescription>>> tableDescriptionSupplier = Optional.empty();
Expand All @@ -60,14 +60,11 @@ public class KinesisConnectorFactory

private Injector injector;

KinesisConnectorFactory(TypeManager typeManager,
NodeManager nodeManager,
Optional<Supplier<Map<SchemaTableName, KinesisStreamDescription>>> tableDescriptionSupplier,
KinesisConnectorFactory(ClassLoader classLoader, Optional<Supplier<Map<SchemaTableName, KinesisStreamDescription>>> tableDescriptionSupplier,
Map<String, String> optionalConfig,
Optional<Class<? extends KinesisClientProvider>> altProviderClass)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.classLoader = classLoader;
this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
this.optionalConfig = requireNonNull(optionalConfig, "optionalConfig is null");
this.altProviderClass = requireNonNull(altProviderClass, "altProviderClass is null");
Expand All @@ -89,11 +86,11 @@ public String getName()
@Override
public ConnectorHandleResolver getHandleResolver()
{
return this.handleResolver;
return new KinesisHandleResolver(connectorName);
}

@Override
public Connector create(String connectorId, Map<String, String> config)
public Connector create(String connectorId, Map<String, String> config, ConnectorContext context)
{
log.info("In connector factory create method. Connector id: " + connectorId);
requireNonNull(connectorId, "connectorId is null");
Expand All @@ -103,34 +100,30 @@ public Connector create(String connectorId, Map<String, String> config)
Bootstrap app = new Bootstrap(
new JsonModule(),
new KinesisConnectorModule(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("connectorId")).to(connectorId);
binder.bind(TypeManager.class).toInstance(typeManager);
binder.bind(NodeManager.class).toInstance(nodeManager);
// Note: moved creation from KinesisConnectorModule because connector manager accesses it earlier!
binder.bind(KinesisHandleResolver.class).toInstance(handleResolver);

// Moved creation here from KinesisConnectorModule to make it easier to parameterize
if (altProviderClass.isPresent()) {
binder.bind(KinesisClientProvider.class).to(altProviderClass.get()).in(Scopes.SINGLETON);
}
else {
binder.bind(KinesisClientProvider.class).to(KinesisClientManager.class).in(Scopes.SINGLETON);
}

if (tableDescriptionSupplier.isPresent()) {
binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KinesisStreamDescription>>>() {}).toInstance(tableDescriptionSupplier.get());
}
else {
binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KinesisStreamDescription>>>() {}).to(KinesisTableDescriptionSupplier.class).in(Scopes.SINGLETON);
}
binder -> {
binder.bindConstant().annotatedWith(Names.named("connectorId")).to(connectorId);
binder.bind(ConnectorId.class).toInstance(new ConnectorId(connectorId));
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
// Note: moved creation from KinesisConnectorModule because connector manager accesses it earlier!
binder.bind(KinesisHandleResolver.class).toInstance(new KinesisHandleResolver(connectorName));

// Moved creation here from KinesisConnectorModule to make it easier to parameterize
if (altProviderClass.isPresent()) {
binder.bind(KinesisClientProvider.class).to(altProviderClass.get()).in(Scopes.SINGLETON);
}
else {
binder.bind(KinesisClientProvider.class).to(KinesisClientManager.class).in(Scopes.SINGLETON);
}

if (tableDescriptionSupplier.isPresent()) {
binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KinesisStreamDescription>>>() {}).toInstance(tableDescriptionSupplier.get());
}
else {
binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, KinesisStreamDescription>>>() {}).to(KinesisTableDescriptionSupplier.class).in(Scopes.SINGLETON);
}
}
);
);

this.injector = app.strictConfig()
.doNotInitializeLogging()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.ErrorType;

/**
* Kinesis connector specific error codes.
Expand All @@ -35,7 +36,7 @@ public enum KinesisErrorCode

KinesisErrorCode(int code)
{
errorCode = new ErrorCode(code + StartingErrorCode, name());
errorCode = new ErrorCode(code + StartingErrorCode, name(), ErrorType.INTERNAL_ERROR);
}

@Override
Expand Down
38 changes: 16 additions & 22 deletions src/main/java/com/qubole/presto/kinesis/KinesisPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,15 @@
*/
package com.qubole.presto.kinesis;

import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import io.airlift.log.Logger;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
Expand All @@ -44,37 +39,36 @@ public class KinesisPlugin
{
private static final Logger log = Logger.get(KinesisPlugin.class);

private TypeManager typeManager;
private NodeManager nodeManager;
private Optional<Supplier<Map<SchemaTableName, KinesisStreamDescription>>> tableDescriptionSupplier = Optional.empty();
private Map<String, String> optionalConfig = ImmutableMap.of();
private Optional<Class<? extends KinesisClientProvider>> altProviderClass = Optional.empty();

private KinesisConnectorFactory factory;

@Override
public synchronized void setOptionalConfig(Map<String, String> optionalConfig)
private static ClassLoader getClassLoader()
{
this.optionalConfig = ImmutableMap.copyOf(requireNonNull(optionalConfig, "optionalConfig is null"));
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = KinesisPlugin.class.getClassLoader();
}
return classLoader;
}

@Inject
public synchronized void setTypeManager(TypeManager typeManager)
public synchronized void setOptionalConfig(Map<String, String> optionalConfig)
{
// Note: this is done by the PluginManager when loading (not the injector of this plug in!)
log.info("Injecting type manager into KinesisPlugin");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.optionalConfig = ImmutableMap.copyOf(requireNonNull(optionalConfig, "optionalConfig is null"));
}

@Inject
public synchronized void setNodeManager(NodeManager nodeManager)
@Override
public synchronized Iterable<ConnectorFactory> getConnectorFactories()
{
log.info("Injecting node manager into KinesisPlugin");
this.nodeManager = requireNonNull(nodeManager, "node is null");
if(factory == null) {
this.factory = new KinesisConnectorFactory(getClassLoader(), tableDescriptionSupplier, optionalConfig, altProviderClass);
}
return ImmutableList.of(this.factory);
}

@Override
public synchronized <T> List<T> getServices(Class<T> type)
/*public synchronized <T> List<T> getServices(Class<T> type)
{
if (type == ConnectorFactory.class) {
if (this.factory == null) {
Expand All @@ -84,7 +78,7 @@ public synchronized <T> List<T> getServices(Class<T> type)
return ImmutableList.of(type.cast(this.factory));
}
return ImmutableList.of();
}
}*/

@VisibleForTesting
public synchronized void setTableDescriptionSupplier(Supplier<Map<SchemaTableName, KinesisStreamDescription>> tableDescriptionSupplier)
Expand Down
Loading

0 comments on commit 91a103a

Please sign in to comment.