Here is an example of the bolt on my side. When I run the topology, everything works normally and my messages are persisted to the database, but the Storm UI shows no acknowledged tuples. Zero. My bolt inherits from BasicBolt, which is supposed to call ack() automatically.
Thoughts?
```python
import pprint
import traceback

import config as cfg
import inflection

from db.bigtable import records
from db.bigtable.connections import connect
from petrel.emitter import BasicBolt

# enable loggers
log = cfg.config.loggers.storm

# establish bigtable connection
connect(*cfg.config.db_connection_params, admin=True)


class PersistenceBolt(BasicBolt):

    def __init__(self):
        self.producer = cfg.config.kafka_producer
        super().__init__(script=__file__)

    def declareOutputFields(self):
        """Final bolt should not define any output fields
        you should always return empty array for the final bolt
        """
        return []

    def process(self, tup):
        # record is supposed to be dict with data we need to persist
        topic, record = tup.values
        log.debug("{}:{}".format(topic, record))

        # if record is not dictionary - bypass the record
        if isinstance(record, dict):
            try:
                persist = getattr(
                    records,
                    inflection.camelize("{}_record".format(topic)))
                persist(**record).save()
            except:
                problem = {
                    "topic": topic,
                    "record": record,
                    "error": traceback.format_exc()
                }
                # send to *_failed kafka topic so we can reprocess later
                self.producer.send("{}_failed".format(topic), problem)
                pp = pprint.PrettyPrinter(indent=4)
                log.debug(pp.pformat(problem))


def run():
    """Default function needed to run bolt as task inside Apache Storm Topology
    """
    PersistenceBolt().run()
```
The ack() happens here. You might try adding logging to that function to make sure it's being executed. Or temporarily switch to Bolt and see if it helps to ack() explicitly.
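To make the two suggestions concrete, here is a minimal, self-contained sketch of a bolt that acks explicitly and logs each ack. Everything in it is a hypothetical stand-in written for illustration (`FakeTuple`, `AckRecorder`, `ExplicitAckBolt`); none of it is Petrel's actual API. In a real topology the ack would go through whatever ack call Petrel exposes, and the base class would be Petrel's Bolt.

```python
import logging

log = logging.getLogger("ack_sketch")


class FakeTuple:
    """Hypothetical stand-in for a Storm tuple (only the fields used here)."""

    def __init__(self, id, values):
        self.id = id
        self.values = values


class AckRecorder:
    """Hypothetical stand-in for the channel back to Storm.

    It records acked tuple ids and logs each one, which is the kind of
    logging you could add to confirm that ack() is actually executed.
    """

    def __init__(self):
        self.acked = []

    def ack(self, tup):
        log.debug("ack sent for tuple %s", tup.id)
        self.acked.append(tup.id)


class ExplicitAckBolt:
    """Plain-bolt style: process() acks every tuple itself."""

    def __init__(self, channel):
        self.channel = channel
        self.persisted = []

    def process(self, tup):
        topic, record = tup.values
        # Same rule as the bolt in the question: only dicts are persisted.
        if isinstance(record, dict):
            self.persisted.append((topic, record))
        # Ack in every case so that bypassed records are not replayed.
        self.channel.ack(tup)


channel = AckRecorder()
bolt = ExplicitAckBolt(channel)
bolt.process(FakeTuple("t1", ["user", {"name": "a"}]))
bolt.process(FakeTuple("t2", ["user", "not-a-dict"]))
```

If the log line shows up for every tuple but the Storm UI still reports zero acks, the problem is more likely in how the acks reach Storm than in the bolt's own logic.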