Skip to content

Commit 4197b6d

Browse files
committed
Introduce small delay to avoid race between publisher and subscriber
Signed-off-by: Tim Paine <[email protected]>
1 parent 3f6e9a0 commit 4197b6d

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

csp/tests/adapters/test_kafka.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ def graph(symbols: list, count: int):
131131
csp.timer(timedelta(seconds=0.2), True),
132132
csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)),
133133
)
134+
134135
i = csp.count(csp.timer(timedelta(seconds=0.15)))
135136
d = csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0
136137
s = csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING"))
@@ -157,18 +158,13 @@ def graph(symbols: list, count: int):
157158
)
158159
csp.add_graph_output(f"pall_{symbol}", pub_data)
159160

160-
# csp.print('status', kafkaadapter.status())
161-
162161
sub_data = kafkaadapter.subscribe(
163162
ts_type=SubData,
164163
msg_mapper=msg_mapper,
165164
topic=topic,
166165
key=symbol,
167166
push_mode=csp.PushMode.NON_COLLAPSING,
168167
)
169-
170-
sub_data = csp.firstN(sub_data, count)
171-
172168
csp.add_graph_output(f"sall_{symbol}", sub_data)
173169

174170
done_flag = csp.count(sub_data) == count
@@ -190,8 +186,12 @@ def graph(symbols: list, count: int):
190186
pub = results[f"pall_{symbol}"]
191187
sub = results[f"sall_{symbol}"]
192188

189+
# limit by the last `count`
190+
sub = sub[-1*count:]
191+
pub = pub[-1*count:]
192+
193193
assert len(sub) == count
194-
assert [v[1] for v in sub] == [v[1] for v in pub[:count]]
194+
assert [v[1] for v in sub] == [v[1] for v in pub[-1*count:]]
195195

196196
@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
197197
def test_start_offsets(self, kafkaadapter, kafkabroker):

0 commit comments

Comments
 (0)