Skip to content

Commit f7fd8c2

Browse files
authored
Refactor wire protocol(eth68): separate request methods and response handlers (#3145)
Separation of requests methods and response handlers into their own module will help us avoid cyclic import or employ convolute abstractions for user handler. So the dependency graph can be arranged like: response_handlers -> user_handlers -> request_methods Now the p2pProtocol macro is not used for eth68 anymore, only for devP2P. Next PR may reintroduce a simpler but more versatile macro, if needed. Or we can use a more sophisticated template, or both. With this refactoring, implementation of transactions broadcast, including blob transactions, will be much easier.
1 parent 5249ca2 commit f7fd8c2

File tree

22 files changed

+892
-162
lines changed

22 files changed

+892
-162
lines changed

execution_chain/common/logging.nim

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
# This file may not be copied, modified, or distributed except according to
88
# those terms.
99

10+
{.used.}
11+
1012
import
1113
std/[typetraits, net],
1214
json_serialization,

execution_chain/core/chain/forked_chain/chain_header_cache.nim

-4
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,6 @@ proc putState(db: CoreDbTxRef; state: FcHdrState) =
143143
db.put(LhcStateKey.toOpenArray, encodePayload(state)).isOkOr:
144144
raiseAssert RaisePfx & "put(state) failed: " & $$error
145145

146-
proc delState(db: CoreDbTxRef) =
147-
discard db.del(LhcStateKey.toOpenArray)
148-
149-
150146
proc putHeader(db: CoreDbTxRef; bn: BlockNumber; data: seq[byte]) =
151147
## Store rlp encoded header
152148
db.put(beaconHeaderKey(bn).toOpenArray, data).isOkOr:

execution_chain/networking/p2p.nim

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import
1313
std/[tables, algorithm, random, typetraits, strutils, net],
1414
chronos, chronos/timer, chronicles,
1515
eth/common/keys,
16+
results,
1617
./[discoveryv4, peer_pool, rlpx, p2p_types]
1718

1819
export
@@ -213,7 +214,7 @@ proc randomPeerWith*(node: EthereumNode, Protocol: type): Peer =
213214
if candidates.len > 0:
214215
return candidates.rand()
215216

216-
proc getPeer*(node: EthereumNode, peerId: NodeId, Protocol: type): Option[Peer] =
217+
proc getPeer*(node: EthereumNode, peerId: NodeId, Protocol: type): Opt[Peer] =
217218
for peer in node.peers(Protocol):
218219
if peer.remote.id == peerId:
219220
return some(peer)

execution_chain/networking/p2p_protocol_dsl.nim

+10-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
{.push raises: [].}
2323

2424
import
25-
std/[options, sequtils, macrocache],
25+
std/[sequtils, macrocache],
2626
results,
2727
stew/shims/macros, chronos
2828

@@ -158,7 +158,7 @@ let
158158
resultIdent* {.compileTime.} = ident "result"
159159

160160
# Locally used symbols:
161-
Option {.compileTime.} = ident "Option"
161+
OptionT {.compileTime.} = ident "Opt"
162162
Future {.compileTime.} = ident "Future"
163163
Void {.compileTime.} = ident "void"
164164
writeField {.compileTime.} = ident "writeField"
@@ -177,6 +177,11 @@ proc initFuture*[T](loc: var Future[T]) =
177177
proc isRlpx*(p: P2PProtocol): bool =
178178
p.rlpxName.len > 0
179179

180+
func getProtocolIndex*(): int {.compileTime.} =
181+
let protocolIndex = protocolCounter.value
182+
protocolCounter.inc
183+
protocolIndex
184+
180185
template applyDecorator(p: NimNode, decorator: NimNode) =
181186
if decorator.kind != nnkNilLit:
182187
p.pragma.insert(0, decorator)
@@ -207,13 +212,13 @@ when tracingEnabled:
207212
`protocolInfo`, `msgName`,
208213
getOutput(`tracerStream`, string))
209214

210-
proc createPeerState[Peer, ProtocolState](peer: Peer): RootRef =
215+
proc createPeerState*[Peer, ProtocolState](peer: Peer): RootRef =
211216
var res = new ProtocolState
212217
mixin initProtocolState
213218
initProtocolState(res, peer)
214219
return cast[RootRef](res)
215220

