Feature/output config #497

Open · wants to merge 14 commits into develop
18 changes: 15 additions & 3 deletions blockchainetl/jobs/exporters/kafka_exporter.py
```diff
@@ -9,12 +9,24 @@
 
 class KafkaItemExporter:
 
-    def __init__(self, output, item_type_to_topic_mapping, converters=()):
+    def __init__(self, output, output_config, item_type_to_topic_mapping, converters=()):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
         self.converter = CompositeItemConverter(converters)
         self.connection_url = self.get_connection_url(output)
         print(self.connection_url)
-        self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
+
+        self.topic_prefix = output_config.get('topic_prefix') or ''
+
+        logging.info('Kafka output config: {}'.format(output_config))
+
+        configs = {
+            'bootstrap_servers': self.connection_url,
+            **output_config
+        }
+
+        del configs['topic_prefix']
+
+        self.producer = KafkaProducer(**configs)
 
     def get_connection_url(self, output):
         try:
@@ -34,7 +46,7 @@ def export_item(self, item):
         if item_type is not None and item_type in self.item_type_to_topic_mapping:
             data = json.dumps(item).encode('utf-8')
             logging.debug(data)
-            return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
+            return self.producer.send(self.topic_prefix + self.item_type_to_topic_mapping[item_type], data)
         else:
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
```
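For reference, a config file along these lines should satisfy the new code path. This is an illustrative sketch, not a file shipped with this PR: `topic_prefix` is the one key the exporter itself consumes, and the remaining keys are standard kafka-python `KafkaProducer` settings passed through verbatim. Note that the constructor runs `del configs['topic_prefix']` unconditionally, so the file should always define `topic_prefix`, even as an empty string, to avoid a `KeyError`.

```yaml
# Illustrative output_config.yaml for the Kafka exporter.
# Values here are examples, not defaults from this PR.
topic_prefix: 'mainnet.'    # consumed by KafkaItemExporter, stripped before producer creation
acks: 'all'                 # everything below is a standard kafka-python
retries: 3                  # KafkaProducer setting, forwarded verbatim
linger_ms: 50               # via KafkaProducer(**configs)
compression_type: 'gzip'
# bootstrap_servers: '10.0.0.5:9092'   # uncomment to override the host:port from --output
```

Prefixing topics at send time keeps a single topic mapping reusable across networks (e.g. `mainnet.blocks` vs. `goerli.blocks`).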
9 changes: 7 additions & 2 deletions ethereumetl/cli/stream.py
```diff
@@ -44,6 +44,8 @@
                    'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
                    'or Kinesis, e.g. kinesis://your-data-stream-name'
                    'If not specified will print to console')
+@click.option('--output-config-file', default='output_config.yaml', type=str,
+              help='Configuration for the output. File path. For Kafka, YAML file with KafkaProducer configurations (will override bootstrap_servers if included).')
 @click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
 @click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
               help='The list of entity types to export.')
@@ -53,7 +55,7 @@
 @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers')
 @click.option('--log-file', default=None, show_default=True, type=str, help='Log file')
 @click.option('--pid-file', default=None, show_default=True, type=str, help='pid file')
-def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
+def stream(last_synced_block_file, lag, provider_uri, output, output_config_file, start_block, entity_types,
            period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
     """Streams all data types to console or Google Pub/Sub."""
     configure_logging(log_file)
@@ -67,9 +69,12 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
     provider_uri = pick_random_provider_uri(provider_uri)
     logging.info('Using ' + provider_uri)
 
+    if output_config_file:
+        logging.info('Using output config file ' + output_config_file)
+
     streamer_adapter = EthStreamerAdapter(
         batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
-        item_exporter=create_item_exporters(output),
+        item_exporter=create_item_exporters(output, output_config_file),
         batch_size=batch_size,
         max_workers=max_workers,
         entity_types=entity_types
```
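By way of illustration, a Kafka-bound invocation with the new flag might look like this (the provider URI, block number, and file names are placeholders, not values taken from this PR):

```bash
# Stream to Kafka, pulling producer settings from a YAML config file.
# The broker address still comes from the --output URI unless the YAML
# overrides bootstrap_servers.
ethereumetl stream \
    --provider-uri https://mainnet.infura.io/v3/<your-project-id> \
    --output kafka/127.0.0.1:9092 \
    --output-config-file output_config.yaml \
    --start-block 10000000 \
    --entity-types block,transaction,log \
    --last-synced-block-file last_synced_block.txt
```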
26 changes: 22 additions & 4 deletions ethereumetl/streaming/item_exporter_creator.py
```diff
@@ -23,15 +23,33 @@
 from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
 from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
 
+import os
+
 
-def create_item_exporters(outputs):
+def create_item_exporters(outputs, output_config_files):
     split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
+    split_output_config_files = [output_config_file.strip() for output_config_file in output_config_files.split(',')] if output_config_files else [None] * len(split_outputs)
+
+    if len(split_outputs) != len(split_output_config_files):
+        raise ValueError('outputs and output_config_files should have the same number of elements')
 
-    item_exporters = [create_item_exporter(output) for output in split_outputs]
+    item_exporters = [create_item_exporter(output, output_config) for output, output_config in zip(split_outputs, split_output_config_files)]
     return MultiItemExporter(item_exporters)
 
 
-def create_item_exporter(output):
+def create_item_exporter(output, output_config_file):
+
+    output_config = {}
+    if os.path.isfile(output_config_file):
+        try:
+            import yaml
+            with open(output_config_file, 'r') as file:
+                output_config = yaml.safe_load(file)
+        except Exception as e:
+            raise ValueError('Error parsing output config file: ' + str(e))
+    else:
+        raise ValueError('Output config file not found: ' + output_config_file)
+
     item_exporter_type = determine_item_exporter_type(output)
     if item_exporter_type == ItemExporterType.PUBSUB:
         from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
@@ -92,7 +110,7 @@ def array_to_str(val):
         item_exporter = ConsoleItemExporter()
     elif item_exporter_type == ItemExporterType.KAFKA:
         from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter
-        item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={
+        item_exporter = KafkaItemExporter(output, output_config, item_type_to_topic_mapping={
             'block': 'blocks',
             'transaction': 'transactions',
             'log': 'logs',
```
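To make the pairing concrete, here is a hypothetical sketch of how the comma-separated outputs line up with the comma-separated config files (the file names and broker address are invented for illustration). Note that, as written, `create_item_exporter` requires every listed config file to exist on disk, even for exporters such as `console` that never read it:

```python
from ethereumetl.streaming.item_exporter_creator import create_item_exporters

# Outputs and config files are split on ',' and zipped positionally,
# so the nth output gets the nth config file.
multi_exporter = create_item_exporters(
    outputs='kafka/127.0.0.1:9092,console',
    output_config_files='kafka_config.yaml,console_config.yaml',
)
# Resulting pairing:
#   kafka/127.0.0.1:9092 -> kafka_config.yaml
#   console              -> console_config.yaml
# A mismatched number of entries raises ValueError.
```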