Skip to content

Commit

Permalink
Fixed msg state of executor. The msg object was stored on a global va…
Browse files Browse the repository at this point in the history
…riable in the EthersActionExecutor, which will not wwork because a node gets only one executor instance
  • Loading branch information
Andreas Hauschild committed May 27, 2022
1 parent 4146cfa commit 581cc4a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 83 deletions.
71 changes: 31 additions & 40 deletions lib/src/EthersActionExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,14 @@ export class EthersActionExecutor {
private subjects: { [key: number]: Subject<ModifyAction> } = {}

private provider: ethers.providers.JsonRpcProvider;
private msg: any;


constructor(private credentials: BaseCredentials, private rpc: string, private node: any, private output?: OutputMapping) {
this.provider = new ethers.providers.JsonRpcProvider(rpc);
}

async executeRead(a: ReadAction, msg: any): Promise<any> {
this.node.status({});
this.setMsg(msg);
if (a.type === ActionType.READ_CONTRACT) {
try {
const action = a as ReadContractAction
Expand All @@ -126,9 +125,9 @@ export class EthersActionExecutor {

const result = await contract[method](...action.params)
this.node.status({fill: "green", shape: "ring", text: `success`});
this.setOutput(result);
this.setOutput(result,msg);
} catch (error) {
this.node.error(error, this.msg)
this.node.error(error, msg)
this.node.status({fill: "red", shape: "ring", text: `failed`});
}
} else if (a.type === ActionType.READ_ACCOUNT) {
Expand All @@ -143,15 +142,15 @@ export class EthersActionExecutor {
shape: "ring",
text: `Balance: ${ethers.utils.formatEther(result.toString())}`
});
this.setOutput(result);
this.setOutput(result,msg);
} else if (action.method === 'transactionCount') {
this.node.status({fill: "yellow", shape: "ring", text: "reading"});
const result = await this.provider.getTransactionCount(action.accountAddress);
this.node.status({fill: "green", shape: "ring", text: `Tx Count: ${result}`});
this.setOutput(result);
this.setOutput(result,msg);
}
} catch (error) {
this.node.error(error, this.msg)
this.node.error(error, msg)
this.node.status({fill: "red", shape: "ring", text: `failed`});
}
} else if (a.type === ActionType.READ_CONTRACT_EVENT) {
Expand Down Expand Up @@ -204,14 +203,14 @@ export class EthersActionExecutor {
this.node.status({fill: "yellow", shape: "ring", text: status});
console.log(status)
if (events.length > 0) {
this.setOutput(events);
this.setOutput(events,msg);
}
next += range
}

this.node.status({fill: "green", shape: "ring", text: status});
} catch (error) {
this.node.error(error, this.msg)
this.node.error(error, msg)
this.node.status({fill: "red", shape: "ring", text: `failed`});
}

