Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore/avoid bespoke protocol #46

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 44 additions & 25 deletions impls/node-protocolv2/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { createClient, type Result, type WriteStream } from 'protocolv2';
import type { TransportOptions } from 'protocolv2/transport';
import { BinaryCodec } from 'protocolv2/codec';
import type { serviceDefs } from './serviceDefs';
import assert from 'node:assert';

const {
PORT,
Expand Down Expand Up @@ -70,26 +71,38 @@ const handles = new Map<
>();

for await (const line of rl) {
const match = line.match(
/(?<id>\w+) -- (?<svc>\w+)\.(?<proc>\w+) -> ?(?<payload>.*)/,
);
if (!match || !match.groups) {
console.error('FATAL: invalid command', line);
process.exit(1);
}
const { id, init, payload, proc } = (() => {
try {
return JSON.parse(line);
} catch (e) {
// Sometimes docker injects this into the stream:
// {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}{"type": "invoke", ...
const match = e.message.match(/line (\d*) column (\d*)/);
if (!!match) {
const offset = parseInt(match['2'], 10);
const first = JSON.parse(line.substring(0, offset));
assert(
'hijack' in first,
'The only syntax errors that we expect are that Docker jams stuff into the stream',
);
return JSON.parse(line.substring(offset));
}
}
})();

const { id, svc, proc, payload } = match.groups;
if (svc === 'kv') {
if (proc === 'set') {
const [k, v] = payload.split(' ');
switch (proc) {
case 'kv.set': {
const { k, v } = payload;
const res = await client.kv.set.rpc({ k, v: parseInt(v) });
if (res.ok) {
console.log(`${id} -- ok:${res.payload.v}`);
} else {
console.log(`${id} -- err:${res.payload.code}`);
}
} else if (proc === 'watch') {
const res = client.kv.watch.subscribe({ k: payload });
}
case 'kv.watch': {
const { k } = payload;
const res = client.kv.watch.subscribe({ k });
(async () => {
for await (const v of res) {
if (v.ok) {
Expand All @@ -100,8 +113,7 @@ for await (const line of rl) {
}
})();
}
} else if (svc === 'repeat') {
if (proc === 'echo') {
case 'repeat.echo': {
const handle = handles.get(id);
if (!handle) {
const [writer, reader] = client.repeat.echo.stream({});
Expand All @@ -117,13 +129,19 @@ for await (const line of rl) {

handles.set(id, { writer: writer });
} else {
handle.writer.write({ str: payload });
const { s } = payload;
handle.writer.write({ str: s });
}
} else if (proc === 'echo_prefix') {
}
case 'repeat.echo_prefix': {
const handle = handles.get(id);
if (!handle) {
assert(
init !== undefined,
'Expected to find "init" in the first message',
);
const [writer, reader] = await client.repeat.echo_prefix.stream({
prefix: payload,
prefix: init.prefix,
});
(async () => {
for await (const v of reader) {
Expand All @@ -137,28 +155,29 @@ for await (const line of rl) {

handles.set(id, { writer });
} else {
handle.writer.write({ str: payload });
const { str } = payload;
handle.writer.write({ str });
}
}
} else if (svc === 'upload') {
if (proc === 'send') {
case 'upload.send': {
const handle = handles.get(id);
if (!handle) {
const [writer, finalize] = client.upload.send.upload({});

if (payload !== '') {
if (!!payload && 'part' in payload) {
// For UploadNoInit
writer.write({ part: payload });
writer.write({ part: payload['part'] });
}

handles.set(id, { writer, finalize });
} else {
const { part } = payload;
if (!handle.writer.isClosed()) {
handle.writer.write({ part: payload });
handle.writer.write({ part });
}

if (
payload === 'EOF' ||
part === 'EOF' ||
// the closed condition will always lead to UNEXPECTED_DISCONNECT
// returned from finalize we do this to match other implementation
handle.writer.isClosed()
Expand Down
73 changes: 49 additions & 24 deletions impls/node/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { TransportOptions } from '@replit/river/transport';
import { BinaryCodec } from '@replit/river/codec';
import type { serviceDefs } from './serviceDefs';
import type { Pushable } from 'it-pushable';
import assert from 'node:assert';

const {
PORT,
Expand Down Expand Up @@ -60,26 +61,39 @@ const rl = readline.createInterface({

const handles = new Map<string, Pushable<unknown>>();
for await (const line of rl) {
const match = line.match(
/(?<id>\w+) -- (?<svc>\w+)\.(?<proc>\w+) -> ?(?<payload>.*)/,
);
if (!match || !match.groups) {
console.error('FATAL: invalid command', line);
process.exit(1);
}
const { id, init, payload, proc } = (() => {
try {
return JSON.parse(line);
} catch (e) {
// Sometimes docker injects this into the stream:
// {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}{"type": "invoke", ...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wtf

const match = e.message.match(/line (\d*) column (\d*)/);
if (!!match) {
const offset = parseInt(match['2'], 10);
const first = JSON.parse(line.substring(0, offset));
assert(
'hijack' in first,
'The only syntax errors that we expect are that Docker jams stuff into the stream',
);
return JSON.parse(line.substring(offset));
}
}
})();

const { id, svc, proc, payload } = match.groups;
if (svc === 'kv') {
if (proc === 'set') {
const [k, v] = payload.split(' ');
switch (proc) {
case 'kv.set': {
const { k, v } = payload;
const res = await client.kv.set.rpc({ k, v: parseInt(v) });
if (res.ok) {
console.log(`${id} -- ok:${res.payload.v}`);
} else {
console.log(`${id} -- err:${res.payload.code}`);
}
} else if (proc === 'watch') {
const [res] = await client.kv.watch.subscribe({ k: payload });
break;
}
case 'kv.watch': {
const { k } = payload;
const [res] = await client.kv.watch.subscribe({ k });
(async () => {
for await (const v of res) {
if (v.ok) {
Expand All @@ -89,10 +103,11 @@ for await (const line of rl) {
}
}
})();
break;
}
} else if (svc === 'repeat') {
if (proc === 'echo') {
case 'repeat.echo': {
if (!handles.has(id)) {
// init
const [input, output] = await client.repeat.echo.stream();
(async () => {
for await (const v of output) {
Expand All @@ -106,12 +121,19 @@ for await (const line of rl) {

handles.set(id, input);
} else {
handles.get(id)!.push({ str: payload });
const { s } = payload;
handles.get(id)!.push({ str: s });
}
} else if (proc === 'echo_prefix') {
break;
}
case 'repeat.echo_prefix': {
if (!handles.has(id)) {
assert(
init !== undefined,
'Expected to find "init" in the first message',
);
const [input, output] = await client.repeat.echo_prefix.stream({
prefix: payload,
prefix: init.prefix,
});
(async () => {
for await (const v of output) {
Expand All @@ -125,17 +147,18 @@ for await (const line of rl) {

handles.set(id, input);
} else {
handles.get(id)!.push({ str: payload });
const { str } = payload;
handles.get(id)!.push({ str });
}
break;
}
} else if (svc === 'upload') {
if (proc === 'send') {
case 'upload.send': {
if (!handles.has(id)) {
const [input, res] = await client.upload.send.upload();

if (payload !== '') {
if (!!payload && 'part' in payload) {
// For UploadNoInit
input.push({ part: payload });
input.push({ part: payload.part });
}

handles.set(id, input);
Expand All @@ -149,8 +172,10 @@ for await (const line of rl) {
}
})();
} else {
handles.get(id)!.push({ part: payload });
const { part } = payload;
handles.get(id)!.push({ part });
}
break;
}
}
}
58 changes: 30 additions & 28 deletions impls/python/src/river_python_test/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
import logging
import os
import re
Expand Down Expand Up @@ -70,59 +71,60 @@ async def process_commands() -> None:
)
if not line:
break
# sometimes the line is like this
# {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}1 -- upload.send ->
if "}" in line:
line = line[line.index("}") + 1 :]

pattern = r"(?P<id>\w+) -- (?P<svc>\w+)\.(?P<proc>\w+) -> ?(?P<payload>.*)"
# Perform the match
match = re.match(pattern, line)

# Check if the match was successful and if groups are present
if not match:
try:
action = json.loads(line)
except json.JSONDecodeError as e:
# Sometimes docker injects this into the stream:
# {"hijack":true,"stream":true,"stdin":true,"stdout":true,"stderr":true}{"type": "invoke", ...
offset = e.colno - 1
first = json.loads(line[0:offset])
assert "hijack" in first
action = json.loads(line[offset:])

if not action:
print("FATAL: invalid command", line)
sys.exit(1)

# Extract the named groups
id_ = match.group("id")
svc = match.group("svc")
proc = match.group("proc")
payload = match.group("payload")
id_ = action["id"]
payload = action.get("payload")

# Example handling for a 'kv.set' command
if svc == "kv":
if proc == "set":
k, v = payload.split(" ")
match action["proc"]:
case "kv.set":
k = payload["k"]
v = payload["v"]
try:
res = await test_client.kv.set(KvSetInput(k=k, v=int(v)))
print(f"{id_} -- ok:{res.v}")
except Exception:
print(f"{id_} -- err:UNEXPECTED_DISCONNECT")
elif proc == "watch":
k = payload
case "kv.watch":
k = payload["k"]
tasks[id_] = asyncio.create_task(handle_watch(id_, k, test_client))
elif svc == "repeat":
if proc == "echo":
case "repeat.echo":
if id_ not in input_streams:
input_streams[id_] = asyncio.Queue()
tasks[id_] = asyncio.create_task(handle_echo(id_, test_client))
else:
await input_streams[id_].put(payload)
elif svc == "upload":
if proc == "send":
s = payload["s"]
await input_streams[id_].put(s)
case "upload.send":
if id_ not in input_streams:
input_streams[id_] = asyncio.Queue()
tasks[id_] = asyncio.create_task(
handle_upload(id_, test_client)
)

if payload != "":
if payload is not None:
# For UploadNoInit
await input_streams[id_].put(payload)
await input_streams[id_].put(payload["part"])
else:
await input_streams[id_].put(payload)
part = payload["part"]
await input_streams[id_].put(part)

if payload == "EOF":
if part == "EOF":
# Wait for the upload task to complete once EOF is sent
await tasks[id_]
tasks.pop(id_, None) # Cleanup task reference
Expand Down
3 changes: 1 addition & 2 deletions src/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import DockerModem from 'docker-modem';
import logUpdate from 'log-update';
import { PassThrough } from 'stream';
import {
serializeInvokeAction,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this function from export site pls

type CommonAction,
type ClientAction,
type ServerAction,
Expand Down Expand Up @@ -434,7 +433,7 @@ export async function applyActionClient(
}
}
} else if (action.type === 'invoke') {
containerHandle.stdin.write(serializeInvokeAction(action) + '\n');
containerHandle.stdin.write(JSON.stringify(action) + '\n');
return;
}

Expand Down
Loading