

Cryptic error when declareOutputFields doesn't match emit fields in nextTuple #92

Open
dmitry-saritasa opened this issue Apr 17, 2017 · 2 comments


@dmitry-saritasa

```
[2017-04-17 14:11:53,544][storm][ERROR]Sent failure message ("E_SPOUTFAILED__tspouts__dmitry__pid__24617__port__-1__taskindex__-1__StormIPCException") to Storm
[2017-04-17 14:11:58,548][storm][ERROR]Caught exception in Spout.run: Read EOF from stdin
Traceback (most recent call last):
  File "/home/dmitry/.pyenv/versions/3.5.2/envs/storm/lib/python3.5/site-packages/petrel-1.0.2.0.3-py3.5.egg/petrel/storm.py", line 438, in run
    msg = readCommand()
  File "/home/dmitry/.pyenv/versions/3.5.2/envs/storm/lib/python3.5/site-packages/petrel-1.0.2.0.3-py3.5.egg/petrel/storm.py", line 70, in readCommand
    msg = readMsg()
  File "/home/dmitry/.pyenv/versions/3.5.2/envs/storm/lib/python3.5/site-packages/petrel-1.0.2.0.3-py3.5.egg/petrel/storm.py", line 39, in readMsg
    raise StormIPCException('Read EOF from stdin')
petrel.storm.StormIPCException: Read EOF from stdin
Worker tspouts exiting normally.
```
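This happens with spout code where `declareOutputFields` declares three fields but `nextTuple` emits only two values:

```python
def declareOutputFields(self):
    return ['topic', 'key', 'record']  # three fields declared

def nextTuple(self):
    if isinstance(record.value, dict):
        storm.emit([record.topic, record.value])  # only two values emitted
```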
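One way to get a readable error instead of the EOF above is to check the emitted values against the declared fields in Python, before they are sent to Storm. The sketch below uses a hypothetical helper, `check_tuple`, which is not part of Petrel or Storm; it only compares lengths and raises a `ValueError` that names both lists.

```python
# Hypothetical helper (not part of Petrel or Storm): validate a tuple
# against the declared output fields before handing it to storm.emit().
def check_tuple(values, declared_fields):
    """Return values as a list, or raise ValueError if the counts differ."""
    values = list(values)
    if len(values) != len(declared_fields):
        raise ValueError(
            "emitted %d value(s) %r, but %d field(s) are declared: %r"
            % (len(values), values, len(declared_fields), list(declared_fields))
        )
    return values


declared = ['topic', 'key', 'record']
try:
    # Mirrors the two-value emit against three declared fields.
    check_tuple(['my-topic', {'a': 1}], declared)
except ValueError as exc:
    print(exc)
```

Inside a spout, wrapping the call as `storm.emit(check_tuple([...], self.declareOutputFields()))` should make the worker fail on the Python side, so the logged traceback states the mismatch directly.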

So if I forget to update both methods, I get this cryptic error. Maybe we should consider a different approach? For example, each Spout or Bolt class could have a nested Meta class (as in Django ORM models) that defines the output fields. We would then always emit a namedtuple, and `emit` would call `operator.attrgetter` with `Meta.output_fields` to extract the values. That would be easier for end developers to manage.

Sample:

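```python
import collections
from operator import attrgetter

from petrel import storm
from petrel.emitter import Spout

# Create the namedtuple type once, not on every nextTuple() call.
Message = collections.namedtuple('Message', 'topic key value')

class KafkaSpout(Spout):
    class Meta:
        # Single place where the output fields are defined.
        output_fields = ['topic', 'key', 'value']

    def nextTuple(self):
        tup = Message('topic', 'GUID', dict(a=1, b=2, c=3))
        self.emit(tup)

    def emit(self, tup):
        # Extract the values from the namedtuple in Meta.output_fields order.
        storm.emit(attrgetter(*self.Meta.output_fields)(tup))
```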

The `self.emit` implementation then uses `operator.attrgetter` to pull the values out of `tup` in `Meta.output_fields` order.
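One detail of `operator.attrgetter` matters here: with several names it returns a tuple of those attributes in the order given, but with a single name it returns the bare value. A generic `emit` built on it therefore needs to special-case one-field spouts. The helper `values_in_field_order` and the `Message` type below are illustrative only.

```python
import collections
from operator import attrgetter

Message = collections.namedtuple('Message', ['topic', 'key', 'value'])


def values_in_field_order(tup, output_fields):
    """Return the attributes of tup named in output_fields, as a list, in that order."""
    values = attrgetter(*output_fields)(tup)
    # With a single name, attrgetter returns the bare value, not a 1-tuple.
    if len(output_fields) == 1:
        return [values]
    return list(values)


msg = Message(topic='events', key='GUID', value={'a': 1})
print(values_in_field_order(msg, ['topic', 'key', 'value']))  # ['events', 'GUID', {'a': 1}]
print(values_in_field_order(msg, ['value', 'topic']))         # [{'a': 1}, 'events']
print(values_in_field_order(msg, ['topic']))                  # ['events']
```

A misspelled or missing field name fails locally with an `AttributeError` that names the attribute (for example, `attrgetter('record')(msg)` reports that `'Message' object has no attribute 'record'`), which is far easier to diagnose than an EOF on stdin.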

What do you think, Barry?

@barrywhart
Contributor

It seems like a reasonable thing for you to do in your topologies, but I don't want to place additional constraints on all users. In general, I've tried to keep the core storm.py pretty close to the one provided with Storm, in order to make it easier to merge in changes made in core Storm.

@barrywhart
Contributor

Could your idea be implemented in a separate .py file, in subclasses of the core Storm / Petrel classes? That could be a good way to implement it without breaking backward compatibility or making it hard to merge upstream changes from Apache Storm.
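Following this suggestion, the idea could live in its own module as a mixin layered over Petrel's `Spout`, leaving the core `storm.py` untouched. This is a hypothetical design: `MetaEmitterMixin`, `emit_record`, and the module name are invented for illustration, and the `Spout` and `storm_emit` stubs stand in for Petrel's `Spout` class and `storm.emit` so the sketch runs on its own.

```python
import collections
from operator import attrgetter

# Stand-ins so this sketch runs on its own. In a real topology these would be
# Petrel's Spout class and storm.emit; the stub below only records emits.
class Spout(object):
    pass

emitted = []

def storm_emit(values):
    emitted.append(values)

# --- meta_emitter.py (hypothetical module name) ---
class MetaEmitterMixin(object):
    """Derive declareOutputFields() from Meta.output_fields and emit namedtuples."""

    @classmethod
    def declareOutputFields(cls):
        return list(cls.Meta.output_fields)

    def emit_record(self, record):
        fields = self.Meta.output_fields
        values = attrgetter(*fields)(record)
        # attrgetter returns a bare value for one field, a tuple otherwise.
        storm_emit([values] if len(fields) == 1 else list(values))

# --- user code ---
Message = collections.namedtuple('Message', ['topic', 'key', 'value'])

class KafkaSpout(MetaEmitterMixin, Spout):
    class Meta:
        output_fields = ['topic', 'key', 'value']

    def nextTuple(self):
        self.emit_record(Message('events', 'GUID', {'a': 1}))

spout = KafkaSpout()
spout.nextTuple()
print(KafkaSpout.declareOutputFields())  # ['topic', 'key', 'value']
print(emitted)                           # [['events', 'GUID', {'a': 1}]]
```

Because `declareOutputFields` is derived from `Meta.output_fields`, the declaration and the emitted values cannot drift apart. Making it a classmethod lets it be called on either the class or an instance. A separate method name (`emit_record`) is used rather than overriding `emit`, so the mixin does not shadow anything the base class may already define.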