216-
proc createNetworkState[NetworkNode, NetworkState](network: NetworkNode): RootRef {.gcsafe.} =
221+
proc createNetworkState*[NetworkNode, NetworkState](network: NetworkNode): RootRef {.gcsafe.} =
217222
var res = new NetworkState
218223
mixin initProtocolState
219224
initProtocolState(res, network)
@@ -561,7 +566,7 @@ proc requestResultType*(msg: Message): NimNode =
561566
else:
562567
return newTree(nnkBracketExpr, wrapperType, responseRec)
563568
else:
564-
return newTree(nnkBracketExpr, Option, responseRec)
569+
return newTree(nnkBracketExpr, OptionT, responseRec)
565570

566571
proc createSendProc*(msg: Message,
567572
procType = nnkProcDef,

execution_chain/networking/rlpx.nim

+230-9
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
{.push raises: [].}
2626

2727
import
28-
std/[algorithm, deques, options, os, sequtils, strutils, typetraits],
28+
std/[algorithm, deques, os, sequtils, strutils, typetraits],
2929
stew/byteutils,
3030
stew/shims/macros,
3131
chronicles,
@@ -145,12 +145,12 @@ chronicles.formatIt(Opt[uint64]):
145145
include p2p_backends_helpers
146146

147147
proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} =
148-
var f = Future[Option[MsgType]](future)
148+
var f = Future[Opt[MsgType]](future)
149149
if not f.finished:
150150
if msg != nil:
151-
f.complete some(cast[ptr MsgType](msg)[])
151+
f.complete Opt.some(cast[ptr MsgType](msg)[])
152152
else:
153-
f.complete none(MsgType)
153+
f.complete Opt.none(MsgType)
154154

