diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index 658658cb..5d2950b3 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -51,8 +51,8 @@ - - + + diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs index d465d848..abb78754 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs @@ -32,6 +32,7 @@ public class NetheriteProviderFactory : IDurabilityProviderFactory readonly IServiceProvider serviceProvider; readonly DurableTask.Netherite.ConnectionResolver connectionResolver; + readonly bool usesNewPassthroughMiddlewareForEntities; readonly bool inConsumption; // the following are boolean options that can be specified in host.json, @@ -96,6 +97,14 @@ bool ReadBooleanSetting(string name) => this.options.StorageProvider.TryGetValue this.TraceToConsole = ReadBooleanSetting(nameof(this.TraceToConsole)); this.TraceToBlob = ReadBooleanSetting(nameof(this.TraceToBlob)); + + WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType(); + if (runtimeType == WorkerRuntimeType.DotNetIsolated || + runtimeType == WorkerRuntimeType.Java || + runtimeType == WorkerRuntimeType.Custom) + { + this.usesNewPassthroughMiddlewareForEntities = true; + } } NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(string taskHubNameOverride = null, string connectionName = null) @@ -109,12 +118,19 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s // different defaults for key configuration values. int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount; int maxConcurrentActivitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount; + int maxConcurrentEntitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount; int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000; // The following defaults are only applied if the customer did not explicitely set them on `host.json` - this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault; - this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault; - this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault; + this.options.MaxConcurrentOrchestratorFunctions ??= maxConcurrentOrchestratorsDefault; + this.options.MaxConcurrentActivityFunctions ??= maxConcurrentActivitiesDefault; + this.options.MaxConcurrentEntityFunctions ??= maxConcurrentEntitiesDefault; + this.options.MaxEntityOperationBatchSize ??= maxEntityOperationBatchSizeDefault; + + if (this.usesNewPassthroughMiddlewareForEntities) + { + netheriteSettings.UseSeparateQueueForEntityWorkItems = true; + } // copy all applicable fields from both the options and the storageProvider options JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), netheriteSettings); diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index cd88b390..96825500 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -56,7 +56,7 @@ - + diff --git a/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs b/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs index 683f7850..65006a54 100644 --- a/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs +++ b/src/DurableTask.Netherite/OrchestrationService/InstanceQuery.cs @@ -52,6 +52,11 @@ public class InstanceQuery [DataMember] internal bool PrefetchHistory { get; set; } + /// + /// Whether to exclude entities from the results. + /// + [DataMember] + internal bool ExcludeEntities { get; set; } /// /// Construct an instance query with the given parameters. @@ -77,9 +82,6 @@ public InstanceQuery( internal bool HasRuntimeStatus => this.RuntimeStatus != null && this.RuntimeStatus.Length > 0; - internal bool IsSet => this.HasRuntimeStatus || !string.IsNullOrWhiteSpace(this.InstanceIdPrefix) - || !(this.CreatedTimeFrom is null) || !(this.CreatedTimeTo is null); - internal bool Matches(OrchestrationState targetState) { if (targetState == null) @@ -88,7 +90,8 @@ internal bool Matches(OrchestrationState targetState) return (!this.HasRuntimeStatus || this.RuntimeStatus.Contains(targetState.OrchestrationStatus)) && (string.IsNullOrWhiteSpace(this.InstanceIdPrefix) || targetState.OrchestrationInstance.InstanceId.StartsWith(this.InstanceIdPrefix)) && (!this.CreatedTimeFrom.HasValue || targetState.CreatedTime >= this.CreatedTimeFrom.Value) - && (!this.CreatedTimeTo.HasValue || targetState.CreatedTime <= this.CreatedTimeTo.Value); + && (!this.CreatedTimeTo.HasValue || targetState.CreatedTime <= this.CreatedTimeTo.Value) + && (!this.ExcludeEntities || !DurableTask.Core.Common.Entities.IsEntityInstance(targetState.OrchestrationInstance.InstanceId)); } } } diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index 5b60c876..8c5e94eb 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -5,6 +5,7 @@ namespace DurableTask.Netherite { using DurableTask.Core; using DurableTask.Core.Common; + using DurableTask.Core.Entities; using DurableTask.Core.History; using DurableTask.Netherite.Abstractions; using DurableTask.Netherite.Faster; @@ -24,10 +25,12 @@ namespace DurableTask.Netherite /// Local partition of the distributed orchestration service. /// public class NetheriteOrchestrationService : - DurableTask.Core.IOrchestrationService, + DurableTask.Core.IOrchestrationService, DurableTask.Core.IOrchestrationServiceClient, DurableTask.Core.IOrchestrationServicePurgeClient, + DurableTask.Core.Query.IOrchestrationServiceQueryClient, DurableTask.Netherite.IOrchestrationServiceQueryClient, + DurableTask.Core.Entities.IEntityOrchestrationService, TransportAbstraction.IHost { /// @@ -43,6 +46,7 @@ public class NetheriteOrchestrationService : readonly ITransportLayer transport; readonly IStorageLayer storage; + readonly EntityBackendQueriesImplementation EntityBackendQueries; readonly WorkItemTraceHelper workItemTraceHelper; @@ -88,6 +92,8 @@ internal async ValueTask GetClientAsync() internal WorkItemQueue ActivityWorkItemQueue { get; private set; } internal WorkItemQueue OrchestrationWorkItemQueue { get; private set; } + internal WorkItemQueue EntityWorkItemQueue { get; private set; } + internal LoadPublishWorker LoadPublisher { get; private set; } internal ILoggerFactory LoggerFactory { get; } @@ -124,7 +130,8 @@ public NetheriteOrchestrationService(NetheriteOrchestrationServiceSettings setti this.Settings = settings; this.TraceHelper = new OrchestrationServiceTraceHelper(loggerFactory, settings.LogLevelLimit, settings.WorkerId, settings.HubName); this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, settings.HubName); - + this.EntityBackendQueries = new EntityBackendQueriesImplementation(this); + try { this.TraceHelper.TraceProgress("Reading configuration for transport and storage layers"); @@ -379,6 +386,7 @@ async Task StartClientAsync(Task greenLight) this.ActivityWorkItemQueue = new WorkItemQueue(); this.OrchestrationWorkItemQueue = new WorkItemQueue(); + this.EntityWorkItemQueue = new WorkItemQueue(); this.TraceHelper.TraceProgress($"Started client"); @@ -475,6 +483,7 @@ async Task TryStopAsync(bool quickly) await this.transport.StopAsync(fatalExceptionObserved: false); this.ActivityWorkItemQueue.Dispose(); + this.EntityWorkItemQueue.Dispose(); this.OrchestrationWorkItemQueue.Dispose(); } @@ -539,7 +548,7 @@ TransportAbstraction.IClient TransportAbstraction.IHost.AddClient(Guid clientId, TransportAbstraction.IPartition TransportAbstraction.IHost.AddPartition(uint partitionId, TransportAbstraction.ISender batchSender) { var partition = new Partition(this, partitionId, this.GetPartitionId, this.GetNumberPartitions, batchSender, this.Settings, this.StorageAccountName, - this.ActivityWorkItemQueue, this.OrchestrationWorkItemQueue, this.LoadPublisher, this.workItemTraceHelper); + this.ActivityWorkItemQueue, this.OrchestrationWorkItemQueue, this.EntityWorkItemQueue, this.LoadPublisher, this.workItemTraceHelper); return partition; } @@ -551,7 +560,7 @@ TransportAbstraction.ILoadMonitor TransportAbstraction.IHost.AddLoadMonitor(Guid IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId) { - return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this); + return new PartitionErrorHandler((int)partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this); } void TransportAbstraction.IHost.TraceWarning(string message) @@ -744,7 +753,30 @@ async Task IOrchestrationServiceQueryClient.PurgeInstanceHistoryAsync(DateT /// async Task IOrchestrationServiceQueryClient.QueryOrchestrationStatesAsync(InstanceQuery instanceQuery, int pageSize, string continuationToken, CancellationToken cancellationToken) - => await (await this.GetClientAsync().ConfigureAwait(false)).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken).ConfigureAwait(false); + { + return await (await this.GetClientAsync().ConfigureAwait(false)).QueryOrchestrationStatesAsync(instanceQuery, pageSize, continuationToken, cancellationToken).ConfigureAwait(false); + } + + /// + async Task DurableTask.Core.Query.IOrchestrationServiceQueryClient.GetOrchestrationWithQueryAsync( + DurableTask.Core.Query.OrchestrationQuery query, + CancellationToken cancellationToken) + { + InstanceQuery instanceQuery = new() + { + CreatedTimeFrom = query.CreatedTimeFrom, + CreatedTimeTo = query.CreatedTimeTo, + ExcludeEntities = query.ExcludeEntities, + FetchInput = query.FetchInputsAndOutputs, + InstanceIdPrefix = query.InstanceIdPrefix, + PrefetchHistory = false, + RuntimeStatus = query.RuntimeStatus?.ToArray(), + }; + + Client client = await this.GetClientAsync().ConfigureAwait(false); + InstanceQueryResult result = await client.QueryOrchestrationStatesAsync(instanceQuery, query.PageSize, query.ContinuationToken, cancellationToken).ConfigureAwait(false); + return new DurableTask.Core.Query.OrchestrationQueryResult(result.Instances.ToList(), result.ContinuationToken); + } /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId) @@ -754,24 +786,191 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) => new PurgeResult(await (await this.GetClientAsync()).PurgeInstanceHistoryAsync(purgeInstanceFilter.CreatedTimeFrom, purgeInstanceFilter.CreatedTimeTo, purgeInstanceFilter.RuntimeStatus)); + /// + EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => this.EntityBackendQueries; + + class EntityBackendQueriesImplementation : EntityBackendQueries + { + readonly NetheriteOrchestrationService service; + + public EntityBackendQueriesImplementation(NetheriteOrchestrationService netheriteOrchestrationService) + { + this.service = netheriteOrchestrationService; + } + public override async Task GetEntityAsync(EntityId id, bool includeState = false, bool includeTransient = false, CancellationToken cancellation = default) + { + string instanceId = id.ToString(); + OrchestrationState state = await(await this.service.GetClientAsync().ConfigureAwait(false)) + .GetOrchestrationStateAsync(this.service.GetPartitionId(instanceId.ToString()), instanceId, fetchInput: includeState, false).ConfigureAwait(false); + + return this.GetEntityMetadata(state, includeState, includeTransient); + } + + public override async Task QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation) + { + string adjustedPrefix = string.IsNullOrEmpty(filter.InstanceIdStartsWith) ? "@" : filter.InstanceIdStartsWith; + + if (adjustedPrefix[0] != '@') + { + return new EntityQueryResult() + { + Results = new List(), + ContinuationToken = null, + }; + } + + var condition = new InstanceQuery() + { + InstanceIdPrefix = adjustedPrefix, + CreatedTimeFrom = filter.LastModifiedFrom, + CreatedTimeTo = filter.LastModifiedTo, + FetchInput = filter.IncludeState, + PrefetchHistory = false, + ExcludeEntities = false, + }; + + List metadataList = new List(); + + InstanceQueryResult result = await (await this.service.GetClientAsync().ConfigureAwait(false)) + .QueryOrchestrationStatesAsync(condition, filter.PageSize ?? 200, filter.ContinuationToken, cancellation).ConfigureAwait(false); + + foreach(var entry in result.Instances) + { + var metadata = this.GetEntityMetadata(entry, filter.IncludeState, filter.IncludeTransient); + if (metadata.HasValue) + { + metadataList.Add(metadata.Value); + } + } + + return new EntityQueryResult() + { + Results = metadataList, + ContinuationToken = result.ContinuationToken, + }; + } + + public override async Task CleanEntityStorageAsync(CleanEntityStorageRequest request = default, CancellationToken cancellation = default) + { + if (!request.ReleaseOrphanedLocks) + { + // there is no need to do anything since deletion is implicit + return new CleanEntityStorageResult(); + } + + var condition = new InstanceQuery() + { + InstanceIdPrefix = "@", + FetchInput = false, + PrefetchHistory = false, + ExcludeEntities = false, + }; + + var client = await this.service.GetClientAsync().ConfigureAwait(false); + + string continuationToken = null; + int orphanedLocksReleased = 0; + + // list all entities (without fetching the input) and for each locked one, + // check if the lock owner is still running. If not, release the lock. + do + { + var page = await client.QueryOrchestrationStatesAsync(condition, 500, continuationToken, cancellation).ConfigureAwait(false); + + // The checks run in parallel for all entities in the page + List tasks = new List(); + foreach (var state in page.Instances) + { + EntityStatus status = ClientEntityHelpers.GetEntityStatus(state.Status); + if (status != null && status.LockedBy != null) + { + tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy)); + } + } + + async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner) + { + uint partitionId = this.service.GetPartitionId(lockOwner); + + OrchestrationState ownerState + = await client.GetOrchestrationStateAsync(partitionId, lockOwner, fetchInput: false, fetchOutput: false); + + bool OrchestrationIsRunning(OrchestrationStatus? status) + => status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended); + + if (!OrchestrationIsRunning(ownerState?.OrchestrationStatus)) + { + // the owner is not a running orchestration. Send a lock release. + EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(state.OrchestrationInstance, lockOwner); + await client.SendTaskOrchestrationMessageBatchAsync( + this.service.GetPartitionId(state.OrchestrationInstance.InstanceId), + new TaskMessage[] { eventToSend.AsTaskMessage() }); + + Interlocked.Increment(ref orphanedLocksReleased); + } + } + + // wait for all of the checks to finish before moving on to the next page. + await Task.WhenAll(tasks); + } + while (continuationToken != null); + + return new CleanEntityStorageResult() + { + EmptyEntitiesRemoved = 0, + OrphanedLocksReleased = orphanedLocksReleased, + }; + } + + EntityMetadata? GetEntityMetadata(OrchestrationState state, bool includeState, bool includeTransient) + { + if (state != null) + { + // determine the status of the entity by deserializing the custom status field + EntityStatus status = ClientEntityHelpers.GetEntityStatus(state.Status); + + if (status?.EntityExists == true || includeTransient) + { + return new EntityMetadata() + { + EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId), + LastModifiedTime = state.CreatedTime, + SerializedState = (includeState && status?.EntityExists == true) ? ClientEntityHelpers.GetEntityState(state.Input) : null, + LockedBy = status?.LockedBy, + BacklogQueueSize = status?.BacklogQueueSize ?? 0, + }; + } + } + + return null; + } + } + /******************************/ // Task orchestration methods /******************************/ - async Task IOrchestrationService.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan receiveTimeout, - CancellationToken cancellationToken) + Task IOrchestrationService.LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.LockNextWorkItemInternal(this.OrchestrationWorkItemQueue, receiveTimeout, cancellationToken); + + Task IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.LockNextWorkItemInternal(this.OrchestrationWorkItemQueue, receiveTimeout, cancellationToken); + + Task IEntityOrchestrationService.LockNextEntityWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken) + => this.LockNextWorkItemInternal(this.EntityWorkItemQueue, receiveTimeout, cancellationToken); + + async Task LockNextWorkItemInternal(WorkItemQueue workItemQueue, TimeSpan receiveTimeout, CancellationToken cancellationToken) { - var nextOrchestrationWorkItem = await this.OrchestrationWorkItemQueue.GetNext(receiveTimeout, cancellationToken).ConfigureAwait(false); + var nextOrchestrationWorkItem = await workItemQueue.GetNext(receiveTimeout, cancellationToken).ConfigureAwait(false); if (nextOrchestrationWorkItem != null) { nextOrchestrationWorkItem.MessageBatch.WaitingSince = null; this.workItemTraceHelper.TraceWorkItemStarted( - nextOrchestrationWorkItem.Partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + nextOrchestrationWorkItem.Partition.PartitionId, + nextOrchestrationWorkItem.WorkItemType, nextOrchestrationWorkItem.MessageBatch.WorkItemId, nextOrchestrationWorkItem.MessageBatch.InstanceId, nextOrchestrationWorkItem.Type.ToString(), @@ -857,7 +1056,7 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync( // It's unavoidable by design, but let's at least create a warning. this.workItemTraceHelper.TraceWorkItemDiscarded( partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + orchestrationWorkItem.WorkItemType, messageBatch.WorkItemId, workItem.InstanceId, "", @@ -899,7 +1098,7 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync( this.workItemTraceHelper.TraceWorkItemCompleted( partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + orchestrationWorkItem.WorkItemType, messageBatch.WorkItemId, workItem.InstanceId, batchProcessedEvent.OrchestrationStatus, @@ -1077,5 +1276,15 @@ Task IOrchestrationService.RenewTaskActivityWorkItemLockAs int IOrchestrationService.MaxConcurrentTaskActivityWorkItems => this.Settings.MaxConcurrentActivityFunctions; int IOrchestrationService.TaskActivityDispatcherCount => this.Settings.ActivityDispatcherCount; + + EntityBackendProperties IEntityOrchestrationService.EntityBackendProperties => new EntityBackendProperties() + { + EntityMessageReorderWindow = TimeSpan.Zero, + MaxConcurrentTaskEntityWorkItems = this.Settings.MaxConcurrentEntityFunctions, + MaxEntityOperationBatchSize = this.Settings.MaxEntityOperationBatchSize, + MaximumSignalDelayTime = TimeSpan.MaxValue, + SupportsImplicitEntityDeletion = true, + UseSeparateQueueForEntityWorkItems = this.Settings.UseSeparateQueueForEntityWorkItems, + }; } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 46a14f84..52fc2542 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -18,7 +18,8 @@ namespace DurableTask.Netherite public class NetheriteOrchestrationServiceSettings { /// - /// The name of the taskhub. Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. + /// The name of the taskhub. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. /// public string HubName { get; set; } @@ -57,19 +58,40 @@ public class NetheriteOrchestrationServiceSettings public Faster.BlobManager.FasterTuningParameters FasterTuningParameters { get; set; } = null; /// - /// Gets or sets the maximum number of work items that can be processed concurrently on a single node. + /// Gets or sets the maximum number of activity work items that can be processed concurrently on a single node. /// The default value is 100. - /// Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. /// public int MaxConcurrentActivityFunctions { get; set; } = 100; /// - /// Gets or sets the maximum number of orchestrations that can be processed concurrently on a single node. + /// Gets or sets the maximum number of orchestration work items that can be processed concurrently on a single node. /// The default value is 100. - /// Matches Microsoft.Azure.WebJobs.Extensions.DurableTask. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. /// public int MaxConcurrentOrchestratorFunctions { get; set; } = 100; + /// + /// Gets or sets the maximum number of entity work items that can be processed concurrently on a single node. + /// The default value is 100. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. + /// + public int MaxConcurrentEntityFunctions { get; set; } = 100; + + /// + /// Whether to use separate work item queues for entities and orchestrators. + /// This defaults to false, to maintain compatility with legacy front ends. + /// Newer front ends explicitly set this to true. + /// + public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Gets or sets the maximum number of entity operations that are processed as a single batch. + /// The default value is 1000. + /// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions. + /// + public int MaxEntityOperationBatchSize { get; set; } = 1000; + /// /// Gets or sets the number of dispatchers used to dispatch orchestrations. /// diff --git a/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs b/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs index eab9bf6f..11cb5f01 100644 --- a/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs +++ b/src/DurableTask.Netherite/OrchestrationService/OrchestrationWorkItem.cs @@ -32,6 +32,8 @@ public enum ExecutionType { Fresh, ContinueFromHistory, ContinueFromCursor }; public override bool RestoreOriginalRuntimeStateDuringCompletion => false; + public WorkItemTraceHelper.WorkItemType WorkItemType => DurableTask.Core.Common.Entities.IsEntityInstance(this.InstanceId) ? WorkItemTraceHelper.WorkItemType.Entity : WorkItemTraceHelper.WorkItemType.Orchestration; + public OrchestrationWorkItem(Partition partition, OrchestrationMessageBatch messageBatch, List previousHistory = null, string customStatus = null) { this.Partition = partition; diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs index 7f641cce..a90343ef 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs @@ -46,6 +46,7 @@ partial class Partition : TransportAbstraction.IPartition public TransportAbstraction.ISender BatchSender { get; private set; } public WorkItemQueue ActivityWorkItemQueue { get; private set; } public WorkItemQueue OrchestrationWorkItemQueue { get; private set; } + public WorkItemQueue EntityWorkItemQueue { get; private set; } public LoadPublishWorker LoadPublisher { get; private set; } public BatchTimer PendingTimers { get; private set; } @@ -70,6 +71,7 @@ public Partition( string storageAccountName, WorkItemQueue activityWorkItemQueue, WorkItemQueue orchestrationWorkItemQueue, + WorkItemQueue entityWorkItemQueue, LoadPublishWorker loadPublisher, WorkItemTraceHelper workItemTraceHelper) @@ -83,6 +85,7 @@ public Partition( this.StorageAccountName = storageAccountName; this.ActivityWorkItemQueue = activityWorkItemQueue; this.OrchestrationWorkItemQueue = orchestrationWorkItemQueue; + this.EntityWorkItemQueue = entityWorkItemQueue; this.LoadPublisher = loadPublisher; this.TraceHelper = new PartitionTraceHelper(host.TraceHelper.Logger, settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this.PartitionId); this.EventTraceHelper = new EventTraceHelper(host.LoggerFactory, settings.EventLogLevelLimit, this); @@ -326,14 +329,21 @@ public void EnqueueOrchestrationWorkItem(OrchestrationWorkItem item) { this.WorkItemTraceHelper.TraceWorkItemQueued( this.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + item.WorkItemType, item.MessageBatch.WorkItemId, item.InstanceId, item.Type.ToString(), item.EventCount, WorkItemTraceHelper.FormatMessageIdList(item.MessageBatch.TracedMessages)); - this.OrchestrationWorkItemQueue.Add(item); + if (this.Settings.UseSeparateQueueForEntityWorkItems && item.WorkItemType == WorkItemTraceHelper.WorkItemType.Entity) + { + this.EntityWorkItemQueue.Add(item); + } + else + { + this.OrchestrationWorkItemQueue.Add(item); + } } } } diff --git a/src/DurableTask.Netherite/PartitionState/SessionsState.cs b/src/DurableTask.Netherite/PartitionState/SessionsState.cs index f37f0966..02f25e39 100644 --- a/src/DurableTask.Netherite/PartitionState/SessionsState.cs +++ b/src/DurableTask.Netherite/PartitionState/SessionsState.cs @@ -327,7 +327,7 @@ public override void Process(BatchProcessed evt, EffectTracker effects) { this.Partition.WorkItemTraceHelper.TraceWorkItemDiscarded( this.Partition.PartitionId, - WorkItemTraceHelper.WorkItemType.Orchestration, + DurableTask.Core.Common.Entities.IsEntityInstance(evt.InstanceId) ? WorkItemTraceHelper.WorkItemType.Entity : WorkItemTraceHelper.WorkItemType.Orchestration, evt.WorkItemId, evt.InstanceId, session != null ? this.GetSessionPosition(session) : null, "session was replaced"); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index 8d99045c..193f0cd1 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -974,8 +974,8 @@ async Task ReadIssueLoop() { while (enumerator.MoveNext()) { - if (!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) - && !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix)) + if ((!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) && !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix)) + || (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(enumerator.Current))) { // the instance does not match the prefix continue; @@ -1194,8 +1194,8 @@ void ReportProgress(string status) scanned++; //this.partition.EventDetailTracer?.TraceEventProcessingDetail($"found instance {key.InstanceId}"); - if (string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) - || key.Val.InstanceId.StartsWith(instanceQuery.InstanceIdPrefix)) + if ((string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) || key.Val.InstanceId.StartsWith(instanceQuery.InstanceIdPrefix)) + || (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(key.Val.InstanceId))) { //this.partition.EventDetailTracer?.TraceEventProcessingDetail($"reading instance {key.InstanceId}"); diff --git a/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs b/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs index 8c8e3903..0def4ca5 100644 --- a/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs +++ b/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs @@ -57,7 +57,8 @@ public enum WorkItemType None, Client, Activity, - Orchestration + Orchestration, + Entity, } public enum ClientStatus diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index a2b96cf6..fdc40efd 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -9,6 +9,7 @@ + @@ -24,7 +25,7 @@ - + diff --git a/test/LoadGeneratorApp/LoadGeneratorApp.csproj b/test/LoadGeneratorApp/LoadGeneratorApp.csproj index 512d9ccb..6c7c37a9 100644 --- a/test/LoadGeneratorApp/LoadGeneratorApp.csproj +++ b/test/LoadGeneratorApp/LoadGeneratorApp.csproj @@ -5,7 +5,7 @@ - + diff --git a/test/PerformanceTests/PerformanceTests.csproj b/test/PerformanceTests/PerformanceTests.csproj index 38d9dee7..f0a0807d 100644 --- a/test/PerformanceTests/PerformanceTests.csproj +++ b/test/PerformanceTests/PerformanceTests.csproj @@ -6,7 +6,7 @@ - +