diff --git a/src/benchmark/sucoverrelax/actor.ts b/src/benchmark/sucoverrelax/actor.ts new file mode 100644 index 000000000..b53c8d3fe --- /dev/null +++ b/src/benchmark/sucoverrelax/actor.ts @@ -0,0 +1,67 @@ +/* eslint-disable @typescript-eslint/prefer-readonly */ +// The SOR (Re)actor +import { InPort, Reactor, State } from "../../core/internal"; +import { Message } from "./sorutils"; + +export class SORActor extends Reactor { + constructor( + parent: Reactor, + pos: number, + _value: number, + colour: number, + nx: number, + ny: number, + omega: number, + sorSource: Reactor, + peer: boolean + ) { + super(parent, "SORActor"); + + const x = Math.floor(pos / ny); + const y = pos % ny; + + const omegaOverFour = 0.25 * omega; + const oneMinusOmega = 1.0 - omega; + + this.portFromRunner = new InPort(this); + + const neighbours = (() => { + const calPos = (x: number, y: number): number => (x * ny + y); + + if (x > 0 && x < nx - 1 && y > 0 && y < ny - 1) { + return [calPos(x, y + 1), + calPos(x + 1, y), + calPos(x, y - 1), + calPos(x - 1, y)]; + } + if ((x === 0 || x === (nx - 1)) && (y === 0 || y === (ny - 1))) { + return [ + (x === 0) ? calPos(x + 1, y) : calPos(x - 1, y), + (y === 0) ? calPos(x, y + 1) : calPos(x, y - 1) + ]; + } + if ((x === 0 || x === (nx - 1)) || (y === 0 || y === (ny - 1))) { + if (x === 0 || x === nx - 1) { + return [ + (x === 0) ? calPos(x + 1, y) : calPos(x - 1, y), + calPos(x, y + 1), + calPos(x, y - 1) + ]; + } + return [ + (y === 0) ? calPos(x, y + 1) : calPos(x, y - 1), + calPos(x+1, y), + calPos(x-1, y) + ]; + } + return []; + })(); + } + + private iter = new State(0); + private maxIter = new State(0); + private msgRcv = new State(0); + private sorActors = new State([]); + + public portFromRunner: InPort; +} \ No newline at end of file diff --git a/src/benchmark/sucoverrelax/peer.ts b/src/benchmark/sucoverrelax/peer.ts new file mode 100644 index 000000000..f26a66611 --- /dev/null +++ b/src/benchmark/sucoverrelax/peer.ts @@ -0,0 +1,71 @@ +import { InPort, MutationSandbox, OutPort, Reactor, State } from "../../core/internal"; +import { SORActor } from "./actor"; +import { SORRunner } from "./runner"; +import { Message, MessageTypes, SorBorder as SORBorder, SorBorder } from "./sorutils"; + +export class SORPeer extends Reactor { + + s: State; + partStart: State; + matrixPart: State; + border: State; + sorSource: State + + + sorActors: State; + public portFromSORRunner: InPort; + + constructor( + parent: Reactor, + s: number, + partStart: number, + matrixPart: number[][], + border: SORBorder, + sorSource: SORRunner + ) { + super(parent, "SORPeer"); + this.sorActors = new State([]); + this.s = new State(s); + this.partStart = new State(partStart); + this.matrixPart = new State(matrixPart); + this.border = new State(border); + this.sorSource = new State(sorSource); + + this.portFromSORRunner = new InPort(this); + } + + static boot(this: MutationSandbox, args: BootProcessingArgs) { + const myBorder: SORActor[] = []; + const {_s, border, sorActors, partStart} = args; + const s = _s.get(); + const partStartVal = partStart.get(); + + const sorActorsValue = sorActors.get(); + const borderValue = border.get(); + for (let i = 0; i < s; ++i) { + sorActorsValue[i * (s - partStartVal + 1)] = borderValue.borderActors[i]; + } + sorActors.set(sorActorsValue); + + for () + } +} + +interface BootProcessingArgs { + type: MessageTypes.sorBootMessage, + _s: State, + border: State, + sorActors: State, + partStart: State, +} +interface ResultProcessingArgs { + type: MessageTypes.sorResultMessage, + expectingBoot: State, + totalMsgRcv: State, + returned: State, + gTotal: State, + s: State, + part: State +} + +type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs; \ No newline at end of file diff --git a/src/benchmark/sucoverrelax/runner.ts b/src/benchmark/sucoverrelax/runner.ts new file mode 100644 index 000000000..828940bec --- /dev/null +++ b/src/benchmark/sucoverrelax/runner.ts @@ -0,0 +1,126 @@ +import { InMultiPort, InPort, MutationSandbox, OutMultiPort, OutPort, Parameter, Reactor, State, Variable, } from "../../core/internal"; +import { SORActor } from "./actor"; +import { SORPeer, SORPeer } from "./peer"; +import { Message, MessageTypes, SORBootMessage, SORBorderMessage, SORResultMessage, SORStartMessage, SorBorder, arrayOfDim, jacobi, omega, randomMatrix } from "./sorutils"; + +export class SORRunner extends Reactor { + protected s: State; + protected part: State; + + protected sorActors: State; + protected sorPeer: State; + + // Definition of Ports. Initialisation will take place in the constructor, because some of them needs to access + // some parametres. + /// Runner will interact with App. It will receive BootMsg, and send ResultMsg. + /// In the original Savina benchmark suite there is no ResultMsg. + /// Instead, Runner handles validation and simply quits the programme. + /// We are taking a more based approach but there is no difference. + /// See https://github.com/shamsimam/savina/blob/52e546959a57670cdba7a88f0a030b53cfdb16d6/src/main/scala/edu/rice/habanero/benchmarks/sor/SucOverRelaxAkkaActorBenchmark.scala#L112-L115 + protected portFromApp: InPort; + protected portToApp: OutPort; + + /// Runner will send StartMsg to all actors, and receive ResultMsg from all actors. + /// Runner will also supply them with a the list of actors. We will do the same, connection will be handled by individual Actors. + protected portsFromActors: InMultiPort; + protected portsToActors: OutMultiPort; + + /// Runner will send BootMsg to Peer, and receive BorderMsg OR ResultMsg from Peer. + /// I am using a 3-port approach here, this way I can handle SORResultMessage in a single mutation. + protected portFromPeerBorder: InPort; + protected portFromPeerResult: InPort; + protected portToPeer: OutPort; + + protected gTotal = new State(0.0); + protected returned = new State(0); + protected totalMsgRcv = new State(0); + protected expectingBoot = new State(true); + + // In Savina, randoms is accessed directly from SucOverRelaxConfig. + // Here we pass it in to the closure. + constructor(parent: Reactor, s: number, _randoms: number[][]) { + super(parent); + this.s = new State(s); + // In the scala implementation a simple /2 was used. + // In JS we might need to enforce some sort of guarantee as it was used to calculate position + const part = Math.floor(s / 2); + this.part = new State(part); + + // The following lines are from Savina. They should be rather irrelevant, actually, because message passing are handled by ports + this.sorActors = new State([]); + this.sorPeer = new State(undefined); + + // Initialisation of ports + this.portFromApp = new InPort(this); + this.portToApp = new OutPort(this); + // size * (part + 1) is the size of sorActors in the Savina benchmark + this.portsFromActors = new InMultiPort(this, s * (part + 1)); + this.portsToActors = new OutMultiPort(this, s * (part + 1)); + + this.portFromPeerBorder = new InPort(this); + this.portFromPeerResult = new InPort(this); + this.portToPeer = new OutPort(this); + + this.addMutation( + [this.startup], + [], + function (this) { + console.log("I am SORRunner [M1]. I am trolling.") + } + ); + + // SORRunner::process(msg: SorBootMessage) and SORRunner::boot + this.addMutation( + [this.portFromApp], + [this.portFromApp, this.expectingBoot, this.sorActors, this.sorPeer, this.writable (this.portToPeer)], + function(this, portFromApp, expectingBoot, sorActorsState, sorPeerState, portToPeer) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const randoms = portFromApp.get()!.A!; + + const sorActors = sorActorsState.get(); + expectingBoot.set(false); + // SORRunner::boot + const myBorder: SORActor[] = []; + // In Scala/Akka, 0 until s is [0, s) + for (let i = 0; i < s; ++i) { + let c = i % 2; + for (let j = 0; j < part; ++j) { + const pos = i * (part + i) + j; + c = 1 - c; + sorActors[pos] = this.addSibling(SORActor, pos, randoms[i][j], c, s, part + 1, omega, this.getReactor(), false); + if (j === (part - 1)) { + myBorder[i] = sorActors[pos]; + } + } + } + const partialMatrix = arrayOfDim([s, s - part], 0 as number); + for (let i = 0; i < s; ++i) { + for (let j = 0; j < s - part; ++j) { + partialMatrix[i][j] = randoms[i][j + part]; + } + } + const sorPeer = this.addSibling(SORPeer, s, part, partialMatrix, new SorBorder(myBorder), this.getReactor() as SORRunner); + this.connect(portToPeer.getPort(), sorPeer.portFromSORRunner); + sorPeerState.set(sorPeer); + portToPeer.set({ + messageType: MessageTypes.sorBootMessage, + A: undefined + }); + } + ); + + this.addMutation( + [this.portsFromActors, this.portFromPeerResult], + [this.portsFromActors, this.portFromPeerResult, this.totalMsgRcv], + function (this, portsFromActors, portFromPeerResult, totalMsgRcvState) { + const message = + + const totalMsgRcv = totalMsgRcvState.get(); + + } + ); + + } + + +} diff --git a/src/benchmark/sucoverrelax/sorutils.ts b/src/benchmark/sucoverrelax/sorutils.ts new file mode 100644 index 000000000..c34695cd8 --- /dev/null +++ b/src/benchmark/sucoverrelax/sorutils.ts @@ -0,0 +1,149 @@ +import { Reactor } from "../../core/reactor"; +import { SORActor } from "./actor"; + +// Savina implementation of PRNG +export class SavinaPRNG { + private value: number; + + constructor(value?: number) { + this.value = value ?? 1145141919; + } + + public nextNumber(): number { + this.value = ((this.value * 1309) + 13849) & 65535; + return this.value; + } + + public nextFloat(): number { + return 1.0 / (this.nextNumber() + 1); + } +} + +// Some type gymnastics that replicates Array.ofDim in Akka/Scala +// See https://stackoverflow.com/a/72139481 for reference +// Usage: arrayOfDim([1,2,3,4], -1 as number) (the `as` part is very important) +type NDArr = + T extends [number, ...infer K] + ? K extends number[] ? + Array> + : never + : R; + +export const arrayOfDim = (args: [...T], fill: R): NDArr => { + if (args.length > 0) { + const dim = args[0]; + const rest = args.slice(1); + const newArr = []; + for (let i = 0; i < dim; ++i) { + newArr[i] = arrayOfDim(rest, fill); + } + return newArr as NDArr; + } else { + return fill as NDArr; +} +}; + +// This is not a recommended way to use JS, but whatever...... + +export const refVal = [ + 0.000003189420084871275, + 0.001846644602759566, + 0.0032099996270638005, + 0.0050869220175413146, + 0.008496328291240363, + 0.016479973604143234, + 0.026575660248076397, + // This is different from the Savina one because JS doesn't have high precision + 1.026575660248076, + 2.026575660248076, + 3.026575660248076 +]; + +export const jacobi = 100; + +export const omega = 1.25; + +export function randomMatrix(m: number, n: number): number[][] { + const mat = []; + const prng = new SavinaPRNG(114514); + for (let i = 0; i < m; ++i) { + const row = []; + for (let j = 0; j < n; ++j) { + row.push(prng.nextFloat() * 1e-6); + } + mat.push(row); + } + return mat; +} + +export function jgfValidate(gTotal: number, size: number): void { + const dev = Math.abs(gTotal - refVal[size]); + if (dev > 1.0e-12) { + console.log("Validation failed"); + console.log(`GTotal=${gTotal}; ${refVal[size]}; ${dev}; ${size}`); + } else { + console.log("Validation OK!"); + } +} + +// This is not present in the original Savina benchmark. +// There, an array of Actors are passed to other SORActor to allow them to communicate with their neighbours. +// In reality, again, they only communicate with their neighbours, which are to these four directions. +export enum Direction { + UP=0b00, + DOWN=0b01, + LEFT=0b10, + RIGHT=0b11, +} +// And to ensure that we don't want to shoot ourselves when reading our code, +// this is to help understand that connection to LEFT will be connected to connection from RIGHT. +export const oppositeDirection = (direction: Direction): Direction => (direction ^ 0b01); + +export enum MessageTypes { + sorBootMessage, + sorResultMessage, + sorBorderMessage, + sorStartMessage, + sorValueMessage, +} + +export class SorBorder { + borderActors: SORActor[]; + constructor(borderActors: SORActor[]) { + this.borderActors = borderActors; + } +} + +export interface SORBootMessage { + messageType: MessageTypes.sorBootMessage; + // This is SucOverRelaxConfig.A. + // eslint-disable-next-line @typescript-eslint/naming-convention + A: number[][] | undefined; +} + +export interface SORResultMessage { + messageType: MessageTypes.sorResultMessage; + mx: number; + my: number; + mv: number; + msgRcv: number; +} + +export interface SORBorderMessage { + messageType: MessageTypes.sorBorderMessage; + + mBorder: SorBorder; +} + +export interface SORStartMessage { + messageType: MessageTypes.sorStartMessage; + mi: number; + mActors: SORActor[]; +} + +export interface SORValueMessage { + messageType: MessageTypes.sorValueMessage; +} + +export type Message = SORBootMessage | SORResultMessage | SORBorderMessage | SORStartMessage | SORValueMessage; + diff --git a/src/benchmark/sucoverrelax/sucoverrelax.ts b/src/benchmark/sucoverrelax/sucoverrelax.ts new file mode 100644 index 000000000..577862548 --- /dev/null +++ b/src/benchmark/sucoverrelax/sucoverrelax.ts @@ -0,0 +1,19 @@ +import { + type WritablePort, + Parameter, + InPort, + OutPort, + State, + Action, + Reactor, + App, + TimeValue, + Origin, + Log + } from "../../core/internal"; + + + + + + diff --git a/src/benchmark/sucoverrelax/test.ts b/src/benchmark/sucoverrelax/test.ts new file mode 100644 index 000000000..b8b17de47 --- /dev/null +++ b/src/benchmark/sucoverrelax/test.ts @@ -0,0 +1,88 @@ +import { App, InPort, OutPort, Reactor } from "../../core/internal"; + +class Master extends Reactor { + outp: OutPort; + + constructor(parent: Reactor, receivers: Receiver[]) { + super(parent, ""); + this.outp = new OutPort(this); + + this.addMutation( + [this.startup], + [this.outp], + function(this, outp) { + let i = 0; + for (const r of receivers) { + this.connect(outp, r.inp); + console.log(`Master: triggering ${i}`) + this.getReactor().writable(outp).set(i); + console.log(`Master: disconnecting ${i}`) + this.disconnect(outp, r.inp); + ++i; + } + } + ); + } +} + +class Receiver extends Reactor { + inp: InPort; + outp: OutPort; + + constructor(parent: Reactor, receiver2: Receiver2) { + super(parent, ""); + this.inp = new InPort(this); + this.outp = new OutPort(this); + + this.addMutation( + [this.inp], + [this.inp, this.outp], + function(this, inp, outp) { + const message = inp.get(); + if (message == null) { + throw Error("Receiver: Message is null."); + } + console.log(`Receiver: message ${message}. Sending.`); + this.connect(outp, receiver2.inp); + this.getReactor().writable(outp).set(message); + this.disconnect(outp, receiver2.inp); + } + ); + } +} + +class Receiver2 extends Reactor { + inp: InPort; + + constructor(parent: Reactor) { + super(parent, ""); + this.inp = new InPort(this); + + this.addReaction( + [this.inp], + [this.inp], + function (this, inp) { + console.log(`Receiver2: received ${inp.get()}`) + } + ); + } +} + +class Apppp extends App { + master: Master; + recvs: Receiver[]; + recv2: Receiver2; + + constructor() { + super(undefined, undefined, false, ()=>(undefined), ()=>(undefined), ""); + this.recv2 = new Receiver2(this); + this.recvs = []; + for (let i = 0; i < 10; ++i) { + this.recvs.push(new Receiver(this, this.recv2)); + } + this.master = new Master(this, this.recvs); + } +} + +const app = new Apppp(); +app._start(); \ No newline at end of file diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 806bf2af7..8fa8844b6 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -153,6 +153,11 @@ export abstract class Reactor extends Component { */ private readonly _keyChain = new Map(); + // This is the keychain for creation, i.e. if Reactor R's mutation created reactor B, + // then R is B's creator, even if they are siblings. R should have access to B, + // at least semantically......? + private readonly _creatorKeyChain = new Map(); + /** * This graph has in it all the dependencies implied by this container's * ports, reactions, and connections. @@ -387,6 +392,9 @@ export abstract class Reactor extends Component { return owner._getKey(component, this._keyChain.get(owner)); } } + return component + .getContainer() + ._getKey(component, this._creatorKeyChain.get(component.getContainer())); } /** @@ -467,6 +475,20 @@ export abstract class Reactor extends Component { public delete(reactor: Reactor): void { reactor._delete(); } + + public addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addChild(constructor, ...args); + } + + public addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addSibling(constructor, ...args); + } }; /** @@ -1555,6 +1577,33 @@ export abstract class Reactor extends Component { toString(): string { return this._getFullyQualifiedName(); } + + protected _addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + const newReactor = new constructor(this, ...args); + return newReactor; + } + + protected _addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + if (this._getContainer() == null) { + throw new Error( + `Reactor ${this} does not have a parent. Sibling is not well-defined.` + ); + } + if (this._getContainer() === this) { + throw new Error( + `Reactor ${this} is self-contained. Adding sibling creates logical issue.` + ); + } + const newReactor = this._getContainer()._addChild(constructor, ...args); + this._creatorKeyChain.set(newReactor, newReactor._key); + return newReactor; + } } /* @@ -1795,6 +1844,16 @@ export interface MutationSandbox extends ReactionSandbox { getReactor: () => Reactor; // Container + addChild: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + + addSibling: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + // FIXME: // forkJoin(constructor: new () => Reactor, ): void; }