Skip to content

Commit 6f430de

Browse files
committed
Introduce small delay to avoid race between publisher and subscriber
Signed-off-by: Tim Paine <[email protected]>
1 parent 3f6e9a0 commit 6f430de

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

csp/tests/adapters/test_kafka.py

+11-8
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,18 @@ def curtime(x: ts[object]) -> ts[datetime]:
127127
return csp.now()
128128

129129
def graph(symbols: list, count: int):
130-
b = csp.merge(
131-
csp.timer(timedelta(seconds=0.2), True),
132-
csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)),
130+
delay = timedelta(seconds=1)
131+
b = csp.delay(
132+
csp.merge(
133+
csp.timer(timedelta(seconds=0.2), True),
134+
csp.delay(csp.timer(timedelta(seconds=0.2), False), timedelta(seconds=0.1)),
135+
),
136+
delay=delay,
133137
)
134-
i = csp.count(csp.timer(timedelta(seconds=0.15)))
135-
d = csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0
136-
s = csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING"))
138+
139+
i = csp.delay(csp.count(csp.timer(timedelta(seconds=0.15))), delay=delay)
140+
d = csp.delay(csp.count(csp.timer(timedelta(seconds=0.2))) / 2.0, delay=delay)
141+
s = csp.delay(csp.sample(csp.timer(timedelta(seconds=0.4)), csp.const("STRING")), delay=delay)
137142
dt = curtime(b)
138143
struct = MyData.collectts(b=b, i=i, d=d, s=s, dt=dt)
139144

@@ -157,8 +162,6 @@ def graph(symbols: list, count: int):
157162
)
158163
csp.add_graph_output(f"pall_{symbol}", pub_data)
159164

160-
# csp.print('status', kafkaadapter.status())
161-
162165
sub_data = kafkaadapter.subscribe(
163166
ts_type=SubData,
164167
msg_mapper=msg_mapper,

0 commit comments

Comments
 (0)