Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce State Passing Between Operators #2802

Merged
merged 203 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 158 commits
Commits
Show all changes
203 commits
Select commit Hold shift + click to select a range
3af95b0
init
aglinxinyuan Mar 10, 2024
6c17a01
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 11, 2024
c5208d7
fix format
aglinxinyuan Mar 11, 2024
a4c75c4
Merge remote-tracking branch 'origin/xinyuan-marker' into xinyuan-marker
aglinxinyuan Mar 11, 2024
6830a06
fix format
aglinxinyuan Mar 11, 2024
c40fb17
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 18, 2024
8f3975b
rename
aglinxinyuan Mar 18, 2024
7bb8c7e
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 20, 2024
c1f48bf
fix fmt
aglinxinyuan Mar 20, 2024
7e40798
update
aglinxinyuan Mar 25, 2024
6ac2e71
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 28, 2024
775aff2
Merge branch 'master' into xinyuan-marker
aglinxinyuan Mar 30, 2024
a2e9f34
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 1, 2024
91beb72
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 2, 2024
e7bab7f
add string serialization and test program
shengquan-ni Apr 4, 2024
e196f74
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 4, 2024
6fb5466
update
aglinxinyuan Apr 5, 2024
772fbf7
update serialization
shengquan-ni Apr 5, 2024
7fd673e
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 8, 2024
289787c
update
aglinxinyuan Apr 12, 2024
4ccf968
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 16, 2024
3904018
update
aglinxinyuan Apr 16, 2024
58d6afd
update
aglinxinyuan Apr 16, 2024
d32b4f1
Merge branch 'master' into xinyuan-marker
aglinxinyuan Apr 29, 2024
ab8d438
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 11, 2024
49a321d
update
aglinxinyuan May 12, 2024
5020377
update
aglinxinyuan May 13, 2024
20800ea
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 13, 2024
bba1714
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 19, 2024
02f420f
update
aglinxinyuan May 19, 2024
e50ad09
update
aglinxinyuan May 19, 2024
c2ede6b
Merge branch 'master' into xinyuan-marker
aglinxinyuan May 27, 2024
332aa0c
Merge branch 'master' into xinyuan-marker
aglinxinyuan Jul 24, 2024
7c27f69
Merge branch 'master' into xinyuan-marker
aglinxinyuan Jul 27, 2024
6981ea1
fix
aglinxinyuan Jul 28, 2024
2c0c8d4
fix
aglinxinyuan Jul 29, 2024
a4e4982
fix fmt
aglinxinyuan Jul 31, 2024
f5aeeb9
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 3, 2024
e47dc9d
update
aglinxinyuan Aug 3, 2024
8cbb603
update
aglinxinyuan Aug 3, 2024
4d4babc
update
aglinxinyuan Aug 4, 2024
bcad4bf
update
aglinxinyuan Aug 4, 2024
98e98eb
update
aglinxinyuan Aug 4, 2024
7286df5
Revert "update"
aglinxinyuan Aug 4, 2024
b617d62
update
aglinxinyuan Aug 4, 2024
812706f
update
aglinxinyuan Aug 4, 2024
e6e93a6
update
aglinxinyuan Aug 4, 2024
cdf0d88
update
aglinxinyuan Aug 4, 2024
9749cdb
update
aglinxinyuan Aug 4, 2024
82367aa
update
aglinxinyuan Aug 4, 2024
8402750
update
aglinxinyuan Aug 4, 2024
7addf6e
update
aglinxinyuan Aug 5, 2024
81b72c4
update
aglinxinyuan Aug 5, 2024
6c403b8
update
aglinxinyuan Aug 5, 2024
5931bb0
update
aglinxinyuan Aug 6, 2024
c77bc97
update
aglinxinyuan Aug 6, 2024
a64df1f
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 6, 2024
4a984b2
update
aglinxinyuan Aug 6, 2024
85a7e07
update
aglinxinyuan Aug 6, 2024
03a44f6
update
aglinxinyuan Aug 6, 2024
a5a27f6
update
aglinxinyuan Aug 6, 2024
dd623cf
update
aglinxinyuan Aug 6, 2024
ab2f9b0
update
aglinxinyuan Aug 6, 2024
eed3433
update
aglinxinyuan Aug 6, 2024
ca8d36b
update
aglinxinyuan Aug 7, 2024
cb5eecb
update
aglinxinyuan Aug 7, 2024
30d4695
update
aglinxinyuan Aug 7, 2024
4822bbe
update
aglinxinyuan Aug 7, 2024
79856ba
update
aglinxinyuan Aug 7, 2024
ca02b6d
update
aglinxinyuan Aug 7, 2024
1122caf
update
aglinxinyuan Aug 7, 2024
5472101
update
aglinxinyuan Aug 7, 2024
234e83b
update
aglinxinyuan Aug 7, 2024
526b495
update
aglinxinyuan Aug 7, 2024
3773ca1
update
aglinxinyuan Aug 7, 2024
0b19214
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 8, 2024
1ff31d1
update
aglinxinyuan Aug 8, 2024
72c873d
update
aglinxinyuan Aug 8, 2024
b9dd6ab
update
aglinxinyuan Aug 9, 2024
0ee5fd3
update
aglinxinyuan Aug 9, 2024
d0acbbb
update
aglinxinyuan Aug 10, 2024
66b3928
update
aglinxinyuan Aug 10, 2024
78f3259
update
aglinxinyuan Aug 10, 2024
bd83627
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 10, 2024
e177765
Merge branch 'master' into xinyuan-marker
aglinxinyuan Aug 22, 2024
dfcfa69
fix fmt
aglinxinyuan Aug 22, 2024
daf7615
fix fmt
aglinxinyuan Aug 22, 2024
1add5d2
fix fmt
aglinxinyuan Aug 22, 2024
11381c0
update
aglinxinyuan Aug 22, 2024
484feda
update
aglinxinyuan Aug 22, 2024
eb60922
init
aglinxinyuan Aug 22, 2024
25468fa
update
aglinxinyuan Aug 22, 2024
995b6d5
update
aglinxinyuan Aug 22, 2024
92b3c95
update
aglinxinyuan Aug 22, 2024
ad874a9
update
aglinxinyuan Aug 22, 2024
770deba
update
aglinxinyuan Aug 22, 2024
fc38808
update
aglinxinyuan Aug 22, 2024
df3a3ab
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 3, 2024
3da62d8
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 3, 2024
64197e2
update
aglinxinyuan Sep 5, 2024
b636546
update
aglinxinyuan Sep 5, 2024
7750b7f
update
aglinxinyuan Sep 5, 2024
ab93577
update
aglinxinyuan Sep 5, 2024
d08a830
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 5, 2024
86282de
update
aglinxinyuan Sep 5, 2024
7ceea09
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 7, 2024
4fb386c
update
aglinxinyuan Sep 7, 2024
f3d7336
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 7, 2024
693ac00
update
aglinxinyuan Sep 7, 2024
823919d
update
aglinxinyuan Sep 7, 2024
436e113
update
aglinxinyuan Sep 7, 2024
4773020
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 7, 2024
4a606e4
update
aglinxinyuan Sep 8, 2024
8ddea2c
update
aglinxinyuan Sep 9, 2024
ce832ff
update
aglinxinyuan Sep 9, 2024
ffe9d26
update
aglinxinyuan Sep 9, 2024
a60f562
update
aglinxinyuan Sep 9, 2024
27a40fc
update
aglinxinyuan Sep 9, 2024
a88edd1
update
aglinxinyuan Sep 10, 2024
79a4e8f
update
aglinxinyuan Sep 12, 2024
4c422f2
update
aglinxinyuan Sep 12, 2024
00c1504
update
aglinxinyuan Sep 12, 2024
205d589
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 14, 2024
51dff7d
update
aglinxinyuan Sep 14, 2024
bbffbbe
update
aglinxinyuan Sep 14, 2024
e0d6056
update
aglinxinyuan Sep 14, 2024
fdbc91f
update
aglinxinyuan Sep 14, 2024
e5c9cf5
update
aglinxinyuan Sep 14, 2024
20f9c02
update
aglinxinyuan Sep 14, 2024
f9c201b
update
aglinxinyuan Sep 14, 2024
5254aa7
update
aglinxinyuan Sep 14, 2024
067247f
update
aglinxinyuan Sep 15, 2024
13eafe0
update
aglinxinyuan Sep 15, 2024
1887bfb
update
aglinxinyuan Sep 16, 2024
083e866
update
aglinxinyuan Sep 16, 2024
87e305a
update
aglinxinyuan Sep 16, 2024
59aa9a9
update
aglinxinyuan Sep 16, 2024
d603c41
update
aglinxinyuan Sep 17, 2024
30be7db
fix format
aglinxinyuan Sep 17, 2024
4cc7e76
fix format
aglinxinyuan Sep 17, 2024
dbe435d
fix format
aglinxinyuan Sep 17, 2024
fdee773
fix test
aglinxinyuan Sep 17, 2024
e4594a8
fix test
aglinxinyuan Sep 17, 2024
760b590
fix fmt
aglinxinyuan Sep 17, 2024
a2783f3
fix fmt
aglinxinyuan Sep 17, 2024
809a9af
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 17, 2024
4e8c027
update
aglinxinyuan Sep 17, 2024
fdf68df
update
aglinxinyuan Sep 18, 2024
f0e028e
fix fmt
aglinxinyuan Sep 18, 2024
efa22ff
update
aglinxinyuan Sep 18, 2024
e3ff682
update
aglinxinyuan Sep 18, 2024
f4f5318
fix fmt
aglinxinyuan Sep 18, 2024
2e136f2
fix fmt
aglinxinyuan Sep 18, 2024
cff6f57
fix fmt
aglinxinyuan Sep 18, 2024
6372288
update
aglinxinyuan Sep 18, 2024
990a720
update
aglinxinyuan Sep 18, 2024
6e50be1
update
aglinxinyuan Sep 18, 2024
8889e61
update
aglinxinyuan Sep 18, 2024
183d21d
fix
aglinxinyuan Sep 19, 2024
a0a34a1
fix
aglinxinyuan Sep 19, 2024
3316f47
fix
aglinxinyuan Sep 19, 2024
e0a856b
fix
aglinxinyuan Sep 19, 2024
4eb0564
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 19, 2024
d91c167
fix
aglinxinyuan Sep 19, 2024
600934f
fix
aglinxinyuan Sep 19, 2024
7f511df
fix
aglinxinyuan Sep 19, 2024
47f88e6
fix
aglinxinyuan Sep 19, 2024
bf159cc
fix
aglinxinyuan Sep 19, 2024
a1ead8f
fix
aglinxinyuan Sep 19, 2024
4800620
fix
aglinxinyuan Sep 19, 2024
c454017
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 19, 2024
64b28f2
update
aglinxinyuan Sep 19, 2024
13b4dae
update
aglinxinyuan Sep 19, 2024
d4d2a48
update
aglinxinyuan Sep 19, 2024
38e57cc
update
aglinxinyuan Sep 19, 2024
f5b02cf
update
aglinxinyuan Sep 19, 2024
dbe12a0
update
aglinxinyuan Sep 19, 2024
6903721
update
aglinxinyuan Sep 19, 2024
4045d2f
update
aglinxinyuan Sep 20, 2024
c471697
update
aglinxinyuan Sep 20, 2024
18da3a5
Revert "update"
aglinxinyuan Sep 20, 2024
c60e606
update
aglinxinyuan Sep 20, 2024
96230f7
rename StartOfUpstream
aglinxinyuan Sep 20, 2024
c5632a3
rename EndOfUpstream
aglinxinyuan Sep 20, 2024
7e4d937
fix fmt
aglinxinyuan Sep 20, 2024
12dabe6
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 20, 2024
8989a26
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 20, 2024
1018814
update
aglinxinyuan Sep 20, 2024
485a143
update
aglinxinyuan Sep 20, 2024
a97b49d
update
aglinxinyuan Sep 20, 2024
52196b9
update
aglinxinyuan Sep 20, 2024
bda1ffc
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 20, 2024
6873652
update
aglinxinyuan Sep 20, 2024
fea6543
Merge remote-tracking branch 'origin/xinyuan-state-passing' into xiny…
aglinxinyuan Sep 20, 2024
3fad565
update
aglinxinyuan Sep 20, 2024
b816ad4
update
aglinxinyuan Sep 20, 2024
2f85274
update
aglinxinyuan Sep 20, 2024
6e92ee4
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 21, 2024
901e3ea
update
aglinxinyuan Sep 21, 2024
216dea8
fix fmt
aglinxinyuan Sep 21, 2024
d534956
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 21, 2024
053eca6
update
aglinxinyuan Sep 23, 2024
5828053
Merge branch 'master' into xinyuan-state-passing
aglinxinyuan Sep 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .console_message_manager import ConsoleMessageManager
from .debug_manager import DebugManager
from .exception_manager import ExceptionManager
from .marker_processing_manager import MarkerProcessingManager
from .tuple_processing_manager import TupleProcessingManager
from .executor_manager import ExecutorManager
from .pause_manager import PauseManager
Expand All @@ -26,6 +27,7 @@ def __init__(self, worker_id, input_queue):
self.input_queue: InternalQueue = input_queue
self.executor_manager = ExecutorManager()
self.tuple_processing_manager = TupleProcessingManager()
self.marker_processing_manager = MarkerProcessingManager()
self.exception_manager = ExceptionManager()
self.state_manager = StateManager(
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Optional
from core.models.marker import State, Marker


class MarkerProcessingManager:
def __init__(self):
self.current_input_marker: Optional[Marker] = None
self.current_output_state: Optional[State] = None

def get_input_marker(self) -> Optional[State]:
ret, self.current_input_marker = self.current_input_marker, None
return ret

def get_output_state(self) -> Optional[State]:
ret, self.current_output_state = self.current_output_state, None
return ret
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
from threading import Event, Condition
from typing import Optional, Union, Tuple, Iterator
from typing import Optional, Tuple, Iterator

from core.models import InputExhausted
from core.models.marker import State
from proto.edu.uci.ics.amber.engine.common import PortIdentity


class TupleProcessingManager:
def __init__(self):
self.current_input_tuple: Optional[Union[Tuple, InputExhausted]] = None
self.current_input_tuple: Optional[Tuple] = None
self.current_input_port_id: Optional[PortIdentity] = None
self.current_input_tuple_iter: Optional[
Iterator[Union[Tuple, InputExhausted]]
] = None
self.current_input_tuple_iter: Optional[Iterator[Tuple]] = None
self.current_output_tuple: Optional[Tuple] = None
self.context_switch_condition: Condition = Condition()
self.finished_current: Event = Event()

def get_input_tuple(self) -> Optional[State]:
aglinxinyuan marked this conversation as resolved.
Show resolved Hide resolved
ret, self.current_input_tuple = self.current_input_tuple, None
return ret

def get_output_tuple(self) -> Optional[Tuple]:
ret, self.current_output_tuple = self.current_output_tuple, None
return ret

def get_input_port(self) -> int:
aglinxinyuan marked this conversation as resolved.
Show resolved Hide resolved
port_id = self.current_input_port_id
port: int
if port_id is None:
# no upstream, special case for source executor.
port = 0
else:
port = port_id.id
return port
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from typing import Iterator, Optional, Union, Dict, List

from core.models import Tuple, ArrowTableTupleProvider, Schema, InputExhausted
from core.models.internal_marker import EndOfAll, InternalMarker, SenderChange
from core.models.marker import EndOfUpstream
from pyarrow.lib import Table
from core.models import Tuple, ArrowTableTupleProvider, Schema
from core.models.internal_marker import (
InternalMarker,
StartOfOutputPorts,
EndOfOutputPorts,
SenderChange,
)
from core.models.marker import EndOfUpstream, State, StartOfUpstream, Marker
from core.models.payload import DataFrame, DataPayload, MarkerFrame
from proto.edu.uci.ics.amber.engine.common import (
ActorVirtualIdentity,
Expand Down Expand Up @@ -50,6 +55,7 @@ def __init__(self):
self._ports: Dict[PortIdentity, WorkerPort] = dict()
self._channels: Dict[ChannelIdentity, Channel] = dict()
self._current_channel_id: Optional[ChannelIdentity] = None
self.started = False

def add_input_port(self, port_id: PortIdentity, schema: Schema) -> None:
if port_id.id is None:
Expand Down Expand Up @@ -78,16 +84,21 @@ def register_input(

def process_data_payload(
self, from_: ActorVirtualIdentity, payload: DataPayload
) -> Iterator[Union[Tuple, InputExhausted, InternalMarker]]:
) -> Iterator[Union[Tuple, InternalMarker]]:
# special case used to yield for source op
if from_ == InputManager.SOURCE_STARTER:
yield InputExhausted()
yield EndOfAll()
yield EndOfUpstream()
yield EndOfOutputPorts()
return
current_channel_id = None
for channel_id, channel in self._channels.items():
if channel_id.from_worker_id == from_:
current_channel_id = channel_id

current_channel_id = next(
(
channel_id
for channel_id, channel in self._channels.items()
if channel_id.from_worker_id == from_
),
None,
)

if (
self._current_channel_id is None
Expand All @@ -97,17 +108,30 @@ def process_data_payload(
yield SenderChange(current_channel_id)

if isinstance(payload, DataFrame):
for field_accessor in ArrowTableTupleProvider(payload.frame):
yield Tuple(
{name: field_accessor for name in payload.frame.column_names},
schema=self._ports[
self._channels[self._current_channel_id].port_id
].get_schema(),
)
yield from self._process_data(payload.frame)
elif isinstance(payload, MarkerFrame):
yield from self._process_marker(payload.frame)
else:
raise NotImplementedError()

elif isinstance(payload, MarkerFrame) and isinstance(
payload.frame, EndOfUpstream
):
def _process_data(self, table: Table) -> Iterator[Tuple]:
schema = self._ports[
self._channels[self._current_channel_id].port_id
].get_schema()
for field_accessor in ArrowTableTupleProvider(table):
yield Tuple(
{name: field_accessor for name in table.column_names}, schema=schema
)

def _process_marker(self, marker: Marker) -> Iterator[InternalMarker]:
if isinstance(marker, State):
yield marker
if isinstance(marker, StartOfUpstream): # StartOfInputChannel()
if not self.started:
yield StartOfOutputPorts()
self.started = True
yield StartOfUpstream() # StartOfInputChannel()
aglinxinyuan marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(marker, EndOfUpstream): # EndOfInputChannel()
channel = self._channels[self._current_channel_id]
channel.complete()
port_id = channel.port_id
Expand All @@ -119,14 +143,11 @@ def process_data_payload(
)

if port_completed:
yield InputExhausted()
yield EndOfUpstream() # EndOfInputPort()

all_ports_completed = all(
map(lambda port: port.is_completed(), self._ports.values())
)

if all_ports_completed:
yield EndOfAll()

else:
raise NotImplementedError()
aglinxinyuan marked this conversation as resolved.
Show resolved Hide resolved
yield EndOfOutputPorts()
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@
from collections import OrderedDict
from itertools import chain
from loguru import logger
from typing import Iterable, Iterator

from pyarrow import Table
from typing import Iterable, Iterator

from core.architecture.packaging.input_manager import WorkerPort, Channel
from core.architecture.sendsemantics.broad_cast_partitioner import (
BroadcastPartitioner,
)
from core.architecture.sendsemantics.hash_based_shuffle_partitioner import (
HashBasedShufflePartitioner,
)
from core.architecture.sendsemantics.one_to_one_partitioner import OneToOnePartitioner
from core.architecture.sendsemantics.partitioner import Partitioner
from core.architecture.sendsemantics.range_based_shuffle_partitioner import (
RangeBasedShufflePartitioner,
)
from core.architecture.sendsemantics.one_to_one_partitioner import OneToOnePartitioner
from core.architecture.sendsemantics.partitioner import Partitioner
from core.architecture.sendsemantics.round_robin_partitioner import (
RoundRobinPartitioner,
)
from core.architecture.sendsemantics.broad_cast_partitioner import (
BroadcastPartitioner,
)
from core.models import Tuple, Schema, MarkerFrame
from core.models.marker import EndOfUpstream
from core.models.marker import Marker
from core.models.payload import DataPayload, DataFrame
from core.util import get_one_of
from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import (
Expand Down Expand Up @@ -99,33 +98,33 @@ def tuple_to_batch(
)
)

def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame:
return DataFrame(
frame=Table.from_pydict(
{
name: [t[name] for t in tuples]
for name in self.get_port().get_schema().get_attr_names()
},
schema=self.get_port().get_schema().as_arrow_schema(),
)
)

def emit_end_of_upstream(
self,
def emit_marker(
self, marker: Marker
) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]:
return chain(
*(
(
(
receiver,
(
MarkerFrame(tuples)
if isinstance(tuples, EndOfUpstream)
else self.tuple_to_frame(tuples)
MarkerFrame(payload)
if isinstance(payload, Marker)
else self.tuple_to_frame(payload)
),
)
for receiver, tuples in partitioner.no_more()
for receiver, payload in partitioner.flush(marker)
)
for partitioner in self._partitioners.values()
)
)

def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame:
return DataFrame(
frame=Table.from_pydict(
{
name: [t[name] for t in tuples]
for name in self.get_port().get_schema().get_attr_names()
},
schema=self.get_port().get_schema().as_arrow_schema(),
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from core.architecture.sendsemantics.partitioner import Partitioner
from core.models import Tuple
from core.models.marker import EndOfUpstream
from core.models.marker import Marker
from core.util import set_one_of
from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import (
Partitioning,
Expand Down Expand Up @@ -34,20 +34,18 @@ def add_tuple_to_batch(
self.reset()

@overrides
def no_more(
self,
def flush(
self, marker: Marker
) -> Iterator[
typing.Tuple[
ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]]
]
typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]]
]:
if len(self.batch) > 0:
for receiver in self.receivers:
yield receiver, self.batch

self.reset()
for receiver in self.receivers:
yield receiver, EndOfUpstream()
yield receiver, marker

@overrides
def reset(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

from loguru import logger
from overrides import overrides

from core.architecture.sendsemantics.partitioner import Partitioner
from core.models import Tuple
from core.models.marker import EndOfUpstream
from core.models.marker import Marker
from core.util import set_one_of
from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import (
HashBasedShufflePartitioning,
Expand Down Expand Up @@ -43,14 +42,12 @@ def add_tuple_to_batch(
self.receivers[hash_code] = (receiver, list())

@overrides
def no_more(
self,
def flush(
self, marker: Marker
) -> Iterator[
typing.Tuple[
ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]]
]
typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]]
]:
for receiver, batch in self.receivers:
if len(batch) > 0:
yield receiver, batch
yield receiver, EndOfUpstream()
yield receiver, marker
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
from typing import Iterator

from overrides import overrides

from core.architecture.sendsemantics.partitioner import Partitioner
from core.models import Tuple
from core.models.marker import EndOfUpstream
from core.models.marker import Marker
from core.util import set_one_of
from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import (
OneToOnePartitioning,
Expand Down Expand Up @@ -34,17 +33,15 @@ def add_tuple_to_batch(
self.reset()

@overrides
def no_more(
self,
def flush(
self, marker: Marker
) -> Iterator[
typing.Tuple[
ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]]
]
typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]]
]:
if len(self.batch) > 0:
yield self.receiver, self.batch
self.reset()
yield self.receiver, EndOfUpstream()
yield self.receiver, marker

@overrides
def reset(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from betterproto import Message

from core.models import Tuple
from core.models.marker import EndOfUpstream
from core.models.marker import Marker
from core.util import get_one_of
from proto.edu.uci.ics.amber.engine.architecture.sendsemantics import Partitioning
from proto.edu.uci.ics.amber.engine.common import ActorVirtualIdentity
Expand All @@ -20,12 +20,10 @@ def add_tuple_to_batch(
) -> Iterator[typing.Tuple[ActorVirtualIdentity, typing.List[Tuple]]]:
pass

def no_more(
self,
def flush(
self, marker: Marker
) -> Iterator[
typing.Tuple[
ActorVirtualIdentity, typing.Union[EndOfUpstream, typing.List[Tuple]]
]
typing.Tuple[ActorVirtualIdentity, typing.Union[Marker, typing.List[Tuple]]]
]:
pass

Expand Down
Loading
Loading