From 689aa4b493d722d305f92bb81702d5168912df6a Mon Sep 17 00:00:00 2001 From: Devon Stewart Date: Thu, 14 Nov 2024 11:15:00 -0800 Subject: [PATCH 1/2] Switching from bespoke regex-powered protocol to just JSON --- impls/node-protocolv2/client.ts | 52 +++++++++--------- impls/node/client.ts | 56 +++++++++++--------- impls/python/src/river_python_test/client.py | 49 ++++++++--------- src/docker.ts | 3 +- 4 files changed, 81 insertions(+), 79 deletions(-) diff --git a/impls/node-protocolv2/client.ts b/impls/node-protocolv2/client.ts index cc682f2..fcba135 100644 --- a/impls/node-protocolv2/client.ts +++ b/impls/node-protocolv2/client.ts @@ -7,6 +7,7 @@ import { createClient, type Result, type WriteStream } from 'protocolv2'; import type { TransportOptions } from 'protocolv2/transport'; import { BinaryCodec } from 'protocolv2/codec'; import type { serviceDefs } from './serviceDefs'; +import assert from 'node:assert'; const { PORT, @@ -70,26 +71,21 @@ const handles = new Map< >(); for await (const line of rl) { - const match = line.match( - /(?\w+) -- (?\w+)\.(?\w+) -> ?(?.*)/, - ); - if (!match || !match.groups) { - console.error('FATAL: invalid command', line); - process.exit(1); - } + const { id, init, payload, proc } = JSON.parse(line); - const { id, svc, proc, payload } = match.groups; - if (svc === 'kv') { - if (proc === 'set') { - const [k, v] = payload.split(' '); + switch (proc) { + case 'kv.set': { + const { k, v } = payload; const res = await client.kv.set.rpc({ k, v: parseInt(v) }); if (res.ok) { console.log(`${id} -- ok:${res.payload.v}`); } else { console.log(`${id} -- err:${res.payload.code}`); } - } else if (proc === 'watch') { - const res = client.kv.watch.subscribe({ k: payload }); + } + case 'kv.watch': { + const { k } = payload; + const res = client.kv.watch.subscribe({ k }); (async () => { for await (const v of res) { if (v.ok) { @@ -100,8 +96,7 @@ for await (const line of rl) { } })(); } - } else if (svc === 'repeat') { - if (proc === 'echo') { + case 'repeat.echo': { const handle = handles.get(id); if (!handle) { const [writer, reader] = client.repeat.echo.stream({}); @@ -117,13 +112,19 @@ for await (const line of rl) { handles.set(id, { writer: writer }); } else { - handle.writer.write({ str: payload }); + const { s } = payload; + handle.writer.write({ str: s }); } - } else if (proc === 'echo_prefix') { + } + case 'repeat.echo_prefix': { const handle = handles.get(id); if (!handle) { + assert( + init !== undefined, + 'Expected to find "init" in the first message', + ); const [writer, reader] = await client.repeat.echo_prefix.stream({ - prefix: payload, + prefix: init.prefix, }); (async () => { for await (const v of reader) { @@ -137,28 +138,29 @@ for await (const line of rl) { handles.set(id, { writer }); } else { - handle.writer.write({ str: payload }); + const { str } = payload; + handle.writer.write({ str }); } } - } else if (svc === 'upload') { - if (proc === 'send') { + case 'upload.send': { const handle = handles.get(id); if (!handle) { const [writer, finalize] = client.upload.send.upload({}); - if (payload !== '') { + if (!!payload && 'part' in payload) { // For UploadNoInit - writer.write({ part: payload }); + writer.write({ part: payload['part'] }); } handles.set(id, { writer, finalize }); } else { + const { part } = payload; if (!handle.writer.isClosed()) { - handle.writer.write({ part: payload }); + handle.writer.write({ part }); } if ( - payload === 'EOF' || + part === 'EOF' || // the closed condition will always lead to UNEXPECTED_DISCONNECT // returned from finalize we do this to match other implementation handle.writer.isClosed() diff --git a/impls/node/client.ts b/impls/node/client.ts index d2f12b6..08da044 100644 --- a/impls/node/client.ts +++ b/impls/node/client.ts @@ -6,6 +6,7 @@ import type { TransportOptions } from '@replit/river/transport'; import { BinaryCodec } from '@replit/river/codec'; import type { serviceDefs } from './serviceDefs'; import type { Pushable } from 'it-pushable'; +import assert from 'node:assert'; const { PORT, @@ -60,26 +61,22 @@ const rl = readline.createInterface({ const handles = new Map>(); for await (const line of rl) { - const match = line.match( - /(?\w+) -- (?\w+)\.(?\w+) -> ?(?.*)/, - ); - if (!match || !match.groups) { - console.error('FATAL: invalid command', line); - process.exit(1); - } + const { id, init, payload, proc } = JSON.parse(line); - const { id, svc, proc, payload } = match.groups; - if (svc === 'kv') { - if (proc === 'set') { - const [k, v] = payload.split(' '); + switch (proc) { + case 'kv.set': { + const { k, v } = payload; const res = await client.kv.set.rpc({ k, v: parseInt(v) }); if (res.ok) { console.log(`${id} -- ok:${res.payload.v}`); } else { console.log(`${id} -- err:${res.payload.code}`); } - } else if (proc === 'watch') { - const [res] = await client.kv.watch.subscribe({ k: payload }); + break; + } + case 'kv.watch': { + const { k } = payload; + const [res] = await client.kv.watch.subscribe({ k }); (async () => { for await (const v of res) { if (v.ok) { @@ -89,10 +86,11 @@ for await (const line of rl) { } } })(); + break; } - } else if (svc === 'repeat') { - if (proc === 'echo') { + case 'repeat.echo': { if (!handles.has(id)) { + // init const [input, output] = await client.repeat.echo.stream(); (async () => { for await (const v of output) { @@ -106,12 +104,19 @@ for await (const line of rl) { handles.set(id, input); } else { - handles.get(id)!.push({ str: payload }); + const { s } = payload; + handles.get(id)!.push({ str: s }); } - } else if (proc === 'echo_prefix') { + break; + } + case 'repeat.echo_prefix': { if (!handles.has(id)) { + assert( + init !== undefined, + 'Expected to find "init" in the first message', + ); const [input, output] = await client.repeat.echo_prefix.stream({ - prefix: payload, + prefix: init.prefix, }); (async () => { for await (const v of output) { @@ -125,17 +130,18 @@ for await (const line of rl) { handles.set(id, input); } else { - handles.get(id)!.push({ str: payload }); + const { str } = payload; + handles.get(id)!.push({ str }); } + break; } - } else if (svc === 'upload') { - if (proc === 'send') { + case 'upload.send': { if (!handles.has(id)) { const [input, res] = await client.upload.send.upload(); - if (payload !== '') { + if (!!payload && 'part' in payload) { // For UploadNoInit - input.push({ part: payload }); + input.push({ part: payload.part }); } handles.set(id, input); @@ -149,8 +155,10 @@ for await (const line of rl) { } })(); } else { - handles.get(id)!.push({ part: payload }); + const { part } = payload; + handles.get(id)!.push({ part }); } + break; } } } diff --git a/impls/python/src/river_python_test/client.py b/impls/python/src/river_python_test/client.py index 5de4a6d..08793ee 100644 --- a/impls/python/src/river_python_test/client.py +++ b/impls/python/src/river_python_test/client.py @@ -1,4 +1,5 @@ import asyncio +import json import logging import os import re @@ -70,59 +71,51 @@ async def process_commands() -> None: ) if not line: break - # sometimes the line is like this - # {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}1 -- upload.send -> - if "}" in line: - line = line[line.index("}") + 1 :] - pattern = r"(?P\w+) -- (?P\w+)\.(?P\w+) -> ?(?P.*)" - # Perform the match - match = re.match(pattern, line) - - # Check if the match was successful and if groups are present - if not match: + action = json.loads(line) + if not action: print("FATAL: invalid command", line) sys.exit(1) + # Extract the named groups - id_ = match.group("id") - svc = match.group("svc") - proc = match.group("proc") - payload = match.group("payload") + id_ = action["id"] + payload = action.get("payload") # Example handling for a 'kv.set' command - if svc == "kv": - if proc == "set": - k, v = payload.split(" ") + match action["proc"]: + case "kv.set": + k = payload["k"] + v = payload["v"] try: res = await test_client.kv.set(KvSetInput(k=k, v=int(v))) print(f"{id_} -- ok:{res.v}") except Exception: print(f"{id_} -- err:UNEXPECTED_DISCONNECT") - elif proc == "watch": - k = payload + case "kv.watch": + k = payload["k"] tasks[id_] = asyncio.create_task(handle_watch(id_, k, test_client)) - elif svc == "repeat": - if proc == "echo": + case "repeat.echo": if id_ not in input_streams: input_streams[id_] = asyncio.Queue() tasks[id_] = asyncio.create_task(handle_echo(id_, test_client)) else: - await input_streams[id_].put(payload) - elif svc == "upload": - if proc == "send": + s = payload["s"] + await input_streams[id_].put(s) + case "upload.send": if id_ not in input_streams: input_streams[id_] = asyncio.Queue() tasks[id_] = asyncio.create_task( handle_upload(id_, test_client) ) - if payload != "": + if payload is not None: # For UploadNoInit - await input_streams[id_].put(payload) + await input_streams[id_].put(payload["part"]) else: - await input_streams[id_].put(payload) + part = payload["part"] + await input_streams[id_].put(part) - if payload == "EOF": + if part == "EOF": # Wait for the upload task to complete once EOF is sent await tasks[id_] tasks.pop(id_, None) # Cleanup task reference diff --git a/src/docker.ts b/src/docker.ts index 9394f61..e63dbdc 100644 --- a/src/docker.ts +++ b/src/docker.ts @@ -13,7 +13,6 @@ import DockerModem from 'docker-modem'; import logUpdate from 'log-update'; import { PassThrough } from 'stream'; import { - serializeInvokeAction, type CommonAction, type ClientAction, type ServerAction, @@ -434,7 +433,7 @@ export async function applyActionClient( } } } else if (action.type === 'invoke') { - containerHandle.stdin.write(serializeInvokeAction(action) + '\n'); + containerHandle.stdin.write(JSON.stringify(action) + '\n'); return; } From 7c208beb2b4ba918bf041dba2475ad4b3d9a6c29 Mon Sep 17 00:00:00 2001 From: Devon Stewart Date: Thu, 14 Nov 2024 11:36:03 -0800 Subject: [PATCH 2/2] Do a dance to handle the docker "hijack" message --- impls/node-protocolv2/client.ts | 19 ++++++++++++++++++- impls/node/client.ts | 19 ++++++++++++++++++- impls/python/src/river_python_test/client.py | 11 ++++++++++- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/impls/node-protocolv2/client.ts b/impls/node-protocolv2/client.ts index fcba135..94137b5 100644 --- a/impls/node-protocolv2/client.ts +++ b/impls/node-protocolv2/client.ts @@ -71,7 +71,24 @@ const handles = new Map< >(); for await (const line of rl) { - const { id, init, payload, proc } = JSON.parse(line); + const { id, init, payload, proc } = (() => { + try { + return JSON.parse(line); + } catch (e) { + // Sometimes docker injects this into the stream: + // {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}{"type": "invoke", ... + const match = e.message.match(/line (\d*) column (\d*)/); + if (!!match) { + const offset = parseInt(match['2'], 10); + const first = JSON.parse(line.substring(0, offset)); + assert( + 'hijack' in first, + 'The only syntax errors that we expect are that Docker jams stuff into the stream', + ); + return JSON.parse(line.substring(offset)); + } + } + })(); switch (proc) { case 'kv.set': { diff --git a/impls/node/client.ts b/impls/node/client.ts index 08da044..d97225c 100644 --- a/impls/node/client.ts +++ b/impls/node/client.ts @@ -61,7 +61,24 @@ const rl = readline.createInterface({ const handles = new Map>(); for await (const line of rl) { - const { id, init, payload, proc } = JSON.parse(line); + const { id, init, payload, proc } = (() => { + try { + return JSON.parse(line); + } catch (e) { + // Sometimes docker injects this into the stream: + // {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}{"type": "invoke", ... + const match = e.message.match(/line (\d*) column (\d*)/); + if (!!match) { + const offset = parseInt(match['2'], 10); + const first = JSON.parse(line.substring(0, offset)); + assert( + 'hijack' in first, + 'The only syntax errors that we expect are that Docker jams stuff into the stream', + ); + return JSON.parse(line.substring(offset)); + } + } + })(); switch (proc) { case 'kv.set': { diff --git a/impls/python/src/river_python_test/client.py b/impls/python/src/river_python_test/client.py index 08793ee..1920ccb 100644 --- a/impls/python/src/river_python_test/client.py +++ b/impls/python/src/river_python_test/client.py @@ -72,7 +72,16 @@ async def process_commands() -> None: if not line: break - action = json.loads(line) + try: + action = json.loads(line) + except json.JSONDecodeError as e: + # Sometimes docker injects this into the stream: + # {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}{"type": "invoke", ... + offset = e.colno - 1 + first = json.loads(line[0:offset]) + assert "hijack" in first + action = json.loads(line[offset:]) + if not action: print("FATAL: invalid command", line) sys.exit(1)