-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP - rebased to mutation/create-reactor-api
- Loading branch information
Showing
6 changed files
with
572 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* eslint-disable @typescript-eslint/prefer-readonly */ | ||
// The SOR (Re)actor | ||
import { InPort, Reactor, State } from "../../core/internal"; | ||
import { Message } from "./sorutils"; | ||
|
||
export class SORActor extends Reactor { | ||
constructor( | ||
parent: Reactor, | ||
pos: number, | ||
_value: number, | ||
colour: number, | ||
nx: number, | ||
ny: number, | ||
omega: number, | ||
sorSource: Reactor, | ||
peer: boolean | ||
) { | ||
super(parent, "SORActor"); | ||
|
||
const x = Math.floor(pos / ny); | ||
const y = pos % ny; | ||
|
||
const omegaOverFour = 0.25 * omega; | ||
const oneMinusOmega = 1.0 - omega; | ||
|
||
this.portFromRunner = new InPort<Message>(this); | ||
|
||
const neighbours = (() => { | ||
const calPos = (x: number, y: number): number => (x * ny + y); | ||
|
||
if (x > 0 && x < nx - 1 && y > 0 && y < ny - 1) { | ||
return [calPos(x, y + 1), | ||
calPos(x + 1, y), | ||
calPos(x, y - 1), | ||
calPos(x - 1, y)]; | ||
} | ||
if ((x === 0 || x === (nx - 1)) && (y === 0 || y === (ny - 1))) { | ||
return [ | ||
(x === 0) ? calPos(x + 1, y) : calPos(x - 1, y), | ||
(y === 0) ? calPos(x, y + 1) : calPos(x, y - 1) | ||
]; | ||
} | ||
if ((x === 0 || x === (nx - 1)) || (y === 0 || y === (ny - 1))) { | ||
if (x === 0 || x === nx - 1) { | ||
return [ | ||
(x === 0) ? calPos(x + 1, y) : calPos(x - 1, y), | ||
calPos(x, y + 1), | ||
calPos(x, y - 1) | ||
]; | ||
} | ||
return [ | ||
(y === 0) ? calPos(x, y + 1) : calPos(x, y - 1), | ||
calPos(x+1, y), | ||
calPos(x-1, y) | ||
]; | ||
} | ||
return []; | ||
})(); | ||
} | ||
|
||
private iter = new State(0); | ||
private maxIter = new State(0); | ||
private msgRcv = new State(0); | ||
private sorActors = new State<Reactor[]>([]); | ||
|
||
public portFromRunner: InPort<Message>; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import { InPort, MutationSandbox, OutPort, Reactor, State } from "../../core/internal"; | ||
import { SORActor } from "./actor"; | ||
import { SORRunner } from "./runner"; | ||
import { Message, MessageTypes, SorBorder as SORBorder, SorBorder } from "./sorutils"; | ||
|
||
export class SORPeer extends Reactor { | ||
|
||
s: State<number>; | ||
partStart: State<number>; | ||
matrixPart: State<number[][]>; | ||
border: State<SORBorder>; | ||
sorSource: State<SORRunner> | ||
|
||
|
||
sorActors: State<Reactor[]>; | ||
public portFromSORRunner: InPort<Message>; | ||
|
||
constructor( | ||
parent: Reactor, | ||
s: number, | ||
partStart: number, | ||
matrixPart: number[][], | ||
border: SORBorder, | ||
sorSource: SORRunner | ||
) { | ||
super(parent, "SORPeer"); | ||
this.sorActors = new State([]); | ||
this.s = new State(s); | ||
this.partStart = new State(partStart); | ||
this.matrixPart = new State(matrixPart); | ||
this.border = new State(border); | ||
this.sorSource = new State(sorSour ce); | ||
|
||
this.portFromSORRunner = new InPort(this); | ||
} | ||
|
||
static boot(this: MutationSandbox, args: BootProcessingArgs) { | ||
const myBorder: SORActor[] = []; | ||
const {_s, border, sorActors, partStart} = args; | ||
const s = _s.get(); | ||
const partStartVal = partStart.get(); | ||
|
||
const sorActorsValue = sorActors.get(); | ||
const borderValue = border.get(); | ||
for (let i = 0; i < s; ++i) { | ||
sorActorsValue[i * (s - partStartVal + 1)] = borderValue.borderActors[i]; | ||
} | ||
sorActors.set(sorActorsValue); | ||
|
||
for () | ||
} | ||
} | ||
|
||
interface BootProcessingArgs { | ||
type: MessageTypes.sorBootMessage, | ||
_s: State<number>, | ||
border: State<SORBorder>, | ||
sorActors: State<SORActor[]>, | ||
partStart: State<number>, | ||
} | ||
interface ResultProcessingArgs { | ||
type: MessageTypes.sorResultMessage, | ||
expectingBoot: State<boolean>, | ||
totalMsgRcv: State<number>, | ||
returned: State<number>, | ||
gTotal: State<number>, | ||
s: State<number>, | ||
part: State<number> | ||
} | ||
|
||
type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
import { InMultiPort, InPort, MutationSandbox, OutMultiPort, OutPort, Parameter, Reactor, State, Variable, } from "../../core/internal"; | ||
import { SORActor } from "./actor"; | ||
import { SORPeer } from "./peer"; | ||
import { Message, MessageTypes, SORBootMessage, SORResultMessage, SorBorder, jacobi, omega } from "./sorutils"; | ||
|
||
export class SORRunner extends Reactor { | ||
protected s: State<number>; | ||
protected part: State<number>; | ||
|
||
protected sorActors: State<SORActor[]>; | ||
protected sorPeer: State<SORPeer | undefined>; | ||
|
||
protected portToSORActors: OutPort<Message>; | ||
protected portToSORPeer: OutPort<Message>; | ||
// Unsure if this would work, let's just try...... | ||
protected portFromSORActor: InPort<Message>; | ||
protected portFromSORPeer: InPort<Message>; | ||
|
||
protected gTotal = new State(0.0); | ||
protected returned = new State(0); | ||
protected totalMsgRcv = new State(0); | ||
protected expectingBoot = new State(true); | ||
|
||
// In Savina, randoms is accessed directly from SucOverRelaxConfig. | ||
// Here we pass it in to the closure. | ||
constructor(parent: Reactor, size: number, _randoms: number[][]) { | ||
super(parent, "SORRunner"); | ||
// These are in SorRunner; | ||
this.s = new State(size); | ||
// In the scala implementation a simple /2 was used. | ||
// In JS we might need to enforce some sort of guarantee as it was used to calculate position | ||
this.part = new State(Math.floor(size / 2)); | ||
/** These are from Savina. They should be rather irrelevant, actually. */ | ||
this.sorActors = new State([]); | ||
this.sorPeer = new State(undefined); | ||
|
||
/** These are the actual messaging passing mechanism that are synonomous to that of Savina. */ | ||
// This creates a bunch of ports. | ||
this.portToSORActors = new OutPort(this); | ||
this.portToSORPeer = new OutPort(this); | ||
this.portFromSORActor = new InPort(this); | ||
this.portFromSORPeer = new InPort(this); | ||
|
||
this.addMutation( | ||
[this.startup], | ||
[this.sorActors, this.sorPeer, this.portToSORActors, this.portToSORPeer], | ||
function (this, sorActors, sorPeer, portToSORActors, portToSORPeer) { | ||
// TODO: Add actual stuff | ||
; | ||
} | ||
); | ||
} | ||
|
||
// This is to be used WITHIN mutation react functions. | ||
static process(this: MutationSandbox, message: Message, args: ProcessingArgs): void { | ||
switch (message.messageType) { | ||
case MessageTypes.sorBootMessage: { | ||
if (args.type !== MessageTypes.sorBootMessage) { | ||
throw new Error("Wrong type of arguments passed."); | ||
} | ||
// expectingBoot is args[0] | ||
args.expectingBoot.set(false); | ||
SORRunner.boot.apply(this, [args]); | ||
break; | ||
} | ||
case MessageTypes.sorResultMessage: { | ||
if (args.type !== MessageTypes.sorResultMessage) { | ||
throw new Error("Wrong type of arguments passed."); | ||
} | ||
|
||
const {mv, msgRcv} = message; | ||
const {expectingBoot, totalMsgRcv, returned, gTotal, s, part} = args; | ||
|
||
if (expectingBoot.get()) { | ||
throw new Error("SORRunner not booted yet!"); | ||
} | ||
|
||
totalMsgRcv.set(totalMsgRcv.get() + msgRcv); | ||
returned.set(returned.get() + 1); | ||
gTotal.set(gTotal.get() + mv); | ||
|
||
if (returned.get() === (s.get() * part.get()) + 1) { | ||
// TODO: validate | ||
// TODO: exit | ||
; | ||
} | ||
break; | ||
} | ||
case MessageTypes.sorBorderMessage: { | ||
if (args.type !== MessageTypes.sorBorderMessage) { | ||
throw new Error("Wrong type of arguments passed."); | ||
} | ||
|
||
const {mBorder} = message; | ||
const {expectingBoot, s, part, sorActors, portToSORActors} = args; | ||
|
||
if (expectingBoot.get()) { | ||
throw new Error("SORRunner not booted yet!"); | ||
} | ||
const sorActorsValue = sorActors.get(); | ||
for (let i = 0; i <= s.get(); ++i) { | ||
sorActorsValue[(i + 1) * (part.get() + 1) - 1] = mBorder.borderActors[i]; | ||
} | ||
sorActors.set(sorActorsValue); | ||
for (let i = 0; i <= s.get(); ++i) { | ||
for (let j = 0; j <= part.get(); ++j) { | ||
const pos = (i * (part.get() + 1)) + j; | ||
// Ibidem, connect then disconnect to simulate | ||
// "fire and forget" in scala. | ||
this.connect(portToSORActors, sorActorsValue[pos].portFromRunner); | ||
this.getReactor().writable(portToSORActors).set( | ||
{ | ||
messageType: MessageTypes.sorStartMessage, | ||
mi: jacobi, | ||
mActors: sorActorsValue | ||
} | ||
); | ||
this.disconnect(portToSORActors, sorActorsValue[pos].portFromRunner); | ||
} | ||
} | ||
break; | ||
} | ||
default: { | ||
throw new Error("Received wrong message from port"); | ||
} | ||
} | ||
} | ||
|
||
// SorRunner::boot() | ||
static boot(this: MutationSandbox, | ||
args: BootProcessingArgs | ||
): void { | ||
const {_randoms, _s, _part, sorActors, sorPeer, portToSORPeer} = args; | ||
|
||
const myBorder: SORActor[] = []; | ||
const randoms = _randoms; | ||
const s = _s.get(); | ||
const part = _part.get(); | ||
// In scala, (i <- 0 until s) is a loop excluding s. | ||
const sorActorsValue = sorActors.get(); | ||
for (let i = 0; i < s; ++i) { | ||
let c = i % 2; | ||
for (let j = 0; j < part; ++j) { | ||
const pos = i * (part + 1) + j; | ||
c = 1 - c; | ||
// We modify them in bulk, then update the state. | ||
// Unlike in Scala we do not need to initialise the array here, JS supports sparse array. | ||
// I have absolutely no idea why these parametres are called as such...... | ||
sorActorsValue[pos] = this.getReactor()._uncheckedAddSibling( | ||
SORActor, | ||
pos, randoms[i][j], c, s, part + 1, omega, this.getReactor(), false | ||
); | ||
|
||
if (j === (part - 1)) { | ||
myBorder[i] = sorActorsValue[pos]; | ||
} | ||
|
||
} | ||
} | ||
sorActors.set(sorActorsValue); | ||
|
||
const partialMatrix: number[][] = []; | ||
for (let i = 0; i < s; ++i) { | ||
for (let j = 0; j < s - part; ++j) { | ||
partialMatrix[i][j] = randoms[i][j + part]; | ||
} | ||
} | ||
|
||
const sorPeerValue = this.getReactor()._uncheckedAddSibling( | ||
SORPeer, | ||
s, part, partialMatrix, new SorBorder(myBorder), | ||
// A dirty hack. Maybe this will be removed as ports get added. | ||
this.getReactor() as SORRunner | ||
); | ||
sorPeer.set(sorPeerValue); | ||
// Pass message. | ||
// This is similar to Scala's !; but it looks pretty...... interesting in LF. | ||
// If node is concurrent or parallel, this might be a problem, so direct copy-pastaing to C++ runtime might not work. | ||
this.connect(portToSORPeer, sorPeerValue.portFromSORRunner); | ||
this.getReactor().writable(portToSORPeer).set({messageType: MessageTypes.sorBootMessage}); | ||
// Disconnect immediately. | ||
this.disconnect(portToSORPeer, sorPeerValue.portFromSORRunner); | ||
} | ||
} | ||
|
||
|
||
interface BootProcessingArgs { | ||
type: MessageTypes.sorBootMessage, | ||
expectingBoot: State<boolean>, | ||
_randoms: number[][], | ||
_s: State<number>, | ||
_part: State<number>, | ||
sorActors: State<SORActor[]>, | ||
sorPeer: State<SORPeer>, | ||
portToSORPeer: OutPort<Message> | ||
} | ||
|
||
interface ResultProcessingArgs { | ||
type: MessageTypes.sorResultMessage, | ||
expectingBoot: State<boolean>, | ||
totalMsgRcv: State<number>, | ||
returned: State<number>, | ||
gTotal: State<number>, | ||
s: State<number>, | ||
part: State<number> | ||
} | ||
|
||
interface BorderProcessingArgs { | ||
type: MessageTypes.sorBorderMessage, | ||
expectingBoot: State<boolean>, | ||
s: State<number>, | ||
part: State<number>, | ||
sorActors: State<SORActor[]>, | ||
portToSORActors: OutPort<Message> | ||
} | ||
|
||
type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs | BorderProcessingArgs; |
Oops, something went wrong.