Skip to content

Commit

Permalink
Moar logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
anjackson committed Jul 4, 2019
1 parent e5b1c06 commit 1126d0a
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions crawldb/hadoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def extra_files(self):

def init_mapper(self):
# Set up DB connection...
logger.info("Getting DB connection...")
self.conn = psycopg2.connect(
database=self.cdb_db,
user=self.cdb_user,
Expand All @@ -103,9 +104,11 @@ def init_mapper(self):
host=self.cdb_host,
)
# Make each statement commit immediately.
logger.info("Configuring DB connection...")
self.conn.set_session(autocommit=True)

# Open a cursor to perform database operations.
logger.info("Getting DB cursor...")
self.cur = self.conn.cursor()

def run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
Expand All @@ -114,8 +117,10 @@ def run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
UKWA: In this case, we modify the mapper behaviour to make efficient, batched SQL inserts possible.
"""
logger.warning("Initialising...")
self.init_hadoop()
self.init_mapper()
logger.warning("Initialised. Now launching to execute_values...")
execute_values(
self.cur,
CrawlLogLine.upsert_sql,
Expand All @@ -127,6 +132,7 @@ def _map_input_reporter(self, stdin, stdout):
counter = 0
for result in self._map_input((line[:-1] for line in stdin)):
if counter%1000 == 0:
logger.warning( "Processed %i lines" % counter )
outputs = [("STATUS", "Processed %i lines" % counter)]
if self.reducer == NotImplemented:
self.writer(outputs, stdout)
Expand Down

0 comments on commit 1126d0a

Please sign in to comment.