
Question about spout and keyby distributions #11

Open
mencagli opened this issue Feb 10, 2020 · 13 comments

Comments

@mencagli

Hi Tony,

Is it possible in BriskStream to distribute tuples from the source to the next operator on a key-by basis (fieldsgrouping)? To be clearer, suppose an application that starts with:

Spout->Operator

where each tuple from the source is delivered to the right replica of the operator based on a key attribute. As far as I understand, all the applications in your repository start with a Spout followed by a parser operator, where the connection between Source and Parser is always shuffledgrouping while the one between the Parser and the next operator can be shuffledgrouping or fieldsgrouping. So, I am wondering if it is possible to bypass the Parser. Unfortunately, the superclass AbstractSpout does not seem to have a getDefaultFields method. So, maybe this feature is not currently provided.

Thanks!

@ShuhaoZhangTony
Collaborator

Hi mencagli,

  1. fieldsgrouping is supported; you can refer to WordCount.java to see how to declare it. It is almost the same as in Storm, but take note of the slight difference (see the sketch below this list):

, new FieldsGrouping(Component.SPLITTER, new Fields(Field.WORD))

  2. To put it briefly, it is by design that the Spout has to be followed by a Parser.

I use the Parser operator to shift the workload off the Spout, so that the Spout has nothing to do but keep emitting the next tuple to keep the system busy.
This ensures that different applications have the same maximum input rate (provided the subsequent pipelines can handle it).
Logically, the Parser is hence the usual "Spout" of an application, and the Spout of BriskStream is nothing but a data generator running at maximum speed.
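
For reference, here is a rough sketch of where that grouping argument plugs in, assuming a Storm-like topology builder as used by WordCount.java in the repository (the bolt class names, thread counts, and the exact setSpout/setBolt signatures are illustrative, not necessarily the precise BriskStream API):

    // illustrative wiring; only the grouping arguments mirror WordCount.java
    builder.setSpout(Component.SPOUT, new WordCountSpout(), 1);
    builder.setBolt(Component.SPLITTER, new SplitSentenceBolt(), 2,
            new ShuffleGrouping(Component.SPOUT));                           // Spout -> Splitter: shuffle
    builder.setBolt(Component.COUNTER, new WordCountBolt(), 2,
            new FieldsGrouping(Component.SPLITTER, new Fields(Field.WORD))); // key-by on the WORD field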

Hope it clarifies.

@mencagli
Author

Thanks! So, can you confirm that it is possible to connect the spout directly to the next operator using fieldsgrouping? I know that fieldsgrouping is supported in BriskStream. My precise question (if it was not clear) is whether the spout can emit directly with fieldsgrouping to the next operator, without having a parser in the middle (suppose I am doing this in FraudDetection, where I want to connect the Spout to the Predictor directly, without the parser, using fieldsgrouping to respect the application semantics).

If it is possible, thanks, and thanks also for the explanation about the presence of a parser; it is reasonable.

Gabriele

@ShuhaoZhangTony
Collaborator

Technically yes, but I didn't write the corresponding code to allow the Spout to connect to the next operator with FieldsGrouping (it always assumes shuffle grouping).

As I just mentioned, I always assume the Spout is connected directly to the Parser, which acts as the logical Spout of an application.

If you seriously need the Spout to connect directly to other operators with FieldsGrouping, I can update BriskStream to support it; it probably requires just a few lines of code.

@mencagli
Author

Dear Tony,

If you can do that with little effort, it would be very much appreciated on my side. I need to remove the parser from my BriskStream benchmark, since having it is a waste of threads in my case. If you can modify the source code and tell me which parts of the source code need to be updated, that would be perfect!

Many many thanks for your prompt reply and great help!

Gabriele

@ShuhaoZhangTony
Collaborator

Dear Gabriele,

I just checked the code, and it is already supported.
For illustration, I modified FraudDetection.java so that the Spout is connected to the Parser using FieldsGrouping. Of course, you can similarly bypass the Parser and connect the Spout to other operators.
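
For the FraudDetection case discussed above, the direct connection would then look roughly like the following (a sketch only; the Predictor class name, the key field, and the builder signature are illustrative):

    builder.setSpout(Component.SPOUT, spout, 1);
    // bypass the Parser: key-by directly from the Spout to the Predictor
    // (Field.ENTITY_ID is an illustrative key attribute)
    builder.setBolt(Component.PREDICTOR, new FraudPredictorBolt(), 2,
            new FieldsGrouping(Component.SPOUT, new Fields(Field.ENTITY_ID)));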

Tony

@mencagli
Author

Wonderful, thanks! I will try it immediately.

@mencagli
Author

Dear Tony,

everything seems to work, thanks! I am wondering if you have a close/cancel method in the spouts that is called before termination, as in Flink and Storm. This would be very helpful for collecting statistics.
Thanks again,
Gabriele

@ShuhaoZhangTony
Collaborator

Unfortunately, there is currently no such feature built into BriskStream. You may want to implement it in the application itself, say, by calling System.exit() when the Spout receives a special tuple.
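
A minimal sketch of that workaround, assuming a Storm-style spout with a nextTuple() hook and an output collector (the exact hook and collector names in AbstractSpout may differ, and dumpStatistics() is a hypothetical helper):

    @Override
    public void nextTuple() throws InterruptedException {
        if (counter < lines.length) {
            collector.emit(lines[counter++]);  // normal emission path
        } else {
            dumpStatistics();                  // hypothetical helper: flush collected metrics
            System.exit(0);                    // terminate once the input is exhausted
        }
    }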

@mencagli
Author

Ok, thanks for the suggestion Tony.

@mencagli
Author

Hi Tony,

sorry for bothering you. I have a simple question that hopefully has a quick answer.
Is it possible in BriskStream to set the size of the message buffers used by the ExecutorNodes to exchange tuples with each other? I imagine the API does not allow this, but maybe you can give me some hints about where I can modify this parameter in the source code.

Many thanks for your help,

Gabriele

@ShuhaoZhangTony
Collaborator

Hi Gabriele,

Currently, BriskStream relies on "-bt" to tune the number of tuples transmitted at once between executors.
That is to say, the size of the message buffer is (size of tuple) x (batch), where the batch is tunable.
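For instance (with purely illustrative numbers): if each tuple occupies roughly 64 bytes and -bt is set to 100, each transfer between executors moves on the order of 64 x 100 = 6,400 bytes.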

More fine-grained control of the message buffer size is unfortunately not possible unless you statically define the size of each tuple. This is because TStream uses object-reference queues, i.e., it does not care about the actual physical size of a tuple.

To modify this behaviour, you could try to use P1C1OffHeapQueue (located in package brisk.queue.impl), which forces the system to pass a statically defined object (say, an integer). You can then control the message buffer size at a finer granularity (say, define the queue input as one integer for a 32-bit message buffer, two integers for 64 bits, and so on).

Tony

@mencagli
Author

mencagli commented Feb 13, 2020

Thanks. I am wondering if there is a possibility to choose the size of the object-reference queue (TStream?). For example, 100 means that it can hold 100 objects regardless of their type (each entry is just a reference). This is actually my goal: to set the maximum number of references that can be written to that queue. Maybe my question is clearer now. Sorry for that.

@ShuhaoZhangTony
Collaborator

I see, so you are talking about the size of the communication queue.

Yes, you can change it. It is located in executorThread.java.
Currently, it is hard-coded as 100000.
Check for the following code.

private void allocate_OutputQueue() {
//    if (enable_latency_measurement) {
//        executor.allocate_OutputQueue(conf.getBoolean("linked", false), 2); // no queueing delay.
//    } else {
        executor.allocate_OutputQueue(conf.getBoolean("linked", false), 100000);
//    }
}
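
If you want to make that capacity configurable rather than hard-coded, a minimal sketch would be the following, assuming the Configuration object exposes a getInt accessor analogous to the getBoolean call above (the key name "output_queue_size" is hypothetical):

    private void allocate_OutputQueue() {
        // "output_queue_size" is a hypothetical config key; 100000 is the current hard-coded default
        executor.allocate_OutputQueue(conf.getBoolean("linked", false),
                conf.getInt("output_queue_size", 100000));
    }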
