Real-time data ingestion engine
stream.source: kafka or kinesis
target: s3 or custom implementation
- Poll data from the stream.source.
- Store it in a buffer and deserialize the data into an internal in-memory columnar format, Page.
- Flush the data into the target periodically, based on the middleware.max-flush-duration config (a combined configuration sketch follows the flush settings below).
stream.max-flush-duration=15s
middleware.max-flush-duration=60s
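A minimal end-to-end configuration, assuming a Kafka source and an S3 target, might look like the sketch below; the values are illustrative only, and the source- and target-specific settings are described afterwards:

stream.source=kafka
target=s3
stream.max-flush-duration=15s
middleware.max-flush-duration=60s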
Kinesis source (stream.source=kinesis): consumes data from your Kinesis streams.
kinesis.stream=
kinesis.max-records-per-batch=
kinesis.consumer-dynamodb-table=
aws.enable-cloudwatch=
aws.kinesis-endpoint=
aws.dynamodb-endpoint=
aws.region=
aws.access-key= ## instance profile is used when not provided
aws.secret-access-key= ## instance profile is used when not provided
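A sketch of a Kinesis source configuration; the stream name, DynamoDB table name, and region below are hypothetical values, and the endpoint and credential settings are omitted so the defaults and the instance profile are used:

stream.source=kinesis
kinesis.stream=events
kinesis.max-records-per-batch=500
kinesis.consumer-dynamodb-table=collector-kinesis-checkpoint
aws.enable-cloudwatch=false
aws.region=us-east-1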
Kafka source (stream.source=kafka): consumes data from your Kafka cluster.
max.poll.records=
historical.worker=false
kafka.topic=
kafka.offset='earliest' or 'latest'
kafka.group.id=
kafka.session.timeout.ms=
kafka.request.timeout.ms=
kafka.nodes=127.0.0.1,127.0.0.2
kafka.historical-data-topic=
source.data-format=AVRO or JSON
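A sketch of a Kafka source configuration; the topic, group id, batch size, and timeout values below are hypothetical and should be adjusted to your cluster:

stream.source=kafka
kafka.nodes=127.0.0.1,127.0.0.2
kafka.topic=events
kafka.group.id=collector
kafka.offset=earliest
kafka.session.timeout.ms=30000
kafka.request.timeout.ms=40000
max.poll.records=500
source.data-format=JSON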
S3 target (target=s3): creates compressed GZIP files with JSON content inside your S3 bucket.
target.aws.region=
target.aws.s3-bucket=
target.access-key= ## instance profile is used when not provided
target.secret-access-key= ## instance profile is used when not provided
target.aws.s3-endpoint=
target.aws.s3-max-data-size=
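A sketch of an S3 target configuration; the bucket name below is hypothetical, and the access keys, endpoint, and size limit are omitted so the defaults and the instance profile are used:

target=s3
target.aws.region=us-east-1
target.aws.s3-bucket=my-ingestion-bucket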
To implement a custom target, create the following Java classes:
public class CustomModule extends AbstractConfigurationAwareModule {
    @Override
    protected void setup(Binder binder) {
        binder.bind(DatabaseHandler.class).to(CustomDatabaseHandler.class).asEagerSingleton();
    }
}

public class CustomDatabaseHandler extends AbstractDatabaseHandler {
    @Inject
    public CustomDatabaseHandler(@Named("metadata.store.jdbc") JDBCPoolDataSource metadataStore) {
        super(metadataStore);
    }

    @Override
    public Inserter insert(String projectName, String eventType, List<ColumnMetadata> eventProperties) {
        return new Inserter() {
            // We make use of PrestoSQL's Page as it provides us a way to process the data in a columnar and efficient way.
            // See: https://github.com/prestosql/presto/blob/master/presto-spi/src/main/java/io/prestosql/spi/Page.java
            @Override
            public void addPage(Page page) {
                for (int i = 0; i < eventProperties.size(); i++) {
                    ColumnMetadata property = eventProperties.get(i);
                    Block block = page.getBlock(i);
                    if (property.getType() == DoubleType.DOUBLE) {
                        // `value` is the first value of this property in our micro-batch
                        double value = DoubleType.DOUBLE.getDouble(block, 0);
                    }
                }
            }

            @Override
            public CompletableFuture commit() {
                // If the future is completed, the checkpoint will be executed. We make use of checkpointing to make sure
                // that we process the data in a reliable way: if the worker dies, we will try processing the batch on a different worker.
                return CompletableFuture.completedFuture(null);
            }
        };
    }
}
Add your module to the io.rakam.presto.ServiceStarter.TargetConfig.Target enum class so that the system installs it when target is set to your implementation, and you're done! If your enum key in io.rakam.presto.ServiceStarter.TargetConfig.Target is CUSTOM, start the collector with the config target=custom and it will be used as the target connector.
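For illustration only, here is a rough sketch of what the enum entry could look like, assuming the Target enum simply associates each key with the Guice module to install. The actual constructor and existing entries of io.rakam.presto.ServiceStarter.TargetConfig.Target may differ, and S3Module below is just a placeholder name, so follow the shape of the entries already present in the source:

public enum Target {
    // S3Module is a hypothetical name standing in for the existing S3 target module
    S3(new S3Module()),
    CUSTOM(new CustomModule());

    private final Module module;

    Target(Module module) {
        this.module = module;
    }

    public Module getModule() {
        return module;
    }
}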