Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ack() is not happening with BasicBolt / Apache Storm 1.0.3? #88

Open
dmitry-saritasa opened this issue Apr 11, 2017 · 1 comment
Open

Comments

@dmitry-saritasa
Copy link

hi Barry,

Example of the bolt on my side. When I run topology - everything works normal and my messages are persisted into database, but Storm UI shows NO acknowledged tuples. Zero. Although my bolt is inherited from BasicBolt that's supposed to do automated ack().

Thoughts?

import pprint
import traceback

import config as cfg
import inflection
from db.bigtable import records
from db.bigtable.connections import connect
from petrel.emitter import BasicBolt

# enable loggers
log = cfg.config.loggers.storm

# establish bigtable connection
connect(*cfg.config.db_connection_params, admin=True)

class PersistenceBolt(BasicBolt):
    def __init__(self):
        self.producer = cfg.config.kafka_producer
        super().__init__(script=__file__)

    def declareOutputFields(self):
        """Final bolt should not define any output fields
        you should always return empty array for the final bolt
        """
        return []

    def process(self, tup):
        # record is supposed to be dict with data we need to persist
        topic, record = tup.values
        log.debug("{}:{}".format(topic, record))

        # if record is not dictionary - bypass the record
        if isinstance(record, dict):
            try:
                persist = getattr(
                    records,
                    inflection.camelize("{}_record".format(topic)))
                persist(**record).save()
            except:               
                problem = {
                    "topic": topic,
                    "record": record,
                    "error": traceback.format_exc()
                }
                # send to *_failed kafka topic so we can reprocess later
                self.producer.send("{}_failed".format(topic), problem)
                pp = pprint.PrettyPrinter(indent=4)
                log.debug(pp.pformat(problem))


def run():
    """Default function needed to run bolt as task
    inside Apache Storm Topology
    """
    PersistenceBolt().run()
@dmitry-saritasa dmitry-saritasa changed the title ack() is not happening? ack() is not happening with Basic? Apr 11, 2017
@dmitry-saritasa dmitry-saritasa changed the title ack() is not happening with Basic? ack() is not happening with BasicBolt / Apache Storm 1.0.3? Apr 11, 2017
@barrywhart
Copy link
Contributor

The ack() happens here. You might try adding logging to that function to make sure it's being executed. Or temporarily switch to Bolt and see if it helps to ack() explicitly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants