Skip to content

Commit

Permalink
feat(core): #41 improve logging (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
chertby authored Sep 27, 2023
1 parent 1c89601 commit 4d2b9e5
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 79 deletions.
6 changes: 3 additions & 3 deletions src/libp2p/Libp2p.Core/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadAsync(int length,
{
anotherChunk = _bytes;
bytesToRead -= _bytes.Length;
_logger?.LogTrace("Read chunk {0} bytes", _bytes.Length);
_logger?.ReadChunk(_bytes.Length);
_bytes = default;
_read.Release();
_canWrite.Release();
Expand All @@ -159,7 +159,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadAsync(int length,
{
anotherChunk = _bytes.Slice(0, bytesToRead);
_bytes = _bytes.Slice(bytesToRead, _bytes.End);
_logger?.LogTrace("Read enough {0} bytes", anotherChunk.Length);
_logger?.ReadEnough(_bytes.Length);
bytesToRead = 0;
_canRead.Release();
}
Expand All @@ -185,7 +185,7 @@ public async ValueTask WriteAsync(ReadOnlySequence<byte> bytes)
throw new InvalidProgramException();
}

_logger?.LogTrace("Write {0} bytes", bytes.Length);
_logger?.WriteBytes(bytes.Length);

if (bytes.Length == 0)
{
Expand Down
160 changes: 84 additions & 76 deletions src/libp2p/Libp2p.Core/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Nethermind.Libp2p.Core.Extensions;

namespace Nethermind.Libp2p.Core;

Expand All @@ -24,102 +25,109 @@ public ChannelFactory(IServiceProvider serviceProvider)
public IChannel SubDial(IPeerContext context, IChannelRequest? req = null)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
Channel channel = CreateChannel(subProtocol);
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;

Channel chan = CreateChannel(subProtocol);
ChannelFactory? sf = _factories[subProtocol] as ChannelFactory;
_logger?.DialStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());

_logger?.LogDebug("Dial {chan} {sf}", chan.Id, sf.SubProtocols);
_ = subProtocol.DialAsync(chan.Reverse, sf, context).ContinueWith(async t =>
{
if (!t.IsCompletedSuccessfully)
_ = subProtocol.DialAsync(channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
_logger?.LogError("Dial error {proto} via {chan}: {error}", chan.Id, subProtocol.Id, t.Exception?.Message ?? "unknown");
}
if (!chan.IsClosed)
{
await chan.CloseAsync(t.Exception is null);
}

req?.CompletionSource?.SetResult();
});


return chan;
if (!task.IsCompletedSuccessfully)
{
_logger?.DialFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync(task.Exception is null);
}

req?.CompletionSource?.SetResult();
});

return channel;
}

public IChannel SubListen(IPeerContext context, IChannelRequest? req = null)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
PeerContext peerContext = (PeerContext)context;
Channel channel = CreateChannel(subProtocol);
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;

Channel chan = CreateChannel(subProtocol);
_logger?.ListenStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());

_logger?.LogDebug("Listen {chan} on protocol {sp} with sub-protocols {sf}", chan.Id, subProtocol.Id, _factories[subProtocol].SubProtocols.Select(s => s.Id));

_ = subProtocol.ListenAsync(chan.Reverse, _factories[subProtocol], context).ContinueWith(async t =>
{
if (!t.IsCompletedSuccessfully)
{
_logger?.LogError("Listen error {proto} via {chan}: {error}", subProtocol.Id, chan.Id, t.Exception?.Message ?? "unknown");
}
IEnumerable<IProtocol> dd = _factories[subProtocol].SubProtocols;

if (!chan.IsClosed)
_ = subProtocol.ListenAsync(channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
await chan.CloseAsync();
}

req?.CompletionSource?.SetResult();
});

return chan;
if (!task.IsCompletedSuccessfully)
{
_logger?.ListenFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync();
}

req?.CompletionSource?.SetResult();
});

return channel;
}

public IChannel SubDialAndBind(IChannel parent, IPeerContext context,
IChannelRequest? req = null)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
Channel chan = CreateChannel(subProtocol);
chan.Bind(parent);
_ = subProtocol.DialAsync(chan.Reverse, _factories[subProtocol], context).ContinueWith(async t =>
{
if (!t.IsCompletedSuccessfully)
{
_logger?.LogError("SubDialAndBind error {proto} via {chan}: {error}", chan.Id, subProtocol.Id, t.Exception?.Message ?? "unknown");
}

if (!chan.IsClosed)
{
await chan.CloseAsync();
}

req?.CompletionSource?.SetResult();
});

return chan;
Channel channel = CreateChannel(subProtocol);
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;

_logger?.DialAndBindStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());

channel.Bind(parent);
_ = subProtocol.DialAsync(channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
if (!task.IsCompletedSuccessfully)
{
_logger?.DialAndBindFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync();
}

req?.CompletionSource?.SetResult();
});

return channel;
}

public IChannel SubListenAndBind(IChannel parent, IPeerContext context,
IChannelRequest? req = null)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
Channel chan = CreateChannel(subProtocol);
chan.Bind(parent);
_ = subProtocol.ListenAsync(chan.Reverse, _factories[subProtocol], context).ContinueWith(async t =>
{
if (!t.IsCompletedSuccessfully)
{
_logger?.LogError("SubListenAndBind error {proto} via {chan}: {error}", subProtocol.Id, chan.Id, t.Exception?.Message ?? "unknown");
}
if (!chan.IsClosed)
{
await chan.CloseAsync();
}
Channel channel = CreateChannel(subProtocol);
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;

req?.CompletionSource?.SetResult();
});
_logger?.ListenAndBindStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());

