Skip to content

Commit

Permalink
mempool: use dict for ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
afalaleev committed Nov 27, 2024
1 parent 9701a65 commit 6281d32
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 34 deletions.
10 changes: 5 additions & 5 deletions proxy/base/mp_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,12 @@ class MpSlotGasPriceModel(BaseModel):


class MpRequest(BaseModel):
ctx_id: str
ctx_id: dict
chain_id: int


class MpTxCntRequest(BaseModel):
ctx_id: str
ctx_id: dict
sender: NeonAccountField


Expand All @@ -213,7 +213,7 @@ class MpTxCntResp(BaseModel):


class MpTxRequest(BaseModel):
ctx_id: str
ctx_id: dict
tx: MpTxModel
state_tx_cnt: int

Expand Down Expand Up @@ -241,12 +241,12 @@ class MpTxResp(BaseModel):


class MpGetTxByHashRequest(BaseModel):
ctx_id: str
ctx_id: dict
neon_tx_hash: EthTxHashField


class MpGetTxBySenderNonceRequest(BaseModel):
ctx_id: str
ctx_id: dict
sender: NeonAccountField
tx_nonce: int

Expand Down
12 changes: 6 additions & 6 deletions proxy/base/mp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,35 @@ async def get_evm_cfg(self) -> EvmConfigModel:
async def get_gas_price(self) -> MpGasPriceModel:
return await self._get_gas_price()

async def get_pending_tx_cnt(self, ctx_id: str, sender: NeonAccount) -> int | None:
async def get_pending_tx_cnt(self, ctx_id: dict, sender: NeonAccount) -> int | None:
req = MpTxCntRequest(ctx_id=ctx_id, sender=sender)
resp = await self._get_pending_tx_cnt(req)
return resp.tx_cnt

async def get_mempool_tx_cnt(self, ctx_id: str, sender: NeonAccount) -> int | None:
async def get_mempool_tx_cnt(self, ctx_id: dict, sender: NeonAccount) -> int | None:
req = MpTxCntRequest(ctx_id=ctx_id, sender=sender)
resp = await self._get_mempool_tx_cnt(req)
return resp.tx_cnt

async def send_raw_transaction(self, ctx_id: str, eth_tx_rlp: bytes, chain_id: int, state_tx_cnt: int) -> MpTxResp:
async def send_raw_transaction(self, ctx_id: dict, eth_tx_rlp: bytes, chain_id: int, state_tx_cnt: int) -> MpTxResp:
req = MpTxRequest(
ctx_id=ctx_id,
tx=MpTxModel.from_raw(eth_tx_rlp, chain_id),
state_tx_cnt=state_tx_cnt,
)
return await self._send_raw_transaction(req)

async def get_tx_by_hash(self, ctx_id: str, neon_tx_hash: EthTxHash) -> NeonTxModel:
async def get_tx_by_hash(self, ctx_id: dict, neon_tx_hash: EthTxHash) -> NeonTxModel:
req = MpGetTxByHashRequest(ctx_id=ctx_id, neon_tx_hash=neon_tx_hash)
resp = await self._get_tx_by_hash(req)
return resp.tx

async def get_tx_by_sender_nonce(self, ctx_id: str, sender: NeonAccount, tx_nonce: int) -> NeonTxModel:
async def get_tx_by_sender_nonce(self, ctx_id: dict, sender: NeonAccount, tx_nonce: int) -> NeonTxModel:
req = MpGetTxBySenderNonceRequest(ctx_id=ctx_id, sender=sender, tx_nonce=tx_nonce)
resp = await self._get_tx_by_sender_nonce(req)
return resp.tx

async def get_content(self, ctx_id: str, chain_id: int) -> MpTxPoolContentResp:
async def get_content(self, ctx_id: dict, chain_id: int) -> MpTxPoolContentResp:
return await self._get_content(MpRequest(ctx_id=ctx_id, chain_id=chain_id))

@AppDataClient.method(name="getGasPrice")
Expand Down
21 changes: 12 additions & 9 deletions proxy/base/rpc_server_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _msg_filter(self) -> LogMsgFilter:
def _is_default_chain_id(self, ctx: HttpRequestCtx) -> bool:
return self._server.is_default_chain_id(ctx)

def _get_ctx_id(self, ctx: HttpRequestCtx) -> str:
def _get_ctx_id(self, ctx: HttpRequestCtx) -> dict:
return self._server.get_ctx_id(ctx)