155155
proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) =
156156
sendFut.addCallback do(arg: pointer):
@@ -186,7 +186,7 @@ proc disconnectAndRaise(
186186
await peer.disconnect(reason)
187187
raisePeerDisconnected(msg, reason)
188188

189-
proc handshakeImpl[T](
189+
proc handshakeImpl*[T](
190190
peer: Peer,
191191
sendFut: Future[void],
192192
responseFut: auto, # Future[T].Raising([CancelledError, EthP2PError]),
@@ -276,7 +276,7 @@ proc getMsgName*(peer: Peer, msgId: uint64): string =
276276
# Protocol info objects
277277
#
278278

279-
proc initProtocol(
279+
proc initProtocol*(
280280
name: string,
281281
version: uint64,
282282
peerInit: PeerStateInitializer,
@@ -289,7 +289,7 @@ proc initProtocol(
289289
networkStateInitializer: networkInit,
290290
)
291291

292-
proc setEventHandlers(
292+
proc setEventHandlers*(
293293
p: ProtocolInfo,
294294
onPeerConnected: OnPeerConnectedHandler,
295295
onPeerDisconnected: OnPeerDisconnectedHandler,
@@ -344,7 +344,7 @@ proc registerMsg(
344344
# Message composition and encryption
345345
#
346346

347-
proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: uint64): uint64 =
347+
proc perPeerMsgIdImpl*(peer: Peer, proto: ProtocolInfo, msgId: uint64): uint64 =
348348
result = msgId
349349
if not peer.dispatcher.isNil:
350350
result += peer.dispatcher.protocolOffsets[proto.index].value
@@ -634,7 +634,7 @@ proc checkedRlpRead(
634634

635635
raise e
636636

637-
proc nextMsg*(
637+
proc nextMsg(
638638
peer: Peer, MsgType: type
639639
): Future[MsgType] {.async: (raises: [CancelledError, EthP2PError], raw: true).} =
640640
## This procs awaits a specific RLPx message.
@@ -1402,3 +1402,224 @@ proc rlpxAccept*(
14021402
rlpx_accept_success.inc()
14031403

14041404
return peer
1405+
1406+
#------------------------------------------------------------------------------
1407+
# Mini Protocol DSL
1408+
#------------------------------------------------------------------------------
1409+
1410+
type
1411+
Responder* = object
1412+
peer*: Peer
1413+
reqId*: uint64
1414+
1415+
proc `$`*(r: Responder): string =
1416+
$r.peer & ": " & $r.reqId
1417+
1418+
template msgIdImpl(PROTO: type; peer: Peer, methId: uint64): uint64 =
1419+
mixin protocolInfo
1420+
perPeerMsgIdImpl(peer, PROTO.protocolInfo, methId)
1421+
1422+
macro countArgs(args: untyped): untyped =
1423+
var count = 0
1424+
for arg in args:
1425+
let arg = if arg.kind == nnkHiddenStdConv: arg[1]
1426+
else: arg
1427+
if arg.kind == nnkArgList:
1428+
for _ in arg:
1429+
inc count
1430+
else:
1431+
inc count
1432+
result = newLit(count)
1433+
1434+
macro appendArgs(writer: untyped, args: untyped): untyped =
1435+
result = newStmtList()
1436+
for arg in args:
1437+
let arg = if arg.kind == nnkHiddenStdConv: arg[1]
1438+
else: arg
1439+
if arg.kind == nnkArgList:
1440+
for subarg in arg:
1441+
result.add quote do:
1442+
append(`writer`, `subarg`)
1443+
else:
1444+
result.add quote do:
1445+
append(`writer`, `arg`)
1446+
1447+
template rlpxSendMessage*(PROTO: type, peer: Peer, msgId: static[uint64], params: varargs[untyped]): auto =
1448+
let perPeerMsgId = msgIdImpl(PROTO, peer, msgId)
1449+
var writer = initRlpWriter()
1450+
const paramsLen = countArgs([params])
1451+
when paramsLen > 1:
1452+
startList(writer, paramsLen)
1453+
appendArgs(writer, [params])
1454+
let msgBytes = finish(writer)
1455+
sendMsg(peer, perPeerMsgId, msgBytes)
1456+
1457+
template rlpxSendMessage*(PROTO: type, responder: Responder, msgId: static[uint64], params: varargs[untyped]): auto =
1458+
let perPeerMsgId = msgIdImpl(PROTO, responder.peer, msgId)
1459+
var writer = initRlpWriter()
1460+
const paramsLen = countArgs([params])
1461+
when paramsLen > 0:
1462+
startList(writer, paramsLen + 1)
1463+
append(writer, responder.reqId)
1464+
appendArgs(writer, [params])
1465+
let msgBytes = finish(writer)
1466+
sendMsg(responder.peer, perPeerMsgId, msgBytes)
1467+
1468+
template rlpxSendRequest*(PROTO: type, peer: Peer, msgId: static[uint64], params: varargs[untyped]) =
1469+
let perPeerMsgId = msgIdImpl(PROTO, peer, msgId)
1470+
var writer = initRlpWriter()
1471+
const paramsLen = countArgs([params])
1472+
if paramsLen > 0:
1473+
startList(writer, paramsLen + 1)
1474+
initFuture result
1475+
let reqId = registerRequest(peer, timeout, result, perPeerMsgId + 1)
1476+
append(writer, reqId)
1477+
appendArgs(writer, [params])
1478+
let msgBytes = finish(writer)
1479+
linkSendFailureToReqFuture(sendMsg(peer, perPeerMsgId, msgBytes), result)
1480+
1481+
macro checkedRlpFields(peer; rlp; packet; fields): untyped =
1482+
result = newStmtList()
1483+
for field in fields:
1484+
result.add quote do:
1485+
`packet`.`field` = checkedRlpRead(`peer`, `rlp`, typeof(`packet`.`field`))
1486+
1487+
macro countFields(fields): untyped =
1488+
var count = 0
1489+
for _ in fields:
1490+
inc count
1491+
result = newLit(count)
1492+
1493+
template wrapRlpxWithPacketException(MSGTYPE: type, peer: Peer, body): untyped =
1494+
const
1495+
msgName = astToStr(MSGTYPE)
1496+
1497+
try:
1498+
body
1499+
except rlp.RlpError as exc:
1500+
discard
1501+
warn "TODO: RLP decoding failed for incoming message",
1502+
msg = msgName, remote = peer.remote,
1503+
clientId = peer.clientId, err = exc.msg
1504+
await peer.disconnectAndRaise(BreachOfProtocol,
1505+
"Invalid RLP in parameter list for " & msgName)
1506+
1507+
template rlpxWithPacketHandler*(PROTO: distinct type;
1508+
MSGTYPE: distinct type;
1509+
peer: Peer;
1510+
data: Rlp,
1511+
fields: untyped;
1512+
body): untyped =
1513+
const
1514+
numFields = countFields(fields)
1515+
1516+
wrapRlpxWithPacketException(MSGTYPE, peer):
1517+
var
1518+
rlp = data
1519+
packet {.inject.}: MSGTYPE
1520+
1521+
when numFields > 1:
1522+
tryEnterList(rlp)
1523+
1524+
checkedRlpFields(peer, rlp, packet, fields)
1525+
body
1526+
1527+
template rlpxWithPacketResponder*(PROTO: distinct type;
1528+
MSGTYPE: distinct type;
1529+
peer: Peer;
1530+
data: Rlp,
1531+
body): untyped =
1532+
wrapRlpxWithPacketException(MSGTYPE, peer):
1533+
var rlp = data
1534+
tryEnterList(rlp)
1535+
let reqId = read(rlp, uint64)
1536+
var
1537+
response {.inject.} = initResponder(peer, reqId)
1538+
packet {.inject.} = checkedRlpRead(peer, rlp, MSGTYPE)
1539+
body
1540+
1541+
template rlpxWithFutureHandler*(PROTO: distinct type;
1542+
MSGTYPE: distinct type;
1543+
msgId: static[uint64];
1544+
peer: Peer;
1545+
data: Rlp,
1546+
fields: untyped): untyped =
1547+
wrapRlpxWithPacketException(MSGTYPE, peer):
1548+
var
1549+
rlp = data
1550+
packet: MSGTYPE
1551+
1552+
tryEnterList(rlp)
1553+
let
1554+
reqId = read(rlp, uint64)
1555+
perPeerMsgId = msgIdImpl(PROTO, peer, msgId)
1556+
checkedRlpFields(peer, rlp, packet, fields)
1557+
resolveResponseFuture(peer,
1558+
perPeerMsgId, addr(packet), reqId)
1559+
1560+
1561+
proc nextMsg*(PROTO: distinct type,
1562+
peer: Peer,
1563+
MsgType: distinct type,
1564+
msgId: static[uint64]): Future[MsgType]
1565+
{.async: (raises: [CancelledError, EthP2PError], raw: true).} =
1566+
## This procs awaits a specific RLPx message.
1567+
## Any messages received while waiting will be dispatched to their
1568+
## respective handlers. The designated message handler will also run
1569+
## to completion before the future returned by `nextMsg` is resolved.
1570+
let wantedId = msgIdImpl(PROTO, peer, msgId)
1571+
let f = peer.awaitedMessages[wantedId]
1572+
if not f.isNil:
1573+
return Future[MsgType].Raising([CancelledError, EthP2PError])(f)
1574+
1575+
initFuture result
1576+
peer.awaitedMessages[wantedId] = result
1577+
1578+
template registerMsg*(protocol: ProtocolInfo,
1579+
msgId: static[uint64],
1580+
msgName: static[string],
1581+
msgThunk: untyped,
1582+
MsgType: type) =
1583+
registerMsg(protocol,
1584+
msgId,
1585+
msgName,
1586+
msgThunk,
1587+
messagePrinter[MsgType],
1588+
requestResolver[MsgType],
1589+
nextMsgResolver[MsgType],
1590+
failResolver[MsgType])
1591+
1592+
func initResponder*(peer: Peer, reqId: uint64): Responder =
1593+
Responder(peer: peer, reqId: reqId)
1594+
1595+
template state*(response: Responder, PROTO: type): auto =
1596+
state(response.peer, PROTO)
1597+
1598+
template networkState*(response: Responder, PROTO: type): auto =
1599+
networkState(response.peer, PROTO)
1600+
1601+
template defineProtocol*(PROTO: untyped,
1602+
version: static[int],
1603+
rlpxName: static[string],
1604+
peerState: distinct type,
1605+
networkState: distinct type) =
1606+
type
1607+
PROTO* = object
1608+
1609+
const
1610+
PROTOIndex = getProtocolIndex()
1611+
1612+
template protocolInfo*(_: type PROTO): auto =
1613+
getProtocol(PROTOIndex)
1614+
1615+
template State*(_: type PROTO): type =
1616+
peerState
1617+
1618+
template NetworkState*(_: type PROTO): type =
1619+
networkState
1620+
1621+
func initProtocol*(_: type PROTO): auto =
1622+
initProtocol(rlpxName,
1623+
version,
1624+
createPeerState[Peer, peerState],
1625+
createNetworkState[EthereumNode, networkState])

execution_chain/nimbus_execution_client.nim

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import
1111
../execution_chain/compile_info
1212

1313
import
14-
std/[os, osproc, strutils, net],
14+
std/[os, osproc, strutils, net, options],
1515
chronicles,
1616
eth/net/nat,
1717
metrics,

0 commit comments

Comments
 (0)