From 608a593025e69304d635b6008b3c7df2ab6e2954 Mon Sep 17 00:00:00 2001 From: KarlaCarvajal Date: Mon, 16 Dec 2024 15:11:01 -0600 Subject: [PATCH] fix(sdk-dotnet): fix task worker connection manager (#1191) * Fix task worker connection manager to send hosts and ports available * Fix rebalance when the boostrap sever is down and then up --- sdk-dotnet/Examples/BasicExample/MyWorker.cs | 2 +- sdk-dotnet/Examples/BasicExample/Program.cs | 4 +- .../Worker/VariableMappingTest.cs | 18 +-- sdk-dotnet/LittleHorse.Sdk/Helper/LHHelper.cs | 2 +- sdk-dotnet/LittleHorse.Sdk/LHConfig.cs | 10 +- .../Worker/Internal/LHServerConnection.cs | 28 ++--- .../Internal/LHServerConnectionManager.cs | 88 +++++++------- .../LittleHorse.Sdk/Worker/Internal/LHTask.cs | 105 +++++++++++++++++ .../LittleHorse.Sdk/Worker/LHTaskWorker.cs | 111 +++--------------- .../LittleHorse.Sdk/Worker/LHWorkerContext.cs | 7 +- 10 files changed, 199 insertions(+), 176 deletions(-) create mode 100644 sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHTask.cs diff --git a/sdk-dotnet/Examples/BasicExample/MyWorker.cs b/sdk-dotnet/Examples/BasicExample/MyWorker.cs index ef3d797aa..49c106989 100644 --- a/sdk-dotnet/Examples/BasicExample/MyWorker.cs +++ b/sdk-dotnet/Examples/BasicExample/MyWorker.cs @@ -4,7 +4,7 @@ namespace Examples.BasicExample { public class MyWorker { - [LHTaskMethod("greet-dotnet")] + [LHTaskMethod("greet")] public string Greeting(string name) { var message = $"Hello team, This is a Dotnet Worker"; diff --git a/sdk-dotnet/Examples/BasicExample/Program.cs b/sdk-dotnet/Examples/BasicExample/Program.cs index 2d91f7951..617a38cab 100644 --- a/sdk-dotnet/Examples/BasicExample/Program.cs +++ b/sdk-dotnet/Examples/BasicExample/Program.cs @@ -51,9 +51,9 @@ static void Main(string[] args) { var loggerFactory = _serviceProvider.GetRequiredService(); var config = GetLHConfig(args, loggerFactory); - + MyWorker executable = new MyWorker(); - var taskWorker = new LHTaskWorker(executable, "greet-dotnet", config); + var taskWorker = new LHTaskWorker(executable, "greet", config); taskWorker.RegisterTaskDef(); diff --git a/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs b/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs index 0e664e5f4..1f06e4050 100644 --- a/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs +++ b/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs @@ -39,7 +39,7 @@ public void VariableMapping_WithValidLHTypes_ShouldBeBuiltSuccessfully() foreach (var type in testAllowedTypes) { var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type); - TaskDef taskDef = getTaskDefForTest(variableType); + TaskDef? taskDef = getTaskDefForTest(variableType); var result = new VariableMapping(taskDef, position, type, paramName); @@ -53,7 +53,7 @@ public void VariableMapping_WithMismatchTypesInt_ShouldThrowException() Type type1 = typeof(Int64); Type type2 = typeof(string); var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type1); - TaskDef taskDef = getTaskDefForTest(variableType); + TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( () => new VariableMapping(taskDef, 0, type2, "any param name")); @@ -67,7 +67,7 @@ public void VariableMapping_WithMismatchTypeDouble_ShouldThrowException() Type type1 = typeof(double); Type type2 = typeof(Int64); var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type1); - TaskDef taskDef = getTaskDefForTest(variableType); + TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( () => new VariableMapping(taskDef, 0, type2, "any param name")); @@ -81,7 +81,7 @@ public void VariableMapping_WithMismatchTypeString_ShouldThrowException() Type type1 = typeof(string); Type type2 = typeof(double); var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type1); - TaskDef taskDef = getTaskDefForTest(variableType); + TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( () => new VariableMapping(taskDef, 0, type2, "any param name")); @@ -95,7 +95,7 @@ public void VariableMapping_WithMismatchTypeBool_ShouldThrowException() Type type1 = typeof(bool); Type type2 = typeof(string); var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type1); - TaskDef taskDef = getTaskDefForTest(variableType); + TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( () => new VariableMapping(taskDef, 0, type2, "any param name")); @@ -109,7 +109,7 @@ public void VariableMapping_WithMismatchTypeBytes_ShouldThrowException() Type type1 = typeof(byte[]); Type type2 = typeof(string); var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type1); - TaskDef taskDef = getTaskDefForTest(variableType); + TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( () => new VariableMapping(taskDef, 0, type2, "any param name")); @@ -302,11 +302,11 @@ public void VariableMapping_WithAssignJsonStringValue_ShouldReturnCustomObject() Assert.Equal(expectedObject.Cars!.Count, actualObject.Cars!.Count); } - private TaskDef getTaskDefForTest(VariableType type) + private TaskDef? getTaskDefForTest(VariableType type) { var inputVar = new VariableDef(); inputVar.Type = type; - TaskDef taskDef = new TaskDef(); + TaskDef? taskDef = new TaskDef(); TaskDefId taskDefId = new TaskDefId(); taskDef.Id = taskDefId; taskDef.InputVars.Add(inputVar); @@ -317,7 +317,7 @@ private TaskDef getTaskDefForTest(VariableType type) private VariableMapping getVariableMappingForTest(Type type, string paramName, int position) { var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type); - TaskDef taskDef = getTaskDefForTest(variableType); + TaskDef? taskDef = getTaskDefForTest(variableType); var variableMapping = new VariableMapping(taskDef, position, type, paramName); diff --git a/sdk-dotnet/LittleHorse.Sdk/Helper/LHHelper.cs b/sdk-dotnet/LittleHorse.Sdk/Helper/LHHelper.cs index 76208d884..1b55390e1 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Helper/LHHelper.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Helper/LHHelper.cs @@ -5,7 +5,7 @@ namespace LittleHorse.Sdk.Helper { public static class LHHelper { - public static WfRunId GetWFRunId(TaskRunSource taskRunSource) + public static WfRunId? GetWfRunId(TaskRunSource taskRunSource) { switch (taskRunSource.TaskRunSourceCase) { diff --git a/sdk-dotnet/LittleHorse.Sdk/LHConfig.cs b/sdk-dotnet/LittleHorse.Sdk/LHConfig.cs index 1723853b2..f1c6ca057 100644 --- a/sdk-dotnet/LittleHorse.Sdk/LHConfig.cs +++ b/sdk-dotnet/LittleHorse.Sdk/LHConfig.cs @@ -116,14 +116,14 @@ private bool IsOAuth } } - public LittleHorseClient GetGrcpClientInstance() + public LittleHorseClient GetGrpcClientInstance() { - return GetGrcpClientInstance(BootstrapHost, BootstrapPort); + return GetGrpcClientInstance(BootstrapHost, BootstrapPort); } - public LittleHorseClient GetGrcpClientInstance(string host, int port) + public LittleHorseClient GetGrpcClientInstance(string host, int port) { - string channelKey = BootstrapServer; + string channelKey = $"{BootstrapProtocol}://{host}:{port}"; if (_createdChannels.ContainsKey(channelKey)) { @@ -208,7 +208,7 @@ public TaskDef GetTaskDef(string taskDefName) { try { - var client = GetGrcpClientInstance(); + var client = GetGrpcClientInstance(); var taskDefId = new TaskDefId() { Name = taskDefName diff --git a/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnection.cs b/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnection.cs index 774119c37..0691e41f9 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnection.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnection.cs @@ -8,25 +8,25 @@ namespace LittleHorse.Sdk.Worker.Internal { public class LHServerConnection : IDisposable { - private LHServerConnectionManager _connectionManager; - private LHHostInfo _hostInfo; + private readonly LHServerConnectionManager _connectionManager; + private readonly LHHostInfo _hostInfo; private bool _running; - private LittleHorseClient _client; - private AsyncDuplexStreamingCall _call; - private ILogger? _logger; + private readonly LittleHorseClient _client; + private readonly AsyncDuplexStreamingCall _call; + private readonly ILogger? _logger; - public LHHostInfo HostInfo { get { return _hostInfo; } } + public LHHostInfo HostInfo => _hostInfo; public LHServerConnection(LHServerConnectionManager connectionManager, LHHostInfo hostInfo) { _connectionManager = connectionManager; _hostInfo = hostInfo; _logger = LHLoggerFactoryProvider.GetLogger>(); - _client = _connectionManager.Config.GetGrcpClientInstance(); + _client = _connectionManager.Config.GetGrpcClientInstance(hostInfo.Host, hostInfo.Port); _call = _client.PollTask(); } - public void Connect() + public void Open() { _running = true; Task.Run(RequestMoreWorkAsync); @@ -48,12 +48,12 @@ private async Task RequestMoreWorkAsync() if (taskToDo.Result != null) { var scheduledTask = taskToDo.Result; - var wFRunId = LHHelper.GetWFRunId(scheduledTask.Source); - _logger?.LogDebug($"Received task schedule request for wfRun {wFRunId.Id}"); + var wFRunId = LHHelper.GetWfRunId(scheduledTask.Source); + _logger?.LogDebug($"Received task schedule request for wfRun {wFRunId?.Id}"); - _connectionManager.SubmitTaskForExecution(scheduledTask, _client); + _connectionManager.SubmitTaskForExecution(scheduledTask); - _logger?.LogDebug($"Scheduled task on threadpool for wfRun {wFRunId.Id}"); + _logger?.LogDebug($"Scheduled task on threadpool for wfRun {wFRunId?.Id}"); } else { @@ -82,9 +82,9 @@ public void Dispose() _running = false; } - public bool IsSame(LHHostInfo hostInfoToCompare) + public bool IsSame(string host, int port) { - return _hostInfo.Host.Equals(hostInfoToCompare.Host) && _hostInfo.Port == hostInfoToCompare.Port; + return _hostInfo.Host.Equals(host) && _hostInfo.Port == port; } } } diff --git a/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnectionManager.cs b/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnectionManager.cs index 381a3d70e..f7677be94 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnectionManager.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHServerConnectionManager.cs @@ -17,35 +17,25 @@ public class LHServerConnectionManager : IDisposable private const int BALANCER_SLEEP_TIME = 5000; private const int MAX_REPORT_RETRIES = 5; - private LHConfig _config; - private MethodInfo _taskMethod; - private TaskDef _taskDef; - private List _mappings; - private T _executable; - private ILogger? _logger; - private LittleHorseClient _bootstrapClient; + private readonly LHConfig _config; + private readonly ILogger? _logger; + private readonly LittleHorseClient _bootstrapClient; private bool _running; private List> _runningConnections; - private Thread _rebalanceThread; - private SemaphoreSlim _semaphore; + private readonly Thread _rebalanceThread; + private readonly SemaphoreSlim _semaphore; + private readonly LHTask _task; - public LHConfig Config { get { return _config; } } - public TaskDef TaskDef { get { return _taskDef; } } + public LHConfig Config => _config; + public TaskDef TaskDef => _task.TaskDef!; public LHServerConnectionManager(LHConfig config, - MethodInfo taskMethod, - TaskDef taskDef, - List mappings, - T executable) + LHTask task) { _config = config; - _taskMethod = taskMethod; - _taskDef = taskDef; - _mappings = mappings; - _executable = executable; _logger = LHLoggerFactoryProvider.GetLogger>(); - - _bootstrapClient = config.GetGrcpClientInstance(); + _task = task; + _bootstrapClient = config.GetGrpcClientInstance(); _running = false; _runningConnections = new List>(); @@ -85,22 +75,22 @@ private void DoHeartBeat() { var request = new RegisterTaskWorkerRequest { - TaskDefId = _taskDef.Id, + TaskDefId = _task.TaskDef!.Id, TaskWorkerId = _config.WorkerId, }; var response = _bootstrapClient.RegisterTaskWorker(request); - HandleRegisterTaskWorkResponse(response); - + HandleRegisterTaskWorkerResponse(response); } catch (Exception ex) { _logger?.LogError(ex, $"Failed contacting bootstrap host {_config.BootstrapHost}:{_config.BootstrapPort}"); + _runningConnections = new List>(); } } - private void HandleRegisterTaskWorkResponse(RegisterTaskWorkerResponse response) + private void HandleRegisterTaskWorkerResponse(RegisterTaskWorkerResponse response) { response.YourHosts.ToList().ForEach(host => { @@ -109,9 +99,9 @@ private void HandleRegisterTaskWorkResponse(RegisterTaskWorkerResponse response) try { var newConnection = new LHServerConnection(this, host); - newConnection.Connect(); + newConnection.Open(); _runningConnections.Add(newConnection); - _logger?.LogInformation($"Adding connection to: {host.Host}:{host.Port} for task '{_taskDef.Id}'"); + _logger?.LogInformation($"Adding connection to: {host.Host}:{host.Port} for task '{_task.TaskDef!.Id}'"); } catch (IOException ex) { @@ -125,7 +115,7 @@ private void HandleRegisterTaskWorkResponse(RegisterTaskWorkerResponse response) for (int i = lastIndexOfRunningConnection; i >= 0; i--) { var runningThread = _runningConnections[i]; - + if (!ShouldBeRunning(runningThread, response.YourHosts)) { _logger?.LogInformation($"Stopping worker thread for host {runningThread.HostInfo.Host} : {runningThread.HostInfo.Port}"); @@ -138,51 +128,56 @@ private void HandleRegisterTaskWorkResponse(RegisterTaskWorkerResponse response) private bool ShouldBeRunning(LHServerConnection runningThread, RepeatedField hosts) { - return hosts.ToList().Any(host => runningThread.IsSame(host)); + return hosts.ToList().Any(host => runningThread.IsSame(host.Host, host.Port)); } private bool IsAlreadyRunning(LHHostInfo host) { - return _runningConnections.Any(conn => conn.IsSame(host)); + return _runningConnections.Any(conn => conn.IsSame(host.Host, host.Port)); } - public async void SubmitTaskForExecution(ScheduledTask scheduledTask, LittleHorseClient client) + public async void SubmitTaskForExecution(ScheduledTask scheduledTask) { await _semaphore.WaitAsync(); - DoTask(scheduledTask, client); + DoTask(scheduledTask); } - private void DoTask(ScheduledTask scheduledTask, LittleHorseClient client) + private void DoTask(ScheduledTask scheduledTask) { ReportTaskRun result = ExecuteTask(scheduledTask, LHMappingHelper.MapDateTimeFromProtoTimeStamp(scheduledTask.CreatedAt)); - _semaphore.Release(); - var wfRunId = LHHelper.GetWFRunId(scheduledTask.Source); + var wfRunId = LHHelper.GetWfRunId(scheduledTask.Source); try { var retriesLeft = MAX_REPORT_RETRIES; - _logger?.LogDebug($"Going to report task for wfRun {wfRunId.Id}"); + _logger?.LogDebug($"Going to report task for wfRun {wfRunId?.Id}"); Policy.Handle().WaitAndRetry(MAX_REPORT_RETRIES, retryAttempt => TimeSpan.FromSeconds(5), onRetry: (exception, timeSpan, retryCount, context) => - { - --retriesLeft; - _logger?.LogDebug($"Failed to report task for wfRun {wfRunId}: {exception.Message}. Retries left: {retriesLeft}"); - _logger?.LogDebug($"Retrying reportTask rpc on taskRun {LHHelper.TaskRunIdToString(result.TaskRunId)}"); - }).Execute(() => RunReportTask(result)); + { + --retriesLeft; + _logger?.LogDebug( + $"Failed to report task for wfRun {wfRunId}: {exception.Message}. Retries left: {retriesLeft}"); + _logger?.LogDebug( + $"Retrying reportTask rpc on taskRun {LHHelper.TaskRunIdToString(result.TaskRunId)}"); + }).Execute(() => RunReportTask(result)); } catch (Exception ex) { _logger?.LogDebug($"Failed to report task for wfRun {wfRunId}: {ex.Message}. No retries left."); } + finally + { + _semaphore.Release(); + } } private void RunReportTask(ReportTaskRun reportedTask) { - var response = _bootstrapClient.ReportTask(reportedTask); + _bootstrapClient.ReportTask(reportedTask); } private ReportTaskRun ExecuteTask(ScheduledTask scheduledTask, DateTime? scheduleTime) @@ -278,14 +273,15 @@ private ReportTaskRun ExecuteTask(ScheduledTask scheduledTask, DateTime? schedul private object? Invoke(ScheduledTask scheduledTask, LHWorkerContext workerContext) { - var inputs = _mappings.Select(mapping => mapping.Assign(scheduledTask, workerContext)).ToArray(); + var inputs = _task.TaskMethodMappings.Select(mapping => mapping.Assign(scheduledTask, workerContext)).ToArray(); - return _taskMethod.Invoke(_executable, inputs); + return _task.TaskMethod!.Invoke(_task.Executable, inputs); } - public void CloseConnection(LHServerConnection connection) + public void CloseConnection(string host, int port) { - var currConn = _runningConnections.Where(c => c.IsSame(connection.HostInfo)).FirstOrDefault(); + var currConn = _runningConnections.FirstOrDefault(c => + c.IsSame(host, port)); if (currConn != null) { diff --git a/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHTask.cs b/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHTask.cs new file mode 100644 index 000000000..4d809f723 --- /dev/null +++ b/sdk-dotnet/LittleHorse.Sdk/Worker/Internal/LHTask.cs @@ -0,0 +1,105 @@ +using System.Reflection; +using LittleHorse.Common.Proto; +using LittleHorse.Sdk.Exceptions; +using static LittleHorse.Common.Proto.LittleHorse; + +namespace LittleHorse.Sdk.Worker.Internal; + +public class LHTask +{ + private TaskDef? _taskDef; + private MethodInfo? _taskMethod; + private List _taskMethodMappings; + private readonly string _taskDefName; + private LHTaskSignature? _taskSignature; + private readonly T _executable; + private readonly LittleHorseClient _lhClient; + + public MethodInfo? TaskMethod => _taskMethod; + public List TaskMethodMappings => _taskMethodMappings; + public T Executable => _executable; + public string TaskDefName => _taskDefName; + public TaskDef? TaskDef => _taskDef; + + public LHTask(T executable, string taskDefName, LittleHorseClient lhClient) + { + _taskDefName = taskDefName; + _executable = executable; + _lhClient = lhClient; + _taskMethodMappings = new List(); + } + + internal void PrepareLHTaskMethod() + { + _taskSignature = new LHTaskSignature(_taskDefName, _executable); + _taskMethod = _taskSignature.TaskMethod; + + ValidateTaskMethodParameters(_taskMethod, _taskSignature); + _taskMethodMappings = CreateVariableMappings(_taskMethod, _taskSignature); + } + + private void ValidateTaskMethodParameters(MethodInfo taskMethod, LHTaskSignature taskSignature) + { + _taskDef = GetTaskDef(); + if (taskSignature.HasWorkerContextAtEnd) + { + if (taskSignature.TaskMethod.GetParameters().Length - 1 != _taskDef.InputVars.Count) + { + throw new LHTaskSchemaMismatchException("Number of task method params doesn't match number of taskdef params!"); + } + } + else + { + if (taskMethod.GetParameters().Length != _taskDef.InputVars.Count) + { + throw new LHTaskSchemaMismatchException("Number of task method params doesn't match number of taskdef params!"); + } + } + } + + private List CreateVariableMappings(MethodInfo taskMethod, LHTaskSignature taskSignature) + { + var mappings = new List(); + + var taskParams = taskMethod.GetParameters(); + _taskDef = GetTaskDef(); + + for (int index = 0; index < _taskDef?.InputVars.Count; index++) + { + var taskParam = taskParams[index]; + + if (taskParam.ParameterType.IsAssignableFrom(typeof(LHWorkerContext))) + { + throw new LHTaskSchemaMismatchException("Can only have WorkerContext after all required taskDef params."); + } + + mappings.Add(CreateVariableMapping(_taskDef, index, taskParam.ParameterType, taskParam.Name)); + } + + if (taskSignature.HasWorkerContextAtEnd) + { + mappings.Add(CreateVariableMapping(_taskDef, taskParams.Count() - 1, typeof(LHWorkerContext), null)); + } + + return mappings; + } + + private VariableMapping CreateVariableMapping(TaskDef? taskDef, int index, Type type, string? paramName) + { + return new VariableMapping(taskDef!, index, type, paramName); + } + + internal TaskDef GetTaskDef() + { + if (_taskDef is null) + { + var taskDefId = new TaskDefId + { + Name = _taskDefName + }; + _taskDef = _lhClient.GetTaskDef(taskDefId); + } + + return _taskDef; + } +} \ No newline at end of file diff --git a/sdk-dotnet/LittleHorse.Sdk/Worker/LHTaskWorker.cs b/sdk-dotnet/LittleHorse.Sdk/Worker/LHTaskWorker.cs index 41db02a5d..bcf69d4c1 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Worker/LHTaskWorker.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Worker/LHTaskWorker.cs @@ -1,5 +1,4 @@ -using System.Reflection; -using Grpc.Core; +using Grpc.Core; using LittleHorse.Common.Proto; using LittleHorse.Sdk.Exceptions; using LittleHorse.Sdk.Helper; @@ -18,27 +17,20 @@ namespace LittleHorse.Sdk.Worker /// public class LHTaskWorker { - private LHConfig _config; - private ILogger>? _logger; - private T _executable; - private TaskDef? _taskDef; - private MethodInfo? _taskMethod; - private List _mappings; - private LHTaskSignature? _taskSignature; + private readonly LHConfig _config; + private readonly ILogger>? _logger; private LHServerConnectionManager? _manager; - private string _taskDefName; - private LittleHorseClient _grpcClient; + private readonly LittleHorseClient _lhClient; + private readonly LHTask _task; - public string TaskDefName { get => _taskDefName; } + public string TaskDefName => _task.TaskDefName; public LHTaskWorker(T executable, string taskDefName, LHConfig config) { _config = config; _logger = LHLoggerFactoryProvider.GetLogger>(); - _executable = executable; - _mappings = new List(); - _taskDefName = taskDefName; - _grpcClient = _config.GetGrcpClientInstance(); + _lhClient = _config.GetGrpcClientInstance(); + _task = new LHTask(executable, taskDefName, _lhClient); } /// @@ -53,17 +45,11 @@ public void Start() { if (!TaskDefExists()) { - throw new LHMisconfigurationException($"Couldn't find TaskDef: {_taskDefName}"); + throw new LHMisconfigurationException($"Couldn't find TaskDef: {_task.TaskDefName}"); } - _taskSignature = new LHTaskSignature(_taskDefName, _executable); - _taskMethod = _taskSignature.TaskMethod; - - ValidateTaskMethodParameters(_taskMethod, _taskSignature); - _mappings = CreateVariableMappings(_taskMethod, _taskSignature); - - _manager = new LHServerConnectionManager(_config, _taskMethod, GetTaskDef(), _mappings, _executable); - + _task.PrepareLHTaskMethod(); + _manager = new LHServerConnectionManager(_config, _task); _manager.Start(); } @@ -79,11 +65,7 @@ public bool TaskDefExists() { try { - var taskDefId = new TaskDefId - { - Name = _taskDefName, - }; - _grpcClient.GetTaskDef(taskDefId); + _task.GetTaskDef(); return true; } @@ -124,15 +106,15 @@ public void RegisterTaskDef() /// private void RegisterTaskDef(bool swallowAlreadyExists) { - _logger?.LogInformation($"Creating TaskDef: {_taskDefName}"); + _logger?.LogInformation($"Creating TaskDef: {_task.TaskDefName}"); try { - var signature = new LHTaskSignature(_taskDefName, _executable); + var signature = new LHTaskSignature(_task.TaskDefName, _task.Executable); var request = new PutTaskDefRequest { - Name = _taskDefName + Name = _task.TaskDefName }; foreach (var lhMethodParam in signature.LhMethodParams) @@ -151,7 +133,7 @@ private void RegisterTaskDef(bool swallowAlreadyExists) request.OutputSchema = signature.TaskDefOutputSchema; } - var response = _grpcClient.PutTaskDef(request); + var response = _lhClient.PutTaskDef(request); _logger?.LogInformation($"Created TaskDef:\n{LHMappingHelper.MapProtoToJson(response)}"); } @@ -159,7 +141,7 @@ private void RegisterTaskDef(bool swallowAlreadyExists) { if (swallowAlreadyExists && ex.StatusCode == StatusCode.AlreadyExists) { - _logger?.LogInformation($"TaskDef {_taskDefName} already exists!"); + _logger?.LogInformation($"TaskDef {_task.TaskDefName} already exists!"); } else { @@ -167,64 +149,5 @@ private void RegisterTaskDef(bool swallowAlreadyExists) } } } - - private TaskDef GetTaskDef() - { - if (_taskDef is null) - { - _taskDef = _config.GetTaskDef(_taskDefName); - } - - return _taskDef; - } - - private void ValidateTaskMethodParameters(MethodInfo taskMethod, LHTaskSignature taskSignature) - { - if (taskSignature.HasWorkerContextAtEnd) - { - if (taskSignature.TaskMethod.GetParameters().Length - 1 != GetTaskDef().InputVars.Count) - { - throw new LHTaskSchemaMismatchException("Number of task method params doesn't match number of taskdef params!"); - } - } - else - { - if (taskMethod.GetParameters().Length != GetTaskDef().InputVars.Count) - { - throw new LHTaskSchemaMismatchException("Number of task method params doesn't match number of taskdef params!"); - } - } - } - - private List CreateVariableMappings(MethodInfo taskMethod, LHTaskSignature taskSignature) - { - var mappings = new List(); - - var taskParams = taskMethod.GetParameters(); - - for (int index = 0; index < GetTaskDef().InputVars.Count; index++) - { - var taskParam = taskParams[index]; - - if (taskParam.ParameterType.IsAssignableFrom(typeof(LHWorkerContext))) - { - throw new LHTaskSchemaMismatchException("Can only have WorkerContext after all required taskDef params."); - } - - mappings.Add(CreateVariableMapping(GetTaskDef(), index, taskParam.ParameterType, taskParam.Name)); - } - - if (taskSignature.HasWorkerContextAtEnd) - { - mappings.Add(CreateVariableMapping(GetTaskDef(), taskParams.Count() - 1, typeof(LHWorkerContext), null)); - } - - return mappings; - } - - private VariableMapping CreateVariableMapping(TaskDef taskDef, int index, Type type, string? paramName) - { - return new VariableMapping(taskDef, index, type, paramName); - } } } diff --git a/sdk-dotnet/LittleHorse.Sdk/Worker/LHWorkerContext.cs b/sdk-dotnet/LittleHorse.Sdk/Worker/LHWorkerContext.cs index 4850f59b7..75991a537 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Worker/LHWorkerContext.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Worker/LHWorkerContext.cs @@ -1,5 +1,4 @@ -using System.Text; -using LittleHorse.Common.Proto; +using LittleHorse.Common.Proto; using LittleHorse.Sdk.Helper; namespace LittleHorse.Sdk.Worker @@ -35,7 +34,7 @@ public LHWorkerContext(ScheduledTask scheduleTask, DateTime? scheduleDateTime) /// public WfRunId GetWfRunId() { - return LHHelper.GetWFRunId(_scheduleTask.Source); + return LHHelper.GetWfRunId(_scheduleTask.Source)!; } /// @@ -44,7 +43,7 @@ public WfRunId GetWfRunId() /// /// @return a `NodeRunIdPb` protobuf class with the ID from the executed NodeRun. /// - public NodeRunId GetNodeRunId() + public NodeRunId? GetNodeRunId() { TaskRunSource source = _scheduleTask.Source; switch (source.TaskRunSourceCase) {