diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py
index ea548d5b3..940ff8bda 100644
--- a/blockchainetl/jobs/exporters/kafka_exporter.py
+++ b/blockchainetl/jobs/exporters/kafka_exporter.py
@@ -9,12 +9,24 @@
 
 class KafkaItemExporter:
 
-    def __init__(self, output, item_type_to_topic_mapping, converters=()):
+    def __init__(self, output, output_config, item_type_to_topic_mapping, converters=()):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
         self.converter = CompositeItemConverter(converters)
         self.connection_url = self.get_connection_url(output)
         print(self.connection_url)
-        self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
+
+        self.topic_prefix = output_config.get('topic_prefix') or ''
+
+        logging.info('Kafka output config keys: {}'.format(list(output_config)))  # log keys only, the config may hold credentials
+
+        configs = {
+            'bootstrap_servers': self.connection_url,
+            **output_config
+        }
+
+        configs.pop('topic_prefix', None)  # exporter-level setting, not a KafkaProducer config; del would raise KeyError when absent
+
+        self.producer = KafkaProducer(**configs)
 
     def get_connection_url(self, output):
         try:
@@ -34,7 +46,7 @@ def export_item(self, item):
         if item_type is not None and item_type in self.item_type_to_topic_mapping:
             data = json.dumps(item).encode('utf-8')
             logging.debug(data)
-            return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
+            return self.producer.send(self.topic_prefix + self.item_type_to_topic_mapping[item_type], value=data)
         else:
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
 
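A hedged sketch of what such a config file could contain (file name and values are illustrative; topic_prefix is consumed by KafkaItemExporter itself, every other key is passed through unchanged as a kafka-python KafkaProducer option):

    # output_config.yaml -- example values only
    topic_prefix: 'ethereum.'        # exporter-level: 'block' items land on topic 'ethereum.blocks'
    compression_type: 'lz4'          # KafkaProducer option
    acks: 1                          # KafkaProducer option
    security_protocol: 'SASL_PLAINTEXT'
    sasl_mechanism: 'PLAIN'
    sasl_plain_username: 'user'      # placeholder credentials
    sasl_plain_password: 'secret'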
diff --git a/ethereumetl/cli/stream.py b/ethereumetl/cli/stream.py
index 217208dba..a9ceaaea2 100644
--- a/ethereumetl/cli/stream.py
+++ b/ethereumetl/cli/stream.py
@@ -44,6 +44,8 @@
                    'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
                    'or Kinesis, e.g. kinesis://your-data-stream-name'
                    'If not specified will print to console')
+@click.option('--output-config-file', default=None, show_default=True, type=str,
+              help='Path to a YAML configuration file for the output. For Kafka, a file with KafkaProducer configurations (bootstrap_servers in the file overrides the host:port taken from --output).')
 @click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
 @click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
               help='The list of entity types to export.')
@@ -53,7 +55,7 @@
 @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers')
 @click.option('--log-file', default=None, show_default=True, type=str, help='Log file')
 @click.option('--pid-file', default=None, show_default=True, type=str, help='pid file')
-def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
+def stream(last_synced_block_file, lag, provider_uri, output, output_config_file, start_block, entity_types,
           period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
     """Streams all data types to console or Google Pub/Sub."""
     configure_logging(log_file)
@@ -67,9 +69,12 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
     provider_uri = pick_random_provider_uri(provider_uri)
     logging.info('Using ' + provider_uri)
 
+    if output_config_file:
+        logging.info('Using output config file ' + output_config_file)
+
     streamer_adapter = EthStreamerAdapter(
         batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
-        item_exporter=create_item_exporters(output),
+        item_exporter=create_item_exporters(output, output_config_file),
         batch_size=batch_size,
         max_workers=max_workers,
         entity_types=entity_types
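Usage sketch with hypothetical values: outputs and config files are paired positionally, one comma-separated list against the other, and a position may be left empty for an output that needs no config (the None/empty guard in create_item_exporter below makes that a no-op):

    > ethereumetl stream --provider-uri https://mainnet.infura.io/v3/<your-key> \
          --output console,kafka/127.0.0.1:9092 \
          --output-config-file ,output_config.yaml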
diff --git a/ethereumetl/streaming/item_exporter_creator.py b/ethereumetl/streaming/item_exporter_creator.py
index 0f3c627a5..da1f57e84 100644
--- a/ethereumetl/streaming/item_exporter_creator.py
+++ b/ethereumetl/streaming/item_exporter_creator.py
@@ -23,15 +23,33 @@
 from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
 from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
+import os
 
 
-def create_item_exporters(outputs):
+def create_item_exporters(outputs, output_config_files):
     split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
+    split_output_config_files = [output_config_file.strip() for output_config_file in output_config_files.split(',')] if output_config_files else [None] * len(split_outputs)
+
+    if len(split_outputs) != len(split_output_config_files):
+        raise ValueError('outputs and output_config_files should have the same number of elements')
 
-    item_exporters = [create_item_exporter(output) for output in split_outputs]
+    item_exporters = [create_item_exporter(output, output_config) for output, output_config in zip(split_outputs, split_output_config_files)]
     return MultiItemExporter(item_exporters)
 
 
-def create_item_exporter(output):
+def create_item_exporter(output, output_config_file):
+    # Load the optional YAML output config. Tolerate None or '' (no config for
+    # this output) and an empty file, so outputs without a config keep working.
+    output_config = {}
+    if output_config_file and os.path.isfile(output_config_file):
+        try:
+            import yaml
+            with open(output_config_file, 'r') as file:
+                output_config = yaml.safe_load(file) or {}
+        except Exception as e:
+            raise ValueError('Error parsing output config file: ' + str(e))
+    elif output_config_file:
+        raise ValueError('Output config file not found: ' + output_config_file)
+
     item_exporter_type = determine_item_exporter_type(output)
     if item_exporter_type == ItemExporterType.PUBSUB:
         from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
@@ -92,7 +110,7 @@ def array_to_str(val):
         item_exporter = ConsoleItemExporter()
     elif item_exporter_type == ItemExporterType.KAFKA:
         from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter
-        item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={
+        item_exporter = KafkaItemExporter(output, output_config, item_type_to_topic_mapping={
             'block': 'blocks',
             'transaction': 'transactions',
             'log': 'logs',
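Finally, a minimal programmatic sketch of the same wiring (hypothetical values; it assumes the usual blockchain-etl exporter lifecycle of open()/export_items()/close() and uses a trimmed-down block item as a stand-in):

    from ethereumetl.streaming.item_exporter_creator import create_item_exporters

    # One config file per comma-separated output; positions pair up.
    exporter = create_item_exporters('kafka/127.0.0.1:9092', 'output_config.yaml')
    exporter.open()
    # With topic_prefix 'ethereum.' the 'block' -> 'blocks' mapping yields topic 'ethereum.blocks'.
    exporter.export_items([{'type': 'block', 'number': 17000000}])
    exporter.close()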