Skip to content

Commit

Permalink
refactor(rosenet-node): improve logs
Browse files Browse the repository at this point in the history
  • Loading branch information
mkermani144 committed Oct 6, 2024
1 parent 46034f1 commit b8338df
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 19 deletions.
2 changes: 2 additions & 0 deletions .changeset/empty-cats-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
---
---
3 changes: 3 additions & 0 deletions packages/rosenet-node/lib/address/address-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { isIP } from 'node:net';
import { fromNodeAddress } from '@multiformats/multiaddr';
import { publicIp } from 'public-ip';

import RoseNetNodeContext from '../context/RoseNetNodeContext';

/**
* identify public ip (v4 or v6) of current node
*/
Expand All @@ -14,6 +16,7 @@ const identifyPublicIP = () => publicIp();
*/
const getAnnounceMultiaddr = async (port: number) => {
const ip = await identifyPublicIP();
RoseNetNodeContext.logger.debug(`Public ip identified: ${ip}`);
const ipVersion = isIP(ip);

const multiaddr = fromNodeAddress(
Expand Down
2 changes: 1 addition & 1 deletion packages/rosenet-node/lib/createRoseNetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ const createRoseNetNode = async ({
},
logger: libp2pLoggerFactory(logger, config.debug?.libp2pComponents ?? []),
});
RoseNetNodeContext.logger.debug('RoseNet node created');
RoseNetNodeContext.logger.info('RoseNet node created');

addEventListeners(node, RoseNetNodeContext.logger);

Expand Down
22 changes: 13 additions & 9 deletions packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const handleIncomingMessageFactory =
ROSENET_DIRECT_PROTOCOL_V1,
async ({ connection, stream }) => {
RoseNetNodeContext.logger.debug(
`incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
`Incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
{
remoteAddress: connection.remoteAddr.toString(),
transient: connection.transient,
Expand All @@ -74,19 +74,23 @@ const handleIncomingMessageFactory =
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
RoseNetNodeContext.logger.debug('Message decoded', {
message,
});
handler(connection.remotePeer.toString(), message);
RoseNetNodeContext.logger.debug('Handler called');
yield Uint8Array.of(ACK_BYTE);
RoseNetNodeContext.logger.debug(
'Ack sent back to the sender',
);
}
},
stream,
),
);
RoseNetNodeContext.logger.debug(
'Incoming message handling completed',
);
} catch (error) {
RoseNetNodeContext.logger.warn(
'An error occurred while handling incoming message',
Expand All @@ -105,8 +109,8 @@ const handleIncomingMessageFactory =
},
{ runOnTransientConnection: true },
);
RoseNetNodeContext.logger.debug(
`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
RoseNetNodeContext.logger.info(
`Handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
);
};

Expand Down
26 changes: 20 additions & 6 deletions packages/rosenet-node/lib/rosenet-direct/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,35 @@ const sendMessageFactory =
async (source) => first(source),
);

RoseNetNodeContext.logger.debug('Starting message sending process');
const result = await messageRoundTripTimeout.execute(() => messagePipe);
RoseNetNodeContext.logger.debug('Message sending process completed');

if (result?.length !== 1) {
RoseNetNodeContext.logger.debug('Invalid multi-chunk ack received');
throw new RoseNetDirectError(
`There are more than one chunk in the ack message`,
RoseNetDirectErrorType.InvalidAckChunks,
);
}
const ack = result?.subarray();
if (ack.length !== 1 || ack[0] !== ACK_BYTE) {
RoseNetNodeContext.logger.debug('Invalid ack byte received');
throw new RoseNetDirectError(
`Ack byte is invalid`,
RoseNetDirectErrorType.InvalidAckByte,
);
}
RoseNetNodeContext.logger.debug('Ack validation compeleted');

RoseNetNodeContext.logger.debug('message sent successfully', {
RoseNetNodeContext.logger.debug('Message sent successfully', {
message,
});
} catch (error) {
if (isBrokenCircuitError(error)) {
RoseNetNodeContext.logger.debug(
'Message sending attempt failed due to a broken circuit',
);
/**
* We were unable to dial, so `stream` is undefined and we don't need to
* abort it
Expand All @@ -88,13 +96,19 @@ const sendMessageFactory =
);
}
if (isTaskCancelledError(error)) {
RoseNetNodeContext.logger.debug(
'Message sending attempt failed due to timeout',
);
const errorToThrow = new RoseNetDirectError(
'Message sending timed out',
RoseNetDirectErrorType.Timeout,
);
stream?.abort(errorToThrow);
throw errorToThrow;
}
RoseNetNodeContext.logger.debug(
'Message sending attempt failed for some reason',
);
if (error instanceof RoseNetNodeError) {
stream?.abort(error);
throw error;
Expand Down Expand Up @@ -134,14 +148,14 @@ const sendMessageWithRetryAndBulkheadFactory =
}),
});
retryPolicy.onFailure((data) => {
RoseNetNodeContext.logger.debug('message sending failed', {
RoseNetNodeContext.logger.debug('Message sending failed', {
message,
reason: data.reason,
});
});
retryPolicy.onRetry((data) => {
RoseNetNodeContext.logger.debug(
`retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`,
`Retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`,
{
message,
},
Expand All @@ -154,10 +168,10 @@ const sendMessageWithRetryAndBulkheadFactory =
.execute(() => sendMessageInner(to, message))
.then(() => onSettled?.())
.catch(() => {
RoseNetNodeContext.logger.error(
'message sending failed regardless of 3 retries, dropping message',
RoseNetNodeContext.logger.warn(
'Message sending failed regardless of 3 retries, dropping message',
);
RoseNetNodeContext.logger.debug('message was: ', {
RoseNetNodeContext.logger.debug('Message was: ', {
message,
});
onSettled?.(new RoseNetNodeError('Message sending failed'));
Expand Down
5 changes: 3 additions & 2 deletions packages/rosenet-node/lib/rosenet-pubsub/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ const publishFactory =
await bulkheadPolicy.execute(() =>
node.services.pubsub.publish(topic, textEncoder.encode(message)),
);
RoseNetNodeContext.logger.debug('Message published successfully');
} catch (error) {
if (isBulkheadRejectedError(error)) {
RoseNetNodeContext.logger.warn('Maximum publish threshold reached');
RoseNetNodeContext.logger.debug('Maximum publish threshold reached');
} else {
RoseNetNodeContext.logger.warn('Message publish failed', {
RoseNetNodeContext.logger.debug('Message publish failed', {
message,
});
}
Expand Down
7 changes: 6 additions & 1 deletion packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ const subscribeFactory =
try {
await bulkheadPolicy.execute(() => {
if (event.detail.topic === topic) {
handler(textDecoder.decode(event.detail.data));
const message = textDecoder.decode(event.detail.data);
handler(message);
RoseNetNodeContext.logger.debug('Pubsub message received', {
message,
});
}
});
} catch {
Expand All @@ -35,6 +39,7 @@ const subscribeFactory =
);
}
});
RoseNetNodeContext.logger.info(`Topic ${topic} subscribed`);
};

export default subscribeFactory;

0 comments on commit b8338df

Please sign in to comment.