From 7317bfba0fe6278df6fc05f8dbb74d18d2bbd18d Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Tue, 5 Sep 2023 21:07:50 -0700 Subject: [PATCH 1/6] Add necessary capabilities for reactors to create other reactors and make connections 0. Added _addChild and _addSibling necessary for mutation APIs 1. Solved issues related to hierarchy and ownership. --- src/core/reactor.ts | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 806bf2af7..0f200f21d 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -153,6 +153,11 @@ export abstract class Reactor extends Component { */ private readonly _keyChain = new Map(); + // This is the keychain for creation, i.e. if Reactor R's mutation created reactor B, + // then R is B's creator, even if they are siblings. R should have access to B, + // at least semantically......? + private readonly _creatorKeyChain = new Map(); + /** * This graph has in it all the dependencies implied by this container's * ports, reactions, and connections. @@ -387,6 +392,9 @@ export abstract class Reactor extends Component { return owner._getKey(component, this._keyChain.get(owner)); } } + return component + .getContainer() + ._getKey(component, this._creatorKeyChain.get(component.getContainer())); } /** @@ -1555,6 +1563,36 @@ export abstract class Reactor extends Component { toString(): string { return this._getFullyQualifiedName(); } + + protected _addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + const newReactor = new constructor(this, ...args); + return newReactor; + } + + protected _addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + if (this._getContainer() == null) { + throw new Error( + `Reactor ${this} does not have a parent. Sibling is not well-defined.` + ); + } + if (this._getContainer() === this) { + throw new Error( + `Reactor ${this} is self-contained. Adding sibling creates logical issue.` + ); + } + const newReactor = this._getContainer()._addChild( + constructor, + ...args + ); + this._creatorKeyChain.set(newReactor, newReactor._key); + return newReactor; + } } /* From 5b53bf78ad5529413cceacca941fe9be33196432 Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Tue, 5 Sep 2023 21:26:24 -0700 Subject: [PATCH 2/6] Mutation Sandbox API --- src/core/reactor.ts | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 0f200f21d..8fa8844b6 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -475,6 +475,20 @@ export abstract class Reactor extends Component { public delete(reactor: Reactor): void { reactor._delete(); } + + public addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addChild(constructor, ...args); + } + + public addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addSibling(constructor, ...args); + } }; /** @@ -1586,10 +1600,7 @@ export abstract class Reactor extends Component { `Reactor ${this} is self-contained. Adding sibling creates logical issue.` ); } - const newReactor = this._getContainer()._addChild( - constructor, - ...args - ); + const newReactor = this._getContainer()._addChild(constructor, ...args); this._creatorKeyChain.set(newReactor, newReactor._key); return newReactor; } @@ -1833,6 +1844,16 @@ export interface MutationSandbox extends ReactionSandbox { getReactor: () => Reactor; // Container + addChild: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + + addSibling: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + // FIXME: // forkJoin(constructor: new () => Reactor, ): void; } From 3549031bdca108e51ef9050514d31b2db265e315 Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Wed, 6 Sep 2023 18:25:23 -0700 Subject: [PATCH 3/6] WIP - rebased to mutation/create-reactor-api --- src/benchmark/sucoverrelax/actor.ts | 67 +++++++ src/benchmark/sucoverrelax/peer.ts | 71 +++++++ src/benchmark/sucoverrelax/runner.ts | 217 +++++++++++++++++++++ src/benchmark/sucoverrelax/sorutils.ts | 109 +++++++++++ src/benchmark/sucoverrelax/sucoverrelax.ts | 20 ++ src/benchmark/sucoverrelax/test.ts | 88 +++++++++ 6 files changed, 572 insertions(+) create mode 100644 src/benchmark/sucoverrelax/actor.ts create mode 100644 src/benchmark/sucoverrelax/peer.ts create mode 100644 src/benchmark/sucoverrelax/runner.ts create mode 100644 src/benchmark/sucoverrelax/sorutils.ts create mode 100644 src/benchmark/sucoverrelax/sucoverrelax.ts create mode 100644 src/benchmark/sucoverrelax/test.ts diff --git a/src/benchmark/sucoverrelax/actor.ts b/src/benchmark/sucoverrelax/actor.ts new file mode 100644 index 000000000..b53c8d3fe --- /dev/null +++ b/src/benchmark/sucoverrelax/actor.ts @@ -0,0 +1,67 @@ +/* eslint-disable @typescript-eslint/prefer-readonly */ +// The SOR (Re)actor +import { InPort, Reactor, State } from "../../core/internal"; +import { Message } from "./sorutils"; + +export class SORActor extends Reactor { + constructor( + parent: Reactor, + pos: number, + _value: number, + colour: number, + nx: number, + ny: number, + omega: number, + sorSource: Reactor, + peer: boolean + ) { + super(parent, "SORActor"); + + const x = Math.floor(pos / ny); + const y = pos % ny; + + const omegaOverFour = 0.25 * omega; + const oneMinusOmega = 1.0 - omega; + + this.portFromRunner = new InPort(this); + + const neighbours = (() => { + const calPos = (x: number, y: number): number => (x * ny + y); + + if (x > 0 && x < nx - 1 && y > 0 && y < ny - 1) { + return [calPos(x, y + 1), + calPos(x + 1, y), + calPos(x, y - 1), + calPos(x - 1, y)]; + } + if ((x === 0 || x === (nx - 1)) && (y === 0 || y === (ny - 1))) { + return [ + (x === 0) ? calPos(x + 1, y) : calPos(x - 1, y), + (y === 0) ? calPos(x, y + 1) : calPos(x, y - 1) + ]; + } + if ((x === 0 || x === (nx - 1)) || (y === 0 || y === (ny - 1))) { + if (x === 0 || x === nx - 1) { + return [ + (x === 0) ? calPos(x + 1, y) : calPos(x - 1, y), + calPos(x, y + 1), + calPos(x, y - 1) + ]; + } + return [ + (y === 0) ? calPos(x, y + 1) : calPos(x, y - 1), + calPos(x+1, y), + calPos(x-1, y) + ]; + } + return []; + })(); + } + + private iter = new State(0); + private maxIter = new State(0); + private msgRcv = new State(0); + private sorActors = new State([]); + + public portFromRunner: InPort; +} \ No newline at end of file diff --git a/src/benchmark/sucoverrelax/peer.ts b/src/benchmark/sucoverrelax/peer.ts new file mode 100644 index 000000000..5f70d94a7 --- /dev/null +++ b/src/benchmark/sucoverrelax/peer.ts @@ -0,0 +1,71 @@ +import { InPort, MutationSandbox, OutPort, Reactor, State } from "../../core/internal"; +import { SORActor } from "./actor"; +import { SORRunner } from "./runner"; +import { Message, MessageTypes, SorBorder as SORBorder, SorBorder } from "./sorutils"; + +export class SORPeer extends Reactor { + + s: State; + partStart: State; + matrixPart: State; + border: State; + sorSource: State + + + sorActors: State; + public portFromSORRunner: InPort; + + constructor( + parent: Reactor, + s: number, + partStart: number, + matrixPart: number[][], + border: SORBorder, + sorSource: SORRunner + ) { + super(parent, "SORPeer"); + this.sorActors = new State([]); + this.s = new State(s); + this.partStart = new State(partStart); + this.matrixPart = new State(matrixPart); + this.border = new State(border); + this.sorSource = new State(sorSour ce); + + this.portFromSORRunner = new InPort(this); + } + + static boot(this: MutationSandbox, args: BootProcessingArgs) { + const myBorder: SORActor[] = []; + const {_s, border, sorActors, partStart} = args; + const s = _s.get(); + const partStartVal = partStart.get(); + + const sorActorsValue = sorActors.get(); + const borderValue = border.get(); + for (let i = 0; i < s; ++i) { + sorActorsValue[i * (s - partStartVal + 1)] = borderValue.borderActors[i]; + } + sorActors.set(sorActorsValue); + + for () + } +} + +interface BootProcessingArgs { + type: MessageTypes.sorBootMessage, + _s: State, + border: State, + sorActors: State, + partStart: State, +} +interface ResultProcessingArgs { + type: MessageTypes.sorResultMessage, + expectingBoot: State, + totalMsgRcv: State, + returned: State, + gTotal: State, + s: State, + part: State +} + +type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs; \ No newline at end of file diff --git a/src/benchmark/sucoverrelax/runner.ts b/src/benchmark/sucoverrelax/runner.ts new file mode 100644 index 000000000..56442ff2d --- /dev/null +++ b/src/benchmark/sucoverrelax/runner.ts @@ -0,0 +1,217 @@ +import { InMultiPort, InPort, MutationSandbox, OutMultiPort, OutPort, Parameter, Reactor, State, Variable, } from "../../core/internal"; +import { SORActor } from "./actor"; +import { SORPeer } from "./peer"; +import { Message, MessageTypes, SORBootMessage, SORResultMessage, SorBorder, jacobi, omega } from "./sorutils"; + +export class SORRunner extends Reactor { + protected s: State; + protected part: State; + + protected sorActors: State; + protected sorPeer: State; + + protected portToSORActors: OutPort; + protected portToSORPeer: OutPort; + // Unsure if this would work, let's just try...... + protected portFromSORActor: InPort; + protected portFromSORPeer: InPort; + + protected gTotal = new State(0.0); + protected returned = new State(0); + protected totalMsgRcv = new State(0); + protected expectingBoot = new State(true); + + // In Savina, randoms is accessed directly from SucOverRelaxConfig. + // Here we pass it in to the closure. + constructor(parent: Reactor, size: number, _randoms: number[][]) { + super(parent, "SORRunner"); + // These are in SorRunner; + this.s = new State(size); + // In the scala implementation a simple /2 was used. + // In JS we might need to enforce some sort of guarantee as it was used to calculate position + this.part = new State(Math.floor(size / 2)); + /** These are from Savina. They should be rather irrelevant, actually. */ + this.sorActors = new State([]); + this.sorPeer = new State(undefined); + + /** These are the actual messaging passing mechanism that are synonomous to that of Savina. */ + // This creates a bunch of ports. + this.portToSORActors = new OutPort(this); + this.portToSORPeer = new OutPort(this); + this.portFromSORActor = new InPort(this); + this.portFromSORPeer = new InPort(this); + + this.addMutation( + [this.startup], + [this.sorActors, this.sorPeer, this.portToSORActors, this.portToSORPeer], + function (this, sorActors, sorPeer, portToSORActors, portToSORPeer) { + // TODO: Add actual stuff + ; + } + ); + } + + // This is to be used WITHIN mutation react functions. + static process(this: MutationSandbox, message: Message, args: ProcessingArgs): void { + switch (message.messageType) { + case MessageTypes.sorBootMessage: { + if (args.type !== MessageTypes.sorBootMessage) { + throw new Error("Wrong type of arguments passed."); + } + // expectingBoot is args[0] + args.expectingBoot.set(false); + SORRunner.boot.apply(this, [args]); + break; + } + case MessageTypes.sorResultMessage: { + if (args.type !== MessageTypes.sorResultMessage) { + throw new Error("Wrong type of arguments passed."); + } + + const {mv, msgRcv} = message; + const {expectingBoot, totalMsgRcv, returned, gTotal, s, part} = args; + + if (expectingBoot.get()) { + throw new Error("SORRunner not booted yet!"); + } + + totalMsgRcv.set(totalMsgRcv.get() + msgRcv); + returned.set(returned.get() + 1); + gTotal.set(gTotal.get() + mv); + + if (returned.get() === (s.get() * part.get()) + 1) { + // TODO: validate + // TODO: exit + ; + } + break; + } + case MessageTypes.sorBorderMessage: { + if (args.type !== MessageTypes.sorBorderMessage) { + throw new Error("Wrong type of arguments passed."); + } + + const {mBorder} = message; + const {expectingBoot, s, part, sorActors, portToSORActors} = args; + + if (expectingBoot.get()) { + throw new Error("SORRunner not booted yet!"); + } + const sorActorsValue = sorActors.get(); + for (let i = 0; i <= s.get(); ++i) { + sorActorsValue[(i + 1) * (part.get() + 1) - 1] = mBorder.borderActors[i]; + } + sorActors.set(sorActorsValue); + for (let i = 0; i <= s.get(); ++i) { + for (let j = 0; j <= part.get(); ++j) { + const pos = (i * (part.get() + 1)) + j; + // Ibidem, connect then disconnect to simulate + // "fire and forget" in scala. + this.connect(portToSORActors, sorActorsValue[pos].portFromRunner); + this.getReactor().writable(portToSORActors).set( + { + messageType: MessageTypes.sorStartMessage, + mi: jacobi, + mActors: sorActorsValue + } + ); + this.disconnect(portToSORActors, sorActorsValue[pos].portFromRunner); + } + } + break; + } + default: { + throw new Error("Received wrong message from port"); + } + } + } + + // SorRunner::boot() + static boot(this: MutationSandbox, + args: BootProcessingArgs + ): void { + const {_randoms, _s, _part, sorActors, sorPeer, portToSORPeer} = args; + + const myBorder: SORActor[] = []; + const randoms = _randoms; + const s = _s.get(); + const part = _part.get(); + // In scala, (i <- 0 until s) is a loop excluding s. + const sorActorsValue = sorActors.get(); + for (let i = 0; i < s; ++i) { + let c = i % 2; + for (let j = 0; j < part; ++j) { + const pos = i * (part + 1) + j; + c = 1 - c; + // We modify them in bulk, then update the state. + // Unlike in Scala we do not need to initialise the array here, JS supports sparse array. + // I have absolutely no idea why these parametres are called as such...... + sorActorsValue[pos] = this.getReactor()._uncheckedAddSibling( + SORActor, + pos, randoms[i][j], c, s, part + 1, omega, this.getReactor(), false + ); + + if (j === (part - 1)) { + myBorder[i] = sorActorsValue[pos]; + } + + } + } + sorActors.set(sorActorsValue); + + const partialMatrix: number[][] = []; + for (let i = 0; i < s; ++i) { + for (let j = 0; j < s - part; ++j) { + partialMatrix[i][j] = randoms[i][j + part]; + } + } + + const sorPeerValue = this.getReactor()._uncheckedAddSibling( + SORPeer, + s, part, partialMatrix, new SorBorder(myBorder), + // A dirty hack. Maybe this will be removed as ports get added. + this.getReactor() as SORRunner + ); + sorPeer.set(sorPeerValue); + // Pass message. + // This is similar to Scala's !; but it looks pretty...... interesting in LF. + // If node is concurrent or parallel, this might be a problem, so direct copy-pastaing to C++ runtime might not work. + this.connect(portToSORPeer, sorPeerValue.portFromSORRunner); + this.getReactor().writable(portToSORPeer).set({messageType: MessageTypes.sorBootMessage}); + // Disconnect immediately. + this.disconnect(portToSORPeer, sorPeerValue.portFromSORRunner); + } +} + + +interface BootProcessingArgs { + type: MessageTypes.sorBootMessage, + expectingBoot: State, + _randoms: number[][], + _s: State, + _part: State, + sorActors: State, + sorPeer: State, + portToSORPeer: OutPort +} + +interface ResultProcessingArgs { + type: MessageTypes.sorResultMessage, + expectingBoot: State, + totalMsgRcv: State, + returned: State, + gTotal: State, + s: State, + part: State +} + +interface BorderProcessingArgs { + type: MessageTypes.sorBorderMessage, + expectingBoot: State, + s: State, + part: State, + sorActors: State, + portToSORActors: OutPort +} + +type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs | BorderProcessingArgs; \ No newline at end of file diff --git a/src/benchmark/sucoverrelax/sorutils.ts b/src/benchmark/sucoverrelax/sorutils.ts new file mode 100644 index 000000000..1fe3f456a --- /dev/null +++ b/src/benchmark/sucoverrelax/sorutils.ts @@ -0,0 +1,109 @@ +import { Reactor } from "../../core/reactor"; +import { SORActor } from "./actor"; + +// Savina implementation of PRNG +export class SavinaPRNG { + private value: number; + + constructor(value?: number) { + this.value = value ?? 1145141919; + } + + public nextNumber(): number { + this.value = ((this.value * 1309) + 13849) & 65535; + return this.value; + } + + public nextFloat(): number { + return 1.0 / (this.nextNumber() + 1); + } +} + +// This is not a recommended way to use JS, but whatever...... + +export const refVal = [ + 0.000003189420084871275, + 0.001846644602759566, + 0.0032099996270638005, + 0.0050869220175413146, + 0.008496328291240363, + 0.016479973604143234, + 0.026575660248076397, + // This is different from the Savina one because JS doesn't have high precision + 1.026575660248076, + 2.026575660248076, + 3.026575660248076 +]; + +export const jacobi = 100; + +export const omega = 1.25; + +export function randomMatrix(m: number, n: number): number[][] { + const mat = []; + const prng = new SavinaPRNG(114514); + for (let i = 0; i < m; ++i) { + const row = []; + for (let j = 0; j < n; ++j) { + row.push(prng.nextFloat() * 1e-6); + } + mat.push(row); + } + return mat; +} + +export function jgfValidate(gTotal: number, size: number): void { + const dev = Math.abs(gTotal - refVal[size]); + if (dev > 1.0e-12) { + console.log("Validation failed"); + console.log(`GTotal=${gTotal}; ${refVal[size]}; ${dev}; ${size}`); + } else { + console.log("Validation OK!"); + } +} + +export enum MessageTypes { + sorBootMessage, + sorResultMessage, + sorBorderMessage, + sorStartMessage, + sorValueMessage, +} + +export class SorBorder { + borderActors: SORActor[]; + constructor(borderActors: SORActor[]) { + this.borderActors = borderActors; + } +} + +export interface SORBootMessage { + messageType: MessageTypes.sorBootMessage; +} + +export interface SORResultMessage { + messageType: MessageTypes.sorResultMessage; + mx: number; + my: number; + mv: number; + msgRcv: number; +} + +export interface SORBorderMessage { + messageType: MessageTypes.sorBorderMessage; + + mBorder: SorBorder; +} + +export interface SORStartMessage { + messageType: MessageTypes.sorStartMessage; + mi: number; + mActors: SORActor[]; +} + +export interface SORValueMessage { + messageType: MessageTypes.sorValueMessage; +} + +export type Message = SORBootMessage | SORResultMessage | SORBorderMessage | SORStartMessage | SORValueMessage; + diff --git a/src/benchmark/sucoverrelax/sucoverrelax.ts b/src/benchmark/sucoverrelax/sucoverrelax.ts new file mode 100644 index 000000000..652187c56 --- /dev/null +++ b/src/benchmark/sucoverrelax/sucoverrelax.ts @@ -0,0 +1,20 @@ +import { + type WritablePort, + Parameter, + InPort, + OutPort, + State, + Action, + Reactor, + App, + TimeValue, + Origin, + Log + } from "../../core/internal"; + + + + + + + diff --git a/src/benchmark/sucoverrelax/test.ts b/src/benchmark/sucoverrelax/test.ts new file mode 100644 index 000000000..b8b17de47 --- /dev/null +++ b/src/benchmark/sucoverrelax/test.ts @@ -0,0 +1,88 @@ +import { App, InPort, OutPort, Reactor } from "../../core/internal"; + +class Master extends Reactor { + outp: OutPort; + + constructor(parent: Reactor, receivers: Receiver[]) { + super(parent, ""); + this.outp = new OutPort(this); + + this.addMutation( + [this.startup], + [this.outp], + function(this, outp) { + let i = 0; + for (const r of receivers) { + this.connect(outp, r.inp); + console.log(`Master: triggering ${i}`) + this.getReactor().writable(outp).set(i); + console.log(`Master: disconnecting ${i}`) + this.disconnect(outp, r.inp); + ++i; + } + } + ); + } +} + +class Receiver extends Reactor { + inp: InPort; + outp: OutPort; + + constructor(parent: Reactor, receiver2: Receiver2) { + super(parent, ""); + this.inp = new InPort(this); + this.outp = new OutPort(this); + + this.addMutation( + [this.inp], + [this.inp, this.outp], + function(this, inp, outp) { + const message = inp.get(); + if (message == null) { + throw Error("Receiver: Message is null."); + } + console.log(`Receiver: message ${message}. Sending.`); + this.connect(outp, receiver2.inp); + this.getReactor().writable(outp).set(message); + this.disconnect(outp, receiver2.inp); + } + ); + } +} + +class Receiver2 extends Reactor { + inp: InPort; + + constructor(parent: Reactor) { + super(parent, ""); + this.inp = new InPort(this); + + this.addReaction( + [this.inp], + [this.inp], + function (this, inp) { + console.log(`Receiver2: received ${inp.get()}`) + } + ); + } +} + +class Apppp extends App { + master: Master; + recvs: Receiver[]; + recv2: Receiver2; + + constructor() { + super(undefined, undefined, false, ()=>(undefined), ()=>(undefined), ""); + this.recv2 = new Receiver2(this); + this.recvs = []; + for (let i = 0; i < 10; ++i) { + this.recvs.push(new Receiver(this, this.recv2)); + } + this.master = new Master(this, this.recvs); + } +} + +const app = new Apppp(); +app._start(); \ No newline at end of file From aa17310aa1f733c55a1cd20028f5fcb795de2a1e Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Fri, 15 Sep 2023 12:04:55 -0700 Subject: [PATCH 4/6] wip --- src/benchmark/sucoverrelax/peer.ts | 2 +- src/benchmark/sucoverrelax/runner.ts | 232 +++++++------------------ src/benchmark/sucoverrelax/sorutils.ts | 16 ++ 3 files changed, 75 insertions(+), 175 deletions(-) diff --git a/src/benchmark/sucoverrelax/peer.ts b/src/benchmark/sucoverrelax/peer.ts index 5f70d94a7..f26a66611 100644 --- a/src/benchmark/sucoverrelax/peer.ts +++ b/src/benchmark/sucoverrelax/peer.ts @@ -29,7 +29,7 @@ export class SORPeer extends Reactor { this.partStart = new State(partStart); this.matrixPart = new State(matrixPart); this.border = new State(border); - this.sorSource = new State(sorSour ce); + this.sorSource = new State(sorSource); this.portFromSORRunner = new InPort(this); } diff --git a/src/benchmark/sucoverrelax/runner.ts b/src/benchmark/sucoverrelax/runner.ts index 56442ff2d..a3a4add34 100644 --- a/src/benchmark/sucoverrelax/runner.ts +++ b/src/benchmark/sucoverrelax/runner.ts @@ -1,7 +1,7 @@ import { InMultiPort, InPort, MutationSandbox, OutMultiPort, OutPort, Parameter, Reactor, State, Variable, } from "../../core/internal"; import { SORActor } from "./actor"; import { SORPeer } from "./peer"; -import { Message, MessageTypes, SORBootMessage, SORResultMessage, SorBorder, jacobi, omega } from "./sorutils"; +import { Message, MessageTypes, SORBootMessage, SORBorderMessage, SORResultMessage, SORStartMessage, SorBorder, jacobi, omega, randomMatrix } from "./sorutils"; export class SORRunner extends Reactor { protected s: State; @@ -10,11 +10,26 @@ export class SORRunner extends Reactor { protected sorActors: State; protected sorPeer: State; - protected portToSORActors: OutPort; - protected portToSORPeer: OutPort; - // Unsure if this would work, let's just try...... - protected portFromSORActor: InPort; - protected portFromSORPeer: InPort; + // Definition of Ports. Initialisation will take place in the constructor, because some of them needs to access + // some parametres. + /// Runner will interact with App. It will receive BootMsg, and send ResultMsg. + /// In the original Savina benchmark suite there is no ResultMsg. + /// Instead, Runner handles validation and simply quits the programme. + /// We are taking a more based approach but there is no difference. + /// See https://github.com/shamsimam/savina/blob/52e546959a57670cdba7a88f0a030b53cfdb16d6/src/main/scala/edu/rice/habanero/benchmarks/sor/SucOverRelaxAkkaActorBenchmark.scala#L112-L115 + protected portFromApp: InPort; + protected portToApp: OutPort; + + /// Runner will send StartMsg to all actors, and receive ResultMsg from all actors. + /// Runner will also supply them with a the list of actors. We will do the same, connection will be handled by individual Actors. + protected portsFromActors: InMultiPort; + protected portsToActors: OutMultiPort; + + /// Runner will send BootMsg to Peer, and receive BorderMsg OR ResultMsg from Peer. + /// I am using a 3-port approach here, this way I can handle SORResultMessage in a single mutation. + protected portFromPeerBorder: InPort; + protected portFromPeerResult: InPort; + protected portToPeer: OutPort; protected gTotal = new State(0.0); protected returned = new State(0); @@ -23,195 +38,64 @@ export class SORRunner extends Reactor { // In Savina, randoms is accessed directly from SucOverRelaxConfig. // Here we pass it in to the closure. - constructor(parent: Reactor, size: number, _randoms: number[][]) { - super(parent, "SORRunner"); - // These are in SorRunner; - this.s = new State(size); + constructor(parent: Reactor, s: number, _randoms: number[][]) { + super(parent); + this.s = new State(s); // In the scala implementation a simple /2 was used. // In JS we might need to enforce some sort of guarantee as it was used to calculate position - this.part = new State(Math.floor(size / 2)); - /** These are from Savina. They should be rather irrelevant, actually. */ + const part = Math.floor(s / 2); + this.part = new State(part); + + // The following lines are from Savina. They should be rather irrelevant, actually, because message passing are handled by ports this.sorActors = new State([]); this.sorPeer = new State(undefined); - /** These are the actual messaging passing mechanism that are synonomous to that of Savina. */ - // This creates a bunch of ports. - this.portToSORActors = new OutPort(this); - this.portToSORPeer = new OutPort(this); - this.portFromSORActor = new InPort(this); - this.portFromSORPeer = new InPort(this); + // Initialisation of ports + this.portFromApp = new InPort(this); + this.portToApp = new OutPort(this); + // size * (part + 1) is the size of sorActors in the Savina benchmark + this.portsFromActors = new InMultiPort(this, s * (part + 1)); + this.portsToActors = new OutMultiPort(this, s * (part + 1)); + + this.portFromPeerBorder = new InPort(this); + this.portFromPeerResult = new InPort(this); + this.portToPeer = new OutPort(this); this.addMutation( [this.startup], - [this.sorActors, this.sorPeer, this.portToSORActors, this.portToSORPeer], - function (this, sorActors, sorPeer, portToSORActors, portToSORPeer) { - // TODO: Add actual stuff - ; + [], + function (this) { + console.log("I am SORRunner [M1]. I am trolling.") } ); - } - - // This is to be used WITHIN mutation react functions. - static process(this: MutationSandbox, message: Message, args: ProcessingArgs): void { - switch (message.messageType) { - case MessageTypes.sorBootMessage: { - if (args.type !== MessageTypes.sorBootMessage) { - throw new Error("Wrong type of arguments passed."); - } - // expectingBoot is args[0] - args.expectingBoot.set(false); - SORRunner.boot.apply(this, [args]); - break; - } - case MessageTypes.sorResultMessage: { - if (args.type !== MessageTypes.sorResultMessage) { - throw new Error("Wrong type of arguments passed."); - } - - const {mv, msgRcv} = message; - const {expectingBoot, totalMsgRcv, returned, gTotal, s, part} = args; - - if (expectingBoot.get()) { - throw new Error("SORRunner not booted yet!"); - } - - totalMsgRcv.set(totalMsgRcv.get() + msgRcv); - returned.set(returned.get() + 1); - gTotal.set(gTotal.get() + mv); - - if (returned.get() === (s.get() * part.get()) + 1) { - // TODO: validate - // TODO: exit - ; - } - break; - } - case MessageTypes.sorBorderMessage: { - if (args.type !== MessageTypes.sorBorderMessage) { - throw new Error("Wrong type of arguments passed."); - } - - const {mBorder} = message; - const {expectingBoot, s, part, sorActors, portToSORActors} = args; - - if (expectingBoot.get()) { - throw new Error("SORRunner not booted yet!"); - } - const sorActorsValue = sorActors.get(); - for (let i = 0; i <= s.get(); ++i) { - sorActorsValue[(i + 1) * (part.get() + 1) - 1] = mBorder.borderActors[i]; - } - sorActors.set(sorActorsValue); - for (let i = 0; i <= s.get(); ++i) { - for (let j = 0; j <= part.get(); ++j) { - const pos = (i * (part.get() + 1)) + j; - // Ibidem, connect then disconnect to simulate - // "fire and forget" in scala. - this.connect(portToSORActors, sorActorsValue[pos].portFromRunner); - this.getReactor().writable(portToSORActors).set( - { - messageType: MessageTypes.sorStartMessage, - mi: jacobi, - mActors: sorActorsValue - } - ); - this.disconnect(portToSORActors, sorActorsValue[pos].portFromRunner); - } - } - break; - } - default: { - throw new Error("Received wrong message from port"); - } - } - } - - // SorRunner::boot() - static boot(this: MutationSandbox, - args: BootProcessingArgs - ): void { - const {_randoms, _s, _part, sorActors, sorPeer, portToSORPeer} = args; + // SORRunner::process(mst: SorBootMessage) and SORRunner::boot + this.addMutation( + [this.portFromApp], + [this.portFromApp, this.expectingBoot, this.sorActors], + function(this, portFromApp, expectingBoot, sorActorsState) { + const { A } = portFromApp.get()!; + + const sorActors = sorActorsState.get(); + expectingBoot.set(false); + // SORRunner::boot const myBorder: SORActor[] = []; - const randoms = _randoms; - const s = _s.get(); - const part = _part.get(); - // In scala, (i <- 0 until s) is a loop excluding s. - const sorActorsValue = sorActors.get(); + const randoms = randomMatrix; + // In Scala/Akka, 0 until s is [0, s) for (let i = 0; i < s; ++i) { let c = i % 2; for (let j = 0; j < part; ++j) { - const pos = i * (part + 1) + j; + const pos = i * (part + i) + j; c = 1 - c; - // We modify them in bulk, then update the state. - // Unlike in Scala we do not need to initialise the array here, JS supports sparse array. - // I have absolutely no idea why these parametres are called as such...... - sorActorsValue[pos] = this.getReactor()._uncheckedAddSibling( - SORActor, - pos, randoms[i][j], c, s, part + 1, omega, this.getReactor(), false - ); - - if (j === (part - 1)) { - myBorder[i] = sorActorsValue[pos]; - } - + sorActors[pos] = this.addSibling(SORActor, pos, A[i][j], c, s, part + 1, omega, this.getReactor(), false); + } } - sorActors.set(sorActorsValue); + } + ) - const partialMatrix: number[][] = []; - for (let i = 0; i < s; ++i) { - for (let j = 0; j < s - part; ++j) { - partialMatrix[i][j] = randoms[i][j + part]; - } - } - const sorPeerValue = this.getReactor()._uncheckedAddSibling( - SORPeer, - s, part, partialMatrix, new SorBorder(myBorder), - // A dirty hack. Maybe this will be removed as ports get added. - this.getReactor() as SORRunner - ); - sorPeer.set(sorPeerValue); - // Pass message. - // This is similar to Scala's !; but it looks pretty...... interesting in LF. - // If node is concurrent or parallel, this might be a problem, so direct copy-pastaing to C++ runtime might not work. - this.connect(portToSORPeer, sorPeerValue.portFromSORRunner); - this.getReactor().writable(portToSORPeer).set({messageType: MessageTypes.sorBootMessage}); - // Disconnect immediately. - this.disconnect(portToSORPeer, sorPeerValue.portFromSORRunner); } -} -interface BootProcessingArgs { - type: MessageTypes.sorBootMessage, - expectingBoot: State, - _randoms: number[][], - _s: State, - _part: State, - sorActors: State, - sorPeer: State, - portToSORPeer: OutPort -} - -interface ResultProcessingArgs { - type: MessageTypes.sorResultMessage, - expectingBoot: State, - totalMsgRcv: State, - returned: State, - gTotal: State, - s: State, - part: State } - -interface BorderProcessingArgs { - type: MessageTypes.sorBorderMessage, - expectingBoot: State, - s: State, - part: State, - sorActors: State, - portToSORActors: OutPort -} - -type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs | BorderProcessingArgs; \ No newline at end of file diff --git a/src/benchmark/sucoverrelax/sorutils.ts b/src/benchmark/sucoverrelax/sorutils.ts index 1fe3f456a..e80a7e12a 100644 --- a/src/benchmark/sucoverrelax/sorutils.ts +++ b/src/benchmark/sucoverrelax/sorutils.ts @@ -62,6 +62,19 @@ export function jgfValidate(gTotal: number, size: number): void { } } +// This is not present in the original Savina benchmark. +// There, an array of Actors are passed to other SORActor to allow them to communicate with their neighbours. +// In reality, again, they only communicate with their neighbours, which are to these four directions. +export enum Direction { + UP=0b00, + DOWN=0b01, + LEFT=0b10, + RIGHT=0b11, +} +// And to ensure that we don't want to shoot ourselves when reading our code, +// this is to help understand that connection to LEFT will be connected to connection from RIGHT. +export const oppositeDirection = (direction: Direction): Direction => (direction ^ 0b01); + export enum MessageTypes { sorBootMessage, sorResultMessage, @@ -79,6 +92,9 @@ export class SorBorder { export interface SORBootMessage { messageType: MessageTypes.sorBootMessage; + // This is SucOverRelaxConfig.A. + // eslint-disable-next-line @typescript-eslint/naming-convention + A: number[][]; } export interface SORResultMessage { From 7b7ef39ecc6dea9317c3923d2b6d00d66bafd549 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Mon, 18 Sep 2023 01:46:07 -0700 Subject: [PATCH 5/6] wip --- src/benchmark/sucoverrelax/sucoverrelax.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/benchmark/sucoverrelax/sucoverrelax.ts b/src/benchmark/sucoverrelax/sucoverrelax.ts index 652187c56..577862548 100644 --- a/src/benchmark/sucoverrelax/sucoverrelax.ts +++ b/src/benchmark/sucoverrelax/sucoverrelax.ts @@ -17,4 +17,3 @@ import { - From c59fc3822c45d7aa9a0f5803510753af4c8dbe0f Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Thu, 21 Sep 2023 10:57:50 -0700 Subject: [PATCH 6/6] wip --- src/benchmark/sucoverrelax/runner.ts | 45 ++++++++++++++++++++------ src/benchmark/sucoverrelax/sorutils.ts | 26 ++++++++++++++- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/src/benchmark/sucoverrelax/runner.ts b/src/benchmark/sucoverrelax/runner.ts index a3a4add34..828940bec 100644 --- a/src/benchmark/sucoverrelax/runner.ts +++ b/src/benchmark/sucoverrelax/runner.ts @@ -1,7 +1,7 @@ import { InMultiPort, InPort, MutationSandbox, OutMultiPort, OutPort, Parameter, Reactor, State, Variable, } from "../../core/internal"; import { SORActor } from "./actor"; -import { SORPeer } from "./peer"; -import { Message, MessageTypes, SORBootMessage, SORBorderMessage, SORResultMessage, SORStartMessage, SorBorder, jacobi, omega, randomMatrix } from "./sorutils"; +import { SORPeer, SORPeer } from "./peer"; +import { Message, MessageTypes, SORBootMessage, SORBorderMessage, SORResultMessage, SORStartMessage, SorBorder, arrayOfDim, jacobi, omega, randomMatrix } from "./sorutils"; export class SORRunner extends Reactor { protected s: State; @@ -69,31 +69,56 @@ export class SORRunner extends Reactor { } ); - // SORRunner::process(mst: SorBootMessage) and SORRunner::boot + // SORRunner::process(msg: SorBootMessage) and SORRunner::boot this.addMutation( [this.portFromApp], - [this.portFromApp, this.expectingBoot, this.sorActors], - function(this, portFromApp, expectingBoot, sorActorsState) { - const { A } = portFromApp.get()!; + [this.portFromApp, this.expectingBoot, this.sorActors, this.sorPeer, this.writable (this.portToPeer)], + function(this, portFromApp, expectingBoot, sorActorsState, sorPeerState, portToPeer) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const randoms = portFromApp.get()!.A!; const sorActors = sorActorsState.get(); expectingBoot.set(false); // SORRunner::boot const myBorder: SORActor[] = []; - const randoms = randomMatrix; // In Scala/Akka, 0 until s is [0, s) for (let i = 0; i < s; ++i) { let c = i % 2; for (let j = 0; j < part; ++j) { const pos = i * (part + i) + j; c = 1 - c; - sorActors[pos] = this.addSibling(SORActor, pos, A[i][j], c, s, part + 1, omega, this.getReactor(), false); - + sorActors[pos] = this.addSibling(SORActor, pos, randoms[i][j], c, s, part + 1, omega, this.getReactor(), false); + if (j === (part - 1)) { + myBorder[i] = sorActors[pos]; + } } } + const partialMatrix = arrayOfDim([s, s - part], 0 as number); + for (let i = 0; i < s; ++i) { + for (let j = 0; j < s - part; ++j) { + partialMatrix[i][j] = randoms[i][j + part]; + } + } + const sorPeer = this.addSibling(SORPeer, s, part, partialMatrix, new SorBorder(myBorder), this.getReactor() as SORRunner); + this.connect(portToPeer.getPort(), sorPeer.portFromSORRunner); + sorPeerState.set(sorPeer); + portToPeer.set({ + messageType: MessageTypes.sorBootMessage, + A: undefined + }); } - ) + ); + this.addMutation( + [this.portsFromActors, this.portFromPeerResult], + [this.portsFromActors, this.portFromPeerResult, this.totalMsgRcv], + function (this, portsFromActors, portFromPeerResult, totalMsgRcvState) { + const message = + + const totalMsgRcv = totalMsgRcvState.get(); + + } + ); } diff --git a/src/benchmark/sucoverrelax/sorutils.ts b/src/benchmark/sucoverrelax/sorutils.ts index e80a7e12a..c34695cd8 100644 --- a/src/benchmark/sucoverrelax/sorutils.ts +++ b/src/benchmark/sucoverrelax/sorutils.ts @@ -19,6 +19,30 @@ export class SavinaPRNG { } } +// Some type gymnastics that replicates Array.ofDim in Akka/Scala +// See https://stackoverflow.com/a/72139481 for reference +// Usage: arrayOfDim([1,2,3,4], -1 as number) (the `as` part is very important) +type NDArr = + T extends [number, ...infer K] + ? K extends number[] ? + Array> + : never + : R; + +export const arrayOfDim = (args: [...T], fill: R): NDArr => { + if (args.length > 0) { + const dim = args[0]; + const rest = args.slice(1); + const newArr = []; + for (let i = 0; i < dim; ++i) { + newArr[i] = arrayOfDim(rest, fill); + } + return newArr as NDArr; + } else { + return fill as NDArr; +} +}; + // This is not a recommended way to use JS, but whatever...... export const refVal = [ @@ -94,7 +118,7 @@ export interface SORBootMessage { messageType: MessageTypes.sorBootMessage; // This is SucOverRelaxConfig.A. // eslint-disable-next-line @typescript-eslint/naming-convention - A: number[][]; + A: number[][] | undefined; } export interface SORResultMessage {