Expand All @@ -221,7 +220,6 @@ export class EthersActionExecutor {

execute(action: ModifyAction, msg: any) {
this.node.status({});
this.setMsg(msg);
if (this.credentials.type === CredentialType.MNEMONIC && action.hierarchicalDeterministicWalletIndex == null) {
this.node.error(`Node use credentials of type '${CredentialType.MNEMONIC}', but the action does not provide a 'hierarchicalDeterministicWalletIndex'. Action will not be executed!`)
return;
Expand All @@ -235,9 +233,9 @@ export class EthersActionExecutor {
const wallet = ethers.Wallet.fromMnemonic((this.credentials as MnemonicCredentials).mnemonic, path).connect(this.provider);
this.wallets[action.hierarchicalDeterministicWalletIndex] = wallet;
this.subjects[action.hierarchicalDeterministicWalletIndex] = new Subject<ModifyAction>();
this.subscribeTransferHandler(this.subjects[action.hierarchicalDeterministicWalletIndex]);
this.subscribeDeployContractHandler(this.subjects[action.hierarchicalDeterministicWalletIndex])
this.subscribeWriteContractHandler(this.subjects[action.hierarchicalDeterministicWalletIndex])
this.subscribeTransferHandler(this.subjects[action.hierarchicalDeterministicWalletIndex],msg);
this.subscribeDeployContractHandler(this.subjects[action.hierarchicalDeterministicWalletIndex],msg)
this.subscribeWriteContractHandler(this.subjects[action.hierarchicalDeterministicWalletIndex],msg)
}
this.subjects[action.hierarchicalDeterministicWalletIndex].next(action);

Expand All @@ -251,9 +249,9 @@ export class EthersActionExecutor {
const wallet = new ethers.Wallet((this.credentials as PrivateKeyCredentials).privateKey).connect(this.provider);
this.wallets[0] = wallet;
this.subjects[0] = new Subject<ModifyAction>();
this.subscribeTransferHandler(this.subjects[0]);
this.subscribeDeployContractHandler(this.subjects[0]);
this.subscribeWriteContractHandler(this.subjects[0]);
this.subscribeTransferHandler(this.subjects[0],msg);
this.subscribeDeployContractHandler(this.subjects[0],msg);
this.subscribeWriteContractHandler(this.subjects[0],msg);
}
this.subjects[0].next(action);
}
Expand All @@ -262,7 +260,7 @@ export class EthersActionExecutor {

}

private subscribeDeployContractHandler(subject: Subject<ModifyAction>) {
private subscribeDeployContractHandler(subject: Subject<ModifyAction>, msg:any) {
subject.pipe(filter(a => a != null && a.type === ActionType.DEPLOY_CONTRACT),
concatMap(async a => {
try {
Expand All @@ -283,22 +281,22 @@ export class EthersActionExecutor {
this.node.status({fill: "green", shape: "ring", text: `deployed ${contract.address}`});
return {txReceipt, action, contract} as any
}).catch(e => {
this.node.error(e, this.msg)
this.node.error(e, msg)
this.node.status({fill: "red", shape: "ring", text: `failed`});
});
} catch (e) {
this.node.error(e, this.msg);
this.node.error(e, msg);
this.node.status({fill: "red", shape: "ring", text: `failed`});
return undefined;
}
})
).subscribe(result => {
this.node.log(`Deployed contract to '${result?.contract?.address}'`)
this.setOutput(result?.txReceipt);
this.setOutput(result?.txReceipt,msg);
})
}

private subscribeWriteContractHandler(subject: Subject<ModifyAction>) {
private subscribeWriteContractHandler(subject: Subject<ModifyAction>, msg:any) {

subject.pipe(filter(a => a != null && a.type === ActionType.WRITE_CONTRACT),
concatMap(async a => {
Expand All @@ -325,24 +323,24 @@ export class EthersActionExecutor {
this.node.status({fill: "green", shape: "ring", text: `success`});
return {txReceipt, action, contract}
}).catch(e => {
this.node.error(e, this.msg)
this.node.error(e, msg)
this.node.status({fill: "red", shape: "ring", text: `failed`});
});
} catch (e) {
this.node.error(e, this.msg);
this.node.error(e, msg);
this.node.status({fill: "red", shape: "ring", text: `failed`});
return undefined;
}
})
).subscribe(result => {
if (result) {
this.node.log(`executed method '${result?.action.method}' with params '${result?.action.params}' on contract '${result?.contract.address} with tx: ${result?.txReceipt?.transactionHash}'`)
this.setOutput(result?.txReceipt);
this.setOutput(result?.txReceipt,msg);
}
})
}

private subscribeTransferHandler(subject: Subject<ModifyAction>) {
private subscribeTransferHandler(subject: Subject<ModifyAction>,msg:any) {
subject.pipe(filter(a => a != null && a.type === ActionType.TRANSFER),
concatMap(async a => {
try {
Expand All @@ -361,11 +359,11 @@ export class EthersActionExecutor {
return this.provider.waitForTransaction(tx.hash).then(txReceipt => {
return {txReceipt, action}
}).catch(e => {
this.node.error(e, this.msg)
this.node.error(e, msg)
this.node.status({fill: "red", shape: "ring", text: `failed`});
});
} catch (e) {
this.node.error(e, this.msg);
this.node.error(e, msg);
this.node.status({fill: "red", shape: "ring", text: `failed`});
return undefined;
}
Expand All @@ -375,40 +373,33 @@ export class EthersActionExecutor {
const log = `Transferred '${result?.action?.amount}' from: '${result?.txReceipt?.from}' to '${result?.txReceipt?.to}'`;
this.node.log(log)
this.node.status({fill: "green", shape: "ring", text: log});
this.setOutput(result?.txReceipt);
this.setOutput(result?.txReceipt,msg);
}
});
}

private setOutput(result: any): void {
private setOutput(result: any,msg:any): void {
if (this.output) {
switch (this.output.context) {
case "msg": {
this.msg[this.output!?.key] = result;
this.node.send(this.msg);
msg[this.output!?.key] = result;
this.node.send(msg);
break;
}
case "flow": {
this.node.context().flow.set(this.output.key, result);
this.node.send(this.msg);
this.node.send(msg);
break;
}
case "global": {
this.node.context().global.set(this.output.key, result);
this.node.send(this.msg);
this.node.send(msg);
break;
}
}
}
}

private setMsg(msg: any): void {
this.msg = msg;
if (!this.msg) {
throw Error("can not execute Read. Message need to be provided")
}
}

private fn(num: string | number) {
return num.toString().replace(/(\d)(?=(\d{3})+(?!\d))/g, '$1.')
}
Expand Down
7 changes: 4 additions & 3 deletions nodes/ethers-read-account.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ module.exports = function (RED) {
context: this.outputType,
key: this.output
});
node.on('input', function (msg) {
node.on('input', async function (msg) {
const accountAddress = RED.util.evaluateNodeProperty(config.accountAddress, config.accountAddressType || "str", node, msg)
const method = config.method
const action = {
type:ActionType.READ_ACCOUNT,
type: ActionType.READ_ACCOUNT,
accountAddress,
method
}
ethersActionExecutor.executeRead(action,msg);

await ethersActionExecutor.executeRead(action, msg);
});
}

Expand Down
Loading

0 comments on commit 581cc4a

Please sign in to comment.