diff --git a/target_stitch/__init__.py b/target_stitch/__init__.py
index e48a0b7..261ee8d 100755
--- a/target_stitch/__init__.py
+++ b/target_stitch/__init__.py
@@ -358,6 +358,7 @@ def __init__(self, # pylint: disable=too-many-arguments
         self.messages = []
         self.buffer_size_bytes = 0
         self.state = None
+        self.counts = {}
 
         # Mapping from stream name to {'schema': ..., 'key_names': ..., 'bookmark_names': ... }
         self.stream_meta = {}
@@ -438,6 +439,11 @@ def handle_line(self, line):
         enough_bytes = num_bytes >= self.max_batch_bytes
         enough_messages = num_messages >= self.max_batch_records
         enough_time = num_seconds >= self.batch_delay_seconds
+
+        # Tally each record under its stream name; print_counts() reports the totals.
+        if isinstance(message, singer.RecordMessage):
+            self.counts[message.stream] = self.counts.get(message.stream, 0) + 1
+
         if enough_bytes or enough_messages or enough_time:
             LOGGER.debug('Flushing %d bytes, %d messages, after %.2f seconds',
                          num_bytes, num_messages, num_seconds)
@@ -462,6 +468,13 @@ def consume(self, reader):
             self.handle_line(line)
         self.flush()
 
+    def print_counts(self):
+        '''Log one "stream: count" line per stream, framed by separator lines.'''
+        separator = '----------------------'
+        rows = ['{}: {}'.format(stream_id, stream_count)
+                for stream_id, stream_count in self.counts.items()]
+        LOGGER.info('\n' + '\n'.join([separator] + rows + [separator]))
+
 
 def collect():
     '''Send usage info to Stitch.'''
@@ -554,11 +567,13 @@ def main_impl():
 
     # queue = Queue(args.max_batch_records)
     reader = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
-    TargetStitch(handlers,
+    target = TargetStitch(handlers,
                  sys.stdout,
                  args.max_batch_bytes,
                  args.max_batch_records,
-                 args.batch_delay_seconds).consume(reader)
+                 args.batch_delay_seconds)
+    target.consume(reader)
+    target.print_counts()
     LOGGER.info("Exiting normally")
 
 def main():