Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use qured-client in qanban-node #18

Merged
merged 1 commit into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
"lint": "eslint --ext .js,.ts --max-warnings 0 src/"
},
"dependencies": {
"@mojotech/json-type-validation": "^3.1.0",
"better-sqlite3": "^6.0.1",
"express": "^4.17.1",
"node": "^13.10.1",
"qanban-types": "0.0.1",
"qured-client": "0.0.1",
"uuid": "^7.0.2",
"ws": "^7.2.3",
"yargs": "^15.3.1"
},
"devDependencies": {
Expand All @@ -36,7 +35,6 @@
"@types/node-fetch": "^2.5.5",
"@types/uuid": "^7.0.0",
"@types/wait-on": "^4.0.0",
"@types/ws": "^7.2.2",
"@types/yargs": "^15.0.4",
"@typescript-eslint/eslint-plugin": "^2.23.0",
"@typescript-eslint/parser": "^2.23.0",
Expand Down
48 changes: 18 additions & 30 deletions node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import * as jtv from '@mojotech/json-type-validation';
import sqlite3 from 'better-sqlite3';
import express from 'express';
import WebSocket from 'ws';
import { Command, commandDecoder, Contract, contractDecoder, ContractState, Id, idDecoder, Message, messageDecoder, Party, partyDecoder, UpdateMessage } from 'qanban-types';
import { v4 as uuidV4 } from 'uuid';
import yargs from 'yargs';
import os from 'os';
import path from 'path';
import fs from 'fs';
import QuredClient, { Message as QuredMessage } from 'qured-client';

type Ledger = Readonly<{
list(): Id[];
Expand All @@ -17,11 +16,6 @@ type Ledger = Readonly<{
update(id: Id, contract: Contract): void;
}>

const sendMessage = (socket: WebSocket, message: unknown) => {
console.log('outgoing message:', message);
socket.send(JSON.stringify(message));
}

function stakeholders(contract: Contract): Set<Party> {
const stakeholders = new Set<Party>();
stakeholders.add(contract.proposer);
Expand Down Expand Up @@ -133,19 +127,15 @@ function updateLedger(ledger: Ledger, persist: boolean, sender: Party, message:
}
}

function handleMessage(ledger: Ledger, rawMessage: unknown) {
function handleMessage(ledger: Ledger, message: QuredMessage<Message, Party>) {
try {
const { sender, payload: message } = jtv.object({
sender: partyDecoder(),
payload: messageDecoder(),
}).runWithException(rawMessage);
updateLedger(ledger, true, sender, message);
updateLedger(ledger, true, message.sender, message.payload);
} catch (error) {
console.error('failed to handle message', rawMessage, error);
console.error('failed to handle message', message, error);
}
}

function handleCommand(ledger: Ledger, socket: WebSocket, participant: Party, command: Command) {
function handleCommand(ledger: Ledger, client: QuredClient<Message>, participant: Party, command: Command) {
let id: Id;
let message: Message;
if (command.type === "propose") {
Expand All @@ -155,7 +145,7 @@ function handleCommand(ledger: Ledger, socket: WebSocket, participant: Party, co
message = { ...command };
}
const contract = updateLedger(ledger, false, participant, message);
sendMessage(socket, {
client.send({
sender: participant,
receivers: [...stakeholders(contract)],
payload: message
Expand Down Expand Up @@ -260,21 +250,19 @@ if (args.clean && fs.existsSync(database)) {

const ledger = Ledger(database);

const socket = new WebSocket(`ws://${routerHost}`);
const client = new QuredClient<Message, Party>({
router: `ws://${routerHost}`,
login: participant,
partyDecoder: partyDecoder(),
payloadDecoder: messageDecoder(),
});

socket.on('open', () => {
console.log('connected to router');
socket.on('ping', () => socket.pong());
socket.on('message', rawMessage => {
const json = JSON.parse(rawMessage.toString());
console.log('incoming message:', json);
handleMessage(ledger, json);
});
socket.on('close', () => {
process.exit(1);
});
sendMessage(socket, { login: participant });
client.on("open", () => console.log(`connected to router at ${routerHost}`));
client.on("message", message => handleMessage(ledger, message));
client.on("error", error => {
console.error(error instanceof Error ? error.toString() : JSON.stringify(error));
});
client.on("close", () => process.exit(1));

const app = express();
app.use(express.static('ui/build'));
Expand All @@ -297,7 +285,7 @@ app.post('/api/command', (req, res) => {
try {
console.log('incoming command:', req.body);
const command = commandDecoder().runWithException(req.body);
handleCommand(ledger, socket, participant, command);
handleCommand(ledger, client, participant, command);
res.status(200).send({ success: true });
} catch (error) {
res.status(500).send({
Expand Down
53 changes: 31 additions & 22 deletions qured-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,52 @@ import * as jtv from '@mojotech/json-type-validation';
import Emittery from 'emittery';
import WebSocket from 'ws';

export type Config<Payload> = {
router: string;
login: string;
payloadDecoder?: jtv.Decoder<Payload>;
}
export type Config<Payload, Party extends string = string> = {
readonly router: string;
readonly login: Party;
readonly partyDecoder?: jtv.Decoder<Party>;
readonly payloadDecoder?: jtv.Decoder<Payload>;
} & (string extends Party ? {} : { readonly partyDecoder: jtv.Decoder<Party> })
// NOTE(MH): The purpose of the intersection with the conditional type is to
// make `partyDecoder` non-optional when `Party` is _not_ `string`.

export type Message<Payload> = {
sender: string;
receivers: string[];
export type Message<Payload, Party extends string = string> = {
sender: Party;
receivers: Party[];
payload: Payload;
}

export type Events<Payload> = {
message: Message<Payload>;
export type Events<Payload, Party extends string = string> = {
message: Message<Payload, Party>;
error: unknown;
close: number;
}

export type EmptyEvents = "open"

export default class QuredClient<Payload> extends Emittery.Typed<Events<Payload>, EmptyEvents> {
export default class QuredClient<Payload, Party extends string = string> extends Emittery.Typed<Events<Payload, Party>, EmptyEvents> {
private socket: WebSocket;
private messageDecoder: jtv.Decoder<Message<Payload>>;
private login: Party;
private messageDecoder: jtv.Decoder<Message<Payload, Party>>;

constructor(config: Config<Payload>) {
constructor(config: Config<Payload, Party>) {
super();
const socket = new WebSocket(config.router);
this.socket = socket;
this.login = config.login;
const partyDecoder = config.partyDecoder ?? jtv.string().map(party => party as Party);
this.messageDecoder = jtv.object({
sender: jtv.string(),
receivers: jtv.array(jtv.string()),
sender: partyDecoder,
receivers: jtv.array(partyDecoder),
payload: config.payloadDecoder ?? jtv.anyJson(),
});
socket.on("open", () => {
socket.on("ping", () => socket.pong());
socket.on("message", rawMessage => {
try {
console.log(`incoming message: ${rawMessage}`);
const message: Message<Payload> =
this.messageDecoder.runWithException(JSON.parse(rawMessage.toString()));
const json = JSON.parse(rawMessage.toString());
console.log('incoming message:', json);
const message: Message<Payload, Party> = this.messageDecoder.runWithException(json);
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.emit("message", message);
} catch (error) {
Expand All @@ -63,10 +69,13 @@ export default class QuredClient<Payload> extends Emittery.Typed<Events<Payload>
});
}

send(message: Message<Payload>): void {
const rawMessage = JSON.stringify(message);
console.log(`outgoing message: ${rawMessage}`);
this.socket.send(rawMessage);
send(message: Message<Payload, Party>): void {
if (message.sender !== this.login) {
throw Error(`sender '${message.sender}' of message does not match login '${this.login}'`);
}
const json = this.messageDecoder.runWithException(message);
console.log('outgoing message:', json);
this.socket.send(JSON.stringify(json));
}

close(): void {
Expand Down