diff --git a/src/Foundatio.MessagePack/Foundatio.MessagePack.csproj b/src/Foundatio.MessagePack/Foundatio.MessagePack.csproj index da100e47..8404ebda 100644 --- a/src/Foundatio.MessagePack/Foundatio.MessagePack.csproj +++ b/src/Foundatio.MessagePack/Foundatio.MessagePack.csproj @@ -1,8 +1,8 @@  - + - \ No newline at end of file + diff --git a/src/Foundatio/Jobs/IJob.cs b/src/Foundatio/Jobs/IJob.cs index 3d9f171d..0de04a7c 100644 --- a/src/Foundatio/Jobs/IJob.cs +++ b/src/Foundatio/Jobs/IJob.cs @@ -13,6 +13,11 @@ public interface IJob Task RunAsync(CancellationToken cancellationToken = default); } +public interface IJobWithOptions : IJob +{ + JobOptions Options { get; set; } +} + public static class JobExtensions { public static async Task TryRunAsync(this IJob job, CancellationToken cancellationToken = default) @@ -35,41 +40,53 @@ public static async Task TryRunAsync(this IJob job, CancellationToken /// Runs the job continuously until the cancellation token is set or the iteration limit is reached. /// /// Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully. - public static async Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, CancellationToken cancellationToken = default, Func> continuationCallback = null) + public static Task RunContinuousAsync(this IJob job, TimeSpan? interval = null, int iterationLimit = -1, + CancellationToken cancellationToken = default, Func> continuationCallback = null) + { + var options = JobOptions.GetDefaults(job); + options.Interval = interval; + options.IterationLimit = iterationLimit; + return RunContinuousAsync(job, options, cancellationToken, continuationCallback); + } + + /// + /// Runs the job continuously until the cancellation token is set or the iteration limit is reached. + /// + /// Returns the iteration count for normal jobs. For queue based jobs this will be the amount of items processed successfully. + public static async Task RunContinuousAsync(this IJob job, JobOptions options, CancellationToken cancellationToken = default, Func> continuationCallback = null) { int iterations = 0; - string jobName = job.GetType().Name; var logger = job.GetLogger(); bool isInformationLogLevelEnabled = logger.IsEnabled(LogLevel.Information); int queueItemsProcessed = 0; bool isQueueJob = job.GetType().GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IQueueJob<>)); - using var _ = logger.BeginScope(new Dictionary { { "job", jobName } }); + using var _ = logger.BeginScope(new Dictionary { { "job", options.Name } }); if (isInformationLogLevelEnabled) - logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", jobName, Environment.MachineName); + logger.LogInformation("Starting continuous job type {JobName} on machine {MachineName}...", options.Name, Environment.MachineName); while (!cancellationToken.IsCancellationRequested) { - using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + jobName); + using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + options.Name); var result = await job.TryRunAsync(cancellationToken).AnyContext(); - logger.LogJobResult(result, jobName); + logger.LogJobResult(result, options.Name); iterations++; if (isQueueJob && result.IsSuccess) queueItemsProcessed++; - if (cancellationToken.IsCancellationRequested || (iterationLimit > -1 && iterationLimit <= iterations)) + if (cancellationToken.IsCancellationRequested || (options.IterationLimit > -1 && options.IterationLimit <= iterations)) break; if (result.Error != null) { - await job.GetTimeProvider().SafeDelay(TimeSpan.FromMilliseconds(Math.Max((int)(interval?.TotalMilliseconds ?? 0), 100)), cancellationToken).AnyContext(); + await job.GetTimeProvider().SafeDelay(TimeSpan.FromMilliseconds(Math.Max((int)(options.Interval?.TotalMilliseconds ?? 0), 100)), cancellationToken).AnyContext(); } - else if (interval.HasValue && interval.Value > TimeSpan.Zero) + else if (options.Interval.HasValue && options.Interval.Value > TimeSpan.Zero) { - await job.GetTimeProvider().SafeDelay(interval.Value, cancellationToken).AnyContext(); + await job.GetTimeProvider().SafeDelay(options.Interval.Value, cancellationToken).AnyContext(); } // needed to yield back a task for jobs that aren't async @@ -97,17 +114,17 @@ public static async Task RunContinuousAsync(this IJob job, TimeSpan? interv if (isInformationLogLevelEnabled) { - if (iterationLimit > 0) + if (options.IterationLimit > 0) { logger.LogInformation( "Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times (Limit={IterationLimit})", - jobName, Environment.MachineName, iterationLimit, iterations); + options.Name, Environment.MachineName, options.IterationLimit, iterations); } else { logger.LogInformation( "Stopping continuous job type {JobName} on machine {MachineName}: Job ran {Iterations} times", - jobName, Environment.MachineName, iterations); + options.Name, Environment.MachineName, iterations); } } diff --git a/src/Foundatio/Jobs/JobRunner.cs b/src/Foundatio/Jobs/JobRunner.cs index 3792db20..4c0deb76 100644 --- a/src/Foundatio/Jobs/JobRunner.cs +++ b/src/Foundatio/Jobs/JobRunner.cs @@ -15,7 +15,6 @@ public class JobRunner { private readonly TimeProvider _timeProvider; private readonly ILogger _logger; - private string _jobName; private readonly JobOptions _options; private readonly IServiceProvider _serviceProvider; @@ -86,7 +85,7 @@ public async Task RunInConsoleAsync() catch (Exception e) { if (_logger.IsEnabled(LogLevel.Error)) - _logger.LogError(e, "Job {JobName} error: {Message}", _jobName, e.GetMessage()); + _logger.LogError(e, "Job {JobName} error: {Message}", _options.Name, e.GetMessage()); if (Debugger.IsAttached) Console.ReadKey(); @@ -136,6 +135,8 @@ public async Task RunAsync(CancellationToken cancellationToken = default) try { job = _options.JobFactory(_serviceProvider); + if (job is IJobWithOptions jobWithOptions) + jobWithOptions.Options = _options; } catch (Exception ex) { @@ -149,20 +150,19 @@ public async Task RunAsync(CancellationToken cancellationToken = default) return false; } - _jobName = TypeHelper.GetTypeDisplayName(job.GetType()); - using (_logger.BeginScope(s => s.Property("job", _jobName))) + using (_logger.BeginScope(s => s.Property("job", _options.Name))) { if (_logger.IsEnabled(LogLevel.Information)) - _logger.LogInformation("Starting job type {JobName} on machine {MachineName}...", _jobName, Environment.MachineName); + _logger.LogInformation("Starting job type {JobName} on machine {MachineName}...", _options.Name, Environment.MachineName); var jobLifetime = job as IAsyncLifetime; if (jobLifetime != null) { if (_logger.IsEnabled(LogLevel.Information)) - _logger.LogInformation("Initializing job lifetime {JobName} on machine {MachineName}...", _jobName, Environment.MachineName); + _logger.LogInformation("Initializing job lifetime {JobName} on machine {MachineName}...", _options.Name, Environment.MachineName); await jobLifetime.InitializeAsync().AnyContext(); if (_logger.IsEnabled(LogLevel.Information)) - _logger.LogInformation("Done initializing job lifetime {JobName} on machine {MachineName}.", _jobName, Environment.MachineName); + _logger.LogInformation("Done initializing job lifetime {JobName} on machine {MachineName}.", _options.Name, Environment.MachineName); } try @@ -183,8 +183,10 @@ public async Task RunAsync(CancellationToken cancellationToken = default) { await using var scope = _serviceProvider.CreateAsyncScope(); var jobInstance = _options.JobFactory(scope.ServiceProvider); - await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit, - cancellationToken).AnyContext(); + if (jobInstance is IJobWithOptions jobWithOptions) + jobWithOptions.Options = _options; + + await jobInstance.RunContinuousAsync(_options, cancellationToken).AnyContext(); } catch (TaskCanceledException) { @@ -210,10 +212,10 @@ await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit, } else { - using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + _jobName); + using var activity = FoundatioDiagnostics.ActivitySource.StartActivity("Job: " + _options.Name); var result = await job.TryRunAsync(cancellationToken).AnyContext(); - _logger.LogJobResult(result, _jobName); + _logger.LogJobResult(result, _options.Name); return result.IsSuccess; } @@ -224,10 +226,10 @@ await jobInstance.RunContinuousAsync(_options.Interval, _options.IterationLimit, if (jobDisposable != null) { if (_logger.IsEnabled(LogLevel.Information)) - _logger.LogInformation("Disposing job lifetime {JobName} on machine {MachineName}...", _jobName, Environment.MachineName); + _logger.LogInformation("Disposing job lifetime {JobName} on machine {MachineName}...", _options.Name, Environment.MachineName); await jobDisposable.DisposeAsync().AnyContext(); if (_logger.IsEnabled(LogLevel.Information)) - _logger.LogInformation("Done disposing job lifetime {JobName} on machine {MachineName}.", _jobName, Environment.MachineName); + _logger.LogInformation("Done disposing job lifetime {JobName} on machine {MachineName}.", _options.Name, Environment.MachineName); } } } diff --git a/src/Foundatio/Jobs/JobWithLockBase.cs b/src/Foundatio/Jobs/JobWithLockBase.cs index f7c8b023..96d7cd31 100644 --- a/src/Foundatio/Jobs/JobWithLockBase.cs +++ b/src/Foundatio/Jobs/JobWithLockBase.cs @@ -8,7 +8,7 @@ namespace Foundatio.Jobs; -public abstract class JobWithLockBase : IJob, IHaveLogger, IHaveTimeProvider +public abstract class JobWithLockBase : IJobWithOptions, IHaveLogger, IHaveTimeProvider { protected readonly ILogger _logger; private readonly TimeProvider _timeProvider; @@ -30,11 +30,12 @@ public JobWithLockBase(TimeProvider timeProvider, ILoggerFactory loggerFactory = public string JobId { get; } = Guid.NewGuid().ToString("N").Substring(0, 10); ILogger IHaveLogger.Logger => _logger; TimeProvider IHaveTimeProvider.TimeProvider => _timeProvider; + public JobOptions Options { get; set; } public virtual async Task RunAsync(CancellationToken cancellationToken = default) { ILock lockValue; - using (var lockActivity = FoundatioDiagnostics.ActivitySource.StartActivity("Job Lock: " + _jobName)) + using (var lockActivity = FoundatioDiagnostics.ActivitySource.StartActivity("Job Lock: " + Options?.Name ?? _jobName)) { lockActivity?.AddTag("job.id", JobId);