def _get_chain_id(self, ctx: HttpRequestCtx) -> int:
Expand Down Expand Up @@ -129,14 +129,17 @@ def start(self) -> None:
def stop(self) -> None:
self._process_pool.stop()

@staticmethod
def get_ctx_id(ctx: HttpRequestCtx) -> str:
@classmethod
def get_ctx_id(cls, ctx: HttpRequestCtx) -> dict:
if ctx_id := ctx.get_property_value("ctx_id", None):
return ctx_id

size = len(ctx.request.body)
raw_value = f"{ctx.ip_addr}:{size}:{ctx.start_time_nsec}"
ctx_id = hashlib.md5(bytes(raw_value, "utf-8")).hexdigest()[:8]
req_id = hashlib.md5(bytes(raw_value, "utf-8")).hexdigest()[:8]
chain_id = cls.get_chain_id(ctx)
ctx_id = dict(ctx=req_id, chain_id=chain_id)

ctx.set_property_value("ctx_id", ctx_id)
return ctx_id

Expand All @@ -159,12 +162,12 @@ async def get_evm_cfg(self) -> EvmConfigModel:
return evm_cfg

async def on_request_list(self, ctx: HttpRequestCtx, request: JsonRpcListRequest) -> None:
chain_id = await self._validate_chain_id(ctx)
with logging_context(ctx=self.get_ctx_id(ctx), chain_id=chain_id):
await self._validate_chain_id(ctx)
with logging_context(**self.get_ctx_id(ctx)):
_LOG.info(log_msg("handle BIG request <<< {IP} size={Size}", IP=ctx.ip_addr, Size=len(request.root)))