return chan;
channel.Bind(parent);
_ = subProtocol.ListenAsync(channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
if (!task.IsCompletedSuccessfully)
{
_logger?.ListenAndBindFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync();
}

req?.CompletionSource?.SetResult();
});

return channel;
}

public ChannelFactory Setup(IProtocol parent, IDictionary<IProtocol, IChannelFactory> factories)
Expand All @@ -129,11 +137,11 @@ public ChannelFactory Setup(IProtocol parent, IDictionary<IProtocol, IChannelFac
return this;
}

private Channel CreateChannel(IProtocol subprotocol)
private Channel CreateChannel(IProtocol? subProtocol)
{
Channel chan = ActivatorUtilities.CreateInstance<Channel>(_serviceProvider);
chan.Id = $"{_parent.Id} <> {subprotocol?.Id}";
_logger?.LogDebug("Create chan {0}", chan.Id);
return chan;
Channel channel = ActivatorUtilities.CreateInstance<Channel>(_serviceProvider);
channel.Id = $"{_parent.Id} <> {subProtocol?.Id}";
_logger?.ChannelCreated(channel.Id);
return channel;
}
}
10 changes: 10 additions & 0 deletions src/libp2p/Libp2p.Core/Extensions/ChannelFactoryExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

namespace Nethermind.Libp2p.Core.Extensions;

internal static class ChannelFactoryExtensions
{
public static IEnumerable<string> GetSubProtocols(this ChannelFactory? channelFactory)
=> channelFactory?.SubProtocols.Select(protocol => protocol.Id) ?? Enumerable.Empty<string>();
}
10 changes: 10 additions & 0 deletions src/libp2p/Libp2p.Core/Extensions/ExceptionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

namespace Nethermind.Libp2p.Core.Extensions;

internal static class ExceptionExtensions
{
public static string GetErrorMessage(this Exception? exception)
=> exception?.Message ?? "unknown";
}
117 changes: 117 additions & 0 deletions src/libp2p/Libp2p.Core/LogMessages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using Microsoft.Extensions.Logging;

namespace Nethermind.Libp2p.Core;

internal static partial class LogMessages
{
[LoggerMessage(
Message = "Read chunk {bytes} bytes",
Level = LogLevel.Trace)]
internal static partial void ReadChunk(
this ILogger logger,
long bytes);

[LoggerMessage(
Message = "Read enough {bytes} bytes",
Level = LogLevel.Trace)]
internal static partial void ReadEnough(
this ILogger logger,
long bytes);

[LoggerMessage(
Message = "Write {bytes} bytes",
Level = LogLevel.Trace)]
internal static partial void WriteBytes(
this ILogger logger,
long bytes);

[LoggerMessage(
Message = "Dial {channel} on protocol {protocol} with sub-protocols {subProtocols}",
Level = LogLevel.Debug)]
internal static partial void DialStarted(
this ILogger logger,
string channel,
string protocol,
IEnumerable<string> subProtocols);

[LoggerMessage(
Message = "Listen {channel} on protocol {protocol} with sub-protocols {subProtocols}",
Level = LogLevel.Debug)]
internal static partial void ListenStarted(
this ILogger logger,
string channel,
string protocol,
IEnumerable<string> subProtocols);

[LoggerMessage(
Message = "Dial and bind {channel} on protocol {protocol} with sub-protocols {subProtocols}",
Level = LogLevel.Debug)]
internal static partial void DialAndBindStarted(
this ILogger logger,
string channel,
string protocol,
IEnumerable<string> subProtocols);

[LoggerMessage(
Message = "Listen and bind {channel} on protocol {protocol} with sub-protocols {subProtocols}",
Level = LogLevel.Debug)]
internal static partial void ListenAndBindStarted(
this ILogger logger,
string channel,
string protocol,
IEnumerable<string> subProtocols);

[LoggerMessage(
Message = "Create channel {chan}",
Level = LogLevel.Debug)]
internal static partial void ChannelCreated(
this ILogger logger,
string chan);

[LoggerMessage(
Message = "Dial error {protocol} via {channel}: {errorMessage}",
Level = LogLevel.Error,
SkipEnabledCheck = true)]
internal static partial void DialFailed(
this ILogger logger,
string channel,
string protocol,
Exception? exception,
string errorMessage);

[LoggerMessage(
Message = "Listen error {protocol} via {channel}: {errorMessage}",
Level = LogLevel.Error,
SkipEnabledCheck = true)]
internal static partial void ListenFailed(
this ILogger logger,
string channel,
string protocol,
Exception? exception,
string errorMessage);

[LoggerMessage(
Message = "Dial and bind error {protocol} via {channel}: {errorMessage}",
Level = LogLevel.Error,
SkipEnabledCheck = true)]
internal static partial void DialAndBindFailed(
this ILogger logger,
string channel,
string protocol,
Exception? exception,
string errorMessage);

[LoggerMessage(
Message = "Listen and bind error {protocol} via {channel}: {errorMessage}",
Level = LogLevel.Error,
SkipEnabledCheck = true)]
internal static partial void ListenAndBindFailed(
this ILogger logger,
string channel,
string protocol,
Exception? exception,
string errorMessage);
}

0 comments on commit 4d2b9e5

Please sign in to comment.