def on_response_list(self, ctx: HttpRequestCtx, resp: JsonRpcListResp) -> None:
with logging_context(ctx=self.get_ctx_id(ctx), chain_id=self.get_chain_id(ctx)):
with logging_context(**self.get_ctx_id(ctx)):
msg = log_msg(
"done BIG request >>> {IP} size={Size} resp_time={TimeMS} msec",
IP=ctx.ip_addr,
Expand All @@ -188,10 +191,10 @@ async def handle_request(
request: JsonRpcRequest,
handler: Callable,
) -> JsonRpcResp:
chain_id = await self._validate_chain_id(ctx)
await self._validate_chain_id(ctx)

info = dict(IP=ctx.ip_addr, ReqID=request.id, Method=request.method)
with logging_context(ctx=self.get_ctx_id(ctx), chain_id=chain_id):
with logging_context(**self.get_ctx_id(ctx)):
_LOG.info(log_msg("handle request <<< {IP} req={ReqID} {Method} {Params}", Params=request.params, **info))

resp = await handler(ctx, request)
Expand Down
12 changes: 6 additions & 6 deletions proxy/mempool/mp_transaction_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,34 @@ def _tx_executor(self) -> MpTxExecutor:

@MempoolApi.method(name="getPendingTransactionCounter")
def get_pending_tx_cnt(self, request: MpTxCntRequest) -> MpTxCntResp:
with logging_context(ctx=request.ctx_id):
with logging_context(**request.ctx_id):
tx_cnt = self._tx_executor.get_pending_tx_cnt(request.sender)
return MpTxCntResp(tx_cnt=tx_cnt)

@MempoolApi.method(name="getMempoolTransactionCounter")
def get_mempool_tx_cnt(self, request: MpTxCntRequest) -> MpTxCntResp:
with logging_context(ctx=request.ctx_id):
with logging_context(**request.ctx_id):
tx_cnt = self._tx_executor.get_last_tx_cnt(request.sender)
return MpTxCntResp(tx_cnt=tx_cnt)

@MempoolApi.method(name="sendRawTransaction")
async def send_raw_transaction(self, request: MpTxRequest) -> MpTxResp:
with logging_context(ctx=request.ctx_id, tx=request.tx.tx_id):
with logging_context(**request.ctx_id, tx=request.tx.tx_id):
return await self._tx_executor.schedule_tx_request(request.tx, request.state_tx_cnt)

@MempoolApi.method(name="getPendingTransactionByHash")
def get_tx_by_hash(self, request: MpGetTxByHashRequest) -> MpGetTxResp:
with logging_context(ctx=request.ctx_id):
with logging_context(**request.ctx_id):
tx = self._tx_executor.get_tx_by_hash(request.neon_tx_hash)
return MpGetTxResp(tx=tx)

@MempoolApi.method(name="getPendingTransactionBySenderNonce")
def get_tx_by_sender_nonce(self, request: MpGetTxBySenderNonceRequest) -> MpGetTxResp:
with logging_context(ctx=request.ctx_id):
with logging_context(**request.ctx_id):
tx = self._tx_executor.get_tx_by_sender_nonce(request.sender, request.tx_nonce)
return MpGetTxResp(tx=tx)

@MempoolApi.method(name="getMempoolContent")
async def _get_content(self, request: MpRequest) -> MpTxPoolContentResp:
with logging_context(ctx=request.ctx_id):
with logging_context(**request.ctx_id):
return self._tx_executor.get_content(request.chain_id)
2 changes: 1 addition & 1 deletion proxy/private_rpc/pr_eth_account_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ class PrEthAccountApi(PrivateRpcApi):

@PrivateRpcApi.method(name="eth_accounts")
async def eth_accounts(self, ctx: HttpRequestCtx) -> list[EthAddressField]:
eth_address_list = await self._op_client.get_eth_address_list(dict(ctx=self._get_ctx_id(ctx)))
eth_address_list = await self._op_client.get_eth_address_list(self._get_ctx_id(ctx))
return [a.eth_address for a in eth_address_list]
5 changes: 2 additions & 3 deletions proxy/private_rpc/pr_eth_tx_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def eth_sign(self, ctx: HttpRequestCtx, eth_address: EthAddressField, data
data = hex_to_bytes(data)
msg = str.encode(f"\x19Ethereum Signed Message:\n{len(data)}") + data

resp = await self._op_client.sign_eth_msg(dict(ctx_id=self._get_ctx_id(ctx)), eth_address, msg)
resp = await self._op_client.sign_eth_msg(self._get_ctx_id(ctx), eth_address, msg)
if resp.error:
raise EthError(message=resp.error)

Expand All @@ -75,8 +75,7 @@ async def _eth_sign_tx(self, ctx: HttpRequestCtx, tx: RpcEthTxRequest) -> bytes:
nonce = await self._core_api_client.get_state_tx_cnt(sender_acct)
object.__setattr__(neon_tx, "nonce", nonce)

ctx_id = self._get_ctx_id(ctx)
resp = await self._op_client.sign_eth_tx(dict(ctx=ctx_id), neon_tx, chain_id)
resp = await self._op_client.sign_eth_tx(self._get_ctx_id(ctx), neon_tx, chain_id)
if resp.error:
raise EthError(message=resp.error)
return resp.signed_tx.to_bytes()
3 changes: 1 addition & 2 deletions proxy/private_rpc/pr_mempool_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ class PrMempoolApi(PrivateRpcApi):

@PrivateRpcApi.method(name="txpool_content")
async def txpool_content(self, ctx: HttpRequestCtx) -> _RpcTxPoolResp:
ctx_id = self._get_ctx_id(ctx)
chain_id = self._get_chain_id(ctx)
txpool_content = await self._mp_client.get_content(ctx_id, chain_id)
txpool_content = await self._mp_client.get_content(self._get_ctx_id(ctx), chain_id)
return _RpcTxPoolResp(
pending=self._get_queue(txpool_content.pending_list),
queued=self._get_queue(txpool_content.queued_list),
Expand Down
3 changes: 1 addition & 2 deletions proxy_client/operator_balance_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ async def _send_value(
return False

eth_tx_rlp = resp.signed_tx.to_bytes()
ctx = str(req_id["uuid"])
tx = NeonTxModel.from_raw(eth_tx_rlp)

value = value / (10 ** 18)
Expand All @@ -216,7 +215,7 @@ async def _send_value(
f"{tx.neon_tx_hash.to_string()}"
)

resp = await mp_client.send_raw_transaction(ctx, resp.signed_tx.to_bytes(), chain_id, state_tx_cnt)
resp = await mp_client.send_raw_transaction(req_id, resp.signed_tx.to_bytes(), chain_id, state_tx_cnt)
if resp.code != MpTxRespCode.Success:
_LOG.error("fail to send tx: %s", resp.code.name)
return False
Expand Down

0 comments on commit 6281d32

Please sign in to comment.