diff --git a/canary/src/main/java/io/littlehorse/canary/Main.java b/canary/src/main/java/io/littlehorse/canary/Main.java index 61291551f..55085d54c 100644 --- a/canary/src/main/java/io/littlehorse/canary/Main.java +++ b/canary/src/main/java/io/littlehorse/canary/Main.java @@ -67,8 +67,6 @@ private static void initialize(final String[] args) throws IOException { lhConfig.getApiBootstrapHost(), lhConfig.getApiBootstrapPort(), lhClient.getServerVersion(), - canaryConfig.getMetronomeServerId(), - canaryConfig.getMetronomeServerDataplaneId(), canaryConfig.getTopicName(), canaryConfig.toKafkaConfig().toMap(), canaryConfig.getMetronomeBeatExtraTags()); diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java index 7a1b5d069..240a6163f 100644 --- a/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporter.java @@ -42,8 +42,6 @@ private static List toTags(final MetricKey key) { final List tags = new ArrayList<>(); tags.add(Tag.of("server", "%s:%s".formatted(key.getServerHost(), key.getServerPort()))); tags.add(Tag.of("server_version", key.getServerVersion())); - tags.add(Tag.of("server_id", key.getServerId())); - tags.add(Tag.of("dataplane_id", key.getDataplaneId())); tags.addAll(key.getTagsList().stream() .map(tag -> Tag.of(tag.getKey(), tag.getValue())) .toList()); diff --git a/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java index 477c718a2..d89fa61e1 100644 --- a/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java +++ b/canary/src/main/java/io/littlehorse/canary/aggregator/topology/MetricsTopology.java @@ -172,8 +172,6 @@ private static MetricKey buildMetricKey(final BeatKey key, final String id) { .setServerVersion(key.getServerVersion()) .setServerPort(key.getServerPort()) .setServerHost(key.getServerHost()) - .setServerId(key.getServerId()) - .setDataplaneId(key.getDataplaneId()) .setId("canary_%s".formatted(id)); if (key.hasStatus() && !Strings.isNullOrEmpty(key.getStatus())) { @@ -206,8 +204,6 @@ private static BeatKey removeWfId(final BeatKey key, final BeatValue value) { .setServerHost(key.getServerHost()) .setServerPort(key.getServerPort()) .setStatus(key.getStatus()) - .setServerId(key.getServerId()) - .setDataplaneId(key.getDataplaneId()) .addAllTags(key.getTagsList()) .build(); } diff --git a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java index c952245e4..4baafcc5d 100644 --- a/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java +++ b/canary/src/main/java/io/littlehorse/canary/config/CanaryConfig.java @@ -32,8 +32,6 @@ public class CanaryConfig implements Config { public static final String METRONOME_GET_RETRIES = "metronome.get.retries"; public static final String METRONOME_WORKER_ENABLE = "metronome.worker.enable"; public static final String METRONOME_DATA_PATH = "metronome.data.path"; - public static final String METRONOME_SERVER_ID = "metronome.server.id"; - public static final String METRONOME_SERVER_DATAPLANE_ID = "metronome.server.dataplane.id"; public static final String METRONOME_BEAT_EXTRA_TAGS = "metronome.beat.extra.tags"; public static final String METRONOME_BEAT_EXTRA_TAGS_PREFIX = "%s.".formatted(METRONOME_BEAT_EXTRA_TAGS); @@ -72,7 +70,7 @@ public KafkaConfig toKafkaConfig() { return new KafkaConfig(configs); } - private String getConfig(final String configName) { + public String getConfig(final String configName) { final Object value = configs.get(configName); if (value == null) { throw new IllegalArgumentException("Configuration 'lh.canary." + configName + "' not found"); @@ -187,12 +185,4 @@ public int getWorkflowVersion() { public String getMetronomeDataPath() { return getConfig(METRONOME_DATA_PATH); } - - public String getMetronomeServerId() { - return getConfig(METRONOME_SERVER_ID); - } - - public String getMetronomeServerDataplaneId() { - return getConfig(METRONOME_SERVER_DATAPLANE_ID); - } } diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java b/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java index 0ba4e8ae9..96671335c 100644 --- a/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java +++ b/canary/src/main/java/io/littlehorse/canary/metronome/internal/BeatProducer.java @@ -25,23 +25,17 @@ public class BeatProducer { private final int lhServerPort; private final String lhServerVersion; private final String topicName; - private final String lhServerId; - private final String dataplaneId; public BeatProducer( final String lhServerHost, final int lhServerPort, final String lhServerVersion, - final String lhServerId, - final String dataplaneId, final String topicName, final Map producerConfig, final Map extraTags) { this.lhServerHost = lhServerHost; this.lhServerPort = lhServerPort; this.lhServerVersion = lhServerVersion; - this.lhServerId = lhServerId; - this.dataplaneId = dataplaneId; this.topicName = topicName; this.extraTags = extraTags; @@ -95,8 +89,6 @@ private BeatKey buildKey(final String id, final BeatType type, final String stat .setServerHost(lhServerHost) .setServerPort(lhServerPort) .setServerVersion(lhServerVersion) - .setServerId(lhServerId) - .setDataplaneId(dataplaneId) .setId(id) .setType(type); diff --git a/canary/src/main/proto/beats.proto b/canary/src/main/proto/beats.proto index 9675ce3b9..b0a2e7261 100644 --- a/canary/src/main/proto/beats.proto +++ b/canary/src/main/proto/beats.proto @@ -27,8 +27,6 @@ message BeatKey { optional string status = 5; optional string id = 6; repeated Tag tags = 7; - string server_id = 8; - string dataplane_id = 9; } message BeatValue { diff --git a/canary/src/main/proto/metrics.proto b/canary/src/main/proto/metrics.proto index 3b49d7b0e..90abbe8fc 100644 --- a/canary/src/main/proto/metrics.proto +++ b/canary/src/main/proto/metrics.proto @@ -12,8 +12,6 @@ message MetricKey { string server_version = 3; string id = 4; repeated Tag tags = 5; - string server_id = 6; - string dataplane_id = 7; } message MetricValue { diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java index a603b14c0..b73729d6a 100644 --- a/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/internal/MetricStoreExporterTest.java @@ -79,7 +79,7 @@ public void shouldScrapeSimpleMetric() throws InterruptedException { assertThat(prometheusRegistry.scrape()) .contains( - "my_metric{custom_tag=\"custom_value\",dataplane_id=\"my-dataplane\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0"); + "my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_version=\"test\"} 1.0"); } private static MetricKey createMetricsKey(List tags) { @@ -92,8 +92,6 @@ private static MetricKey createMetricsKey(String host, List tags) { .setServerPort(2023) .setServerVersion("test") .setId("my_metric") - .setServerId("my_server") - .setDataplaneId("my-dataplane") .addAllTags(tags) .build(); } @@ -126,7 +124,7 @@ void printMetricsWithTwoDifferentServers() throws InterruptedException { assertThat(prometheusRegistry.scrape()) .isEqualTo( "# HELP my_metric \n" + "# TYPE my_metric gauge\n" - + "my_metric{custom_tag=\"custom_value\",dataplane_id=\"my-dataplane\",server=\"localhost2:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n" - + "my_metric{custom_tag=\"custom_value\",dataplane_id=\"my-dataplane\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n"); + + "my_metric{custom_tag=\"custom_value\",server=\"localhost2:2023\",server_version=\"test\"} 1.0\n" + + "my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_version=\"test\"} 1.0\n"); } } diff --git a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java index f94aa35cf..0865547b3 100644 --- a/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java +++ b/canary/src/test/java/io/littlehorse/canary/aggregator/topology/MetricsTopologyTest.java @@ -30,8 +30,6 @@ class MetricsTopologyTest { public static final String HOST_2 = "localhost2"; public static final int PORT_2 = 2024; - public static final String SERVER_ID = "LH"; - public static final String DATAPLANE_ID = "DP"; private TopologyTestDriver testDriver; private TestInputTopic inputTopic; @@ -62,12 +60,8 @@ private static MetricKey newMetricKey(String id, String status, Map tags) { - MetricKey.Builder builder = MetricKey.newBuilder() - .setServerHost(host) - .setServerPort(port) - .setId(id) - .setServerId(SERVER_ID) - .setDataplaneId(DATAPLANE_ID); + MetricKey.Builder builder = + MetricKey.newBuilder().setServerHost(host).setServerPort(port).setId(id); if (status != null) { builder.addTags(Tag.newBuilder().setKey("status").setValue(status).build()); @@ -111,8 +105,6 @@ private static TestRecord newBeat( .setServerHost(host) .setServerPort(port) .setType(type) - .setServerId(SERVER_ID) - .setDataplaneId(DATAPLANE_ID) .setId(id); BeatValue.Builder valueBuilder = BeatValue.newBuilder().setTime(Timestamps.now()); diff --git a/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java index f34fa3fa5..75d20f053 100644 --- a/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java +++ b/canary/src/test/java/io/littlehorse/canary/config/CanaryConfigTest.java @@ -72,8 +72,8 @@ void throwsExceptionIfConfigurationIsNotFound() { CanaryConfig canaryConfig = new CanaryConfig(Map.of()); IllegalArgumentException result = - assertThrows(IllegalArgumentException.class, canaryConfig::getMetronomeServerId); + assertThrows(IllegalArgumentException.class, () -> canaryConfig.getConfig("my.config")); - assertThat(result.getMessage()).isEqualTo("Configuration 'lh.canary.metronome.server.id' not found"); + assertThat(result.getMessage()).isEqualTo("Configuration 'lh.canary.my.config' not found"); } } diff --git a/docs/CANARY_CONFIGURATIONS.md b/docs/CANARY_CONFIGURATIONS.md index 2c32f7803..4c8086278 100644 --- a/docs/CANARY_CONFIGURATIONS.md +++ b/docs/CANARY_CONFIGURATIONS.md @@ -15,8 +15,6 @@ * [`lh.canary.metronome.get.retries`](#lhcanarymetronomegetretries) * [`lh.canary.metronome.data.path`](#lhcanarymetronomedatapath) * [`lh.canary.metronome.beat.extra.tags.`](#lhcanarymetronomebeatextratagsadditional-tag) - * [`lh.canary.metronome.server.id`](#lhcanarymetronomeserverid) - * [`lh.canary.metronome.server.dataplane.id`](#lhcanarymetronomeserverdataplaneid) * [Kafka Configurations](#kafka-configurations) * [LH Client Configurations](#lh-client-configurations) * [Task Worker](#task-worker) @@ -168,28 +166,13 @@ For example: `lh.canary.metronome.beat.extra.tags.my_tag=my-value`. - **Default:** null - **Importance:** low -#### `lh.canary.metronome.server.id` - -Add the tag server id the prometheus metrics (**mandatory**). -For example: `lh.canary.metronome.server.id=lh`. - -- **Type:** string -- **Default:** null -- **Importance:** high - -#### `lh.canary.metronome.server.dataplane.id` - -Add the tag dataplane id the prometheus metrics (**mandatory**). -For example: `lh.canary.metronome.server.dataplane.id=my-cluster-aws`. - -- **Type:** string -- **Default:** null -- **Importance:** high +Limitations: Please note that it is not recommended to change the tag set once the canary has been started. +For more information check https://github.com/prometheus/client_java/issues/696. ### Kafka Configurations LH Canary supports all kafka configurations. Use the prefix `lh.canary.kafka` and append the kafka config. -Examples +Examples: - For `security.protocol`, use `lh.canary.kafka.security.protocol`. - For `bootstrap.servers`, use `lh.canary.kafka.bootstrap.servers`. diff --git a/docs/docs/05-developer-guide/05-task-worker-development.md b/docs/docs/05-developer-guide/05-task-worker-development.md index c1ee6ec8d..440fb30f7 100644 --- a/docs/docs/05-developer-guide/05-task-worker-development.md +++ b/docs/docs/05-developer-guide/05-task-worker-development.md @@ -178,7 +178,7 @@ The Go SDK currently (as of `0.11.0`) does not yet support throwing `LHTaskExcep ```python -from littlehorse.exceptions import LHTaskExceptio +from littlehorse.exceptions import LHTaskException async def ship_item(item_sku: str) -> str: if is_out_of_stock(): diff --git a/sdk-dotnet/LittleHorse.Sdk.Tests/Helper/LHMappingHelperTest.cs b/sdk-dotnet/LittleHorse.Sdk.Tests/Helper/LHMappingHelperTest.cs index 98fdc081b..96792231d 100644 --- a/sdk-dotnet/LittleHorse.Sdk.Tests/Helper/LHMappingHelperTest.cs +++ b/sdk-dotnet/LittleHorse.Sdk.Tests/Helper/LHMappingHelperTest.cs @@ -70,7 +70,7 @@ public void LHHelper_WithSystemBytesVariableType_ShouldReturnLHVariableBytesType } [Fact] - public void LHHelper_WithSystemArrayObjectVariableType_ShouldReturnLHVariableJsonArrType() + public void LHHelper_WithIlistObjectType_ShouldReturnLHVariableJsonArrType() { var test_allowed_types = new List() { typeof(List), typeof(List), typeof(List)}; @@ -85,7 +85,8 @@ public void LHHelper_WithSystemArrayObjectVariableType_ShouldReturnLHVariableJso [Fact] public void LHHelper_WithNotAllowedSystemVariableTypes_ShouldReturnLHJsonObj() { - var test_not_allowed_types = new List() { typeof(decimal), typeof(char), typeof(void) }; + var test_not_allowed_types = new List() { typeof(decimal), typeof(char), typeof(void), + typeof(Dictionary) }; foreach (var type in test_not_allowed_types) { diff --git a/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs b/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs index 1f06e4050..717dd96a8 100644 --- a/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs +++ b/sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs @@ -41,7 +41,7 @@ public void VariableMapping_WithValidLHTypes_ShouldBeBuiltSuccessfully() var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type); TaskDef? taskDef = getTaskDefForTest(variableType); - var result = new VariableMapping(taskDef, position, type, paramName); + var result = new VariableMapping(taskDef!, position, type, paramName); Assert.True(result is not null); } @@ -56,7 +56,7 @@ public void VariableMapping_WithMismatchTypesInt_ShouldThrowException() TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( - () => new VariableMapping(taskDef, 0, type2, "any param name")); + () => new VariableMapping(taskDef!, 0, type2, "any param name")); Assert.Contains($"TaskDef provides INT, func accepts", exception.Message); } @@ -70,7 +70,7 @@ public void VariableMapping_WithMismatchTypeDouble_ShouldThrowException() TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( - () => new VariableMapping(taskDef, 0, type2, "any param name")); + () => new VariableMapping(taskDef!, 0, type2, "any param name")); Assert.Contains($"TaskDef provides DOUBLE, func accepts", exception.Message); } @@ -84,7 +84,7 @@ public void VariableMapping_WithMismatchTypeString_ShouldThrowException() TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( - () => new VariableMapping(taskDef, 0, type2, "any param name")); + () => new VariableMapping(taskDef!, 0, type2, "any param name")); Assert.Contains($"TaskDef provides STRING, func accepts", exception.Message); } @@ -98,7 +98,7 @@ public void VariableMapping_WithMismatchTypeBool_ShouldThrowException() TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( - () => new VariableMapping(taskDef, 0, type2, "any param name")); + () => new VariableMapping(taskDef!, 0, type2, "any param name")); Assert.Contains($"TaskDef provides BOOL, func accepts", exception.Message); } @@ -112,7 +112,7 @@ public void VariableMapping_WithMismatchTypeBytes_ShouldThrowException() TaskDef? taskDef = getTaskDefForTest(variableType); var exception = Assert.Throws( - () => new VariableMapping(taskDef, 0, type2, "any param name")); + () => new VariableMapping(taskDef!, 0, type2, "any param name")); Assert.Contains($"TaskDef provides BYTES, func accepts", exception.Message); } @@ -280,6 +280,29 @@ public void VariableMapping_WithAssignArrayObjectValue_ShouldReturnArrayObject() Assert.Equal(expectedList[0].Cars!.Count, actualList[0].Cars!.Count); } + [Fact] + public void VariableMapping_WithAssignJsonObjectValue_ShouldReturnDictionaryObject() + { + string value = "{\"FirstName\":\"Test\",\"Age\":\"35\",\"Address\":\"NA-Street\"}"; + Type type = typeof(Dictionary); + int position = 0; + string paramName = "param_test"; + var variableMapping = getVariableMappingForTest(type, paramName, position); + VariableValue variableValue = new VariableValue { JsonObj = value}; + ScheduledTask taskInstance = getScheduledTaskForTest(variableValue, paramName); + var mockWorkerContext = new Mock(taskInstance, new DateTime()); + + var result = variableMapping.Assign(taskInstance, mockWorkerContext.Object); + + var expectedList = (Dictionary)JsonConvert.DeserializeObject(value, type)!; + var actualList = (Dictionary)result!; + + Assert.Equal(expectedList.Count, actualList.Count); + Assert.Equal(expectedList["FirstName"], actualList["FirstName"]); + Assert.Equal(expectedList["Age"], actualList["Age"]); + Assert.Equal(expectedList["Address"], actualList["Address"]); + } + [Fact] public void VariableMapping_WithAssignJsonStringValue_ShouldReturnCustomObject() { @@ -319,7 +342,7 @@ private VariableMapping getVariableMappingForTest(Type type, string paramName, i var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type); TaskDef? taskDef = getTaskDefForTest(variableType); - var variableMapping = new VariableMapping(taskDef, position, type, paramName); + var variableMapping = new VariableMapping(taskDef!, position, type, paramName); return variableMapping; } diff --git a/sdk-dotnet/LittleHorse.Sdk/Helper/LHMappingHelper.cs b/sdk-dotnet/LittleHorse.Sdk/Helper/LHMappingHelper.cs index da4738bcf..6c225f16f 100644 --- a/sdk-dotnet/LittleHorse.Sdk/Helper/LHMappingHelper.cs +++ b/sdk-dotnet/LittleHorse.Sdk/Helper/LHMappingHelper.cs @@ -39,7 +39,7 @@ public static VariableType MapDotNetTypeToLHVariableType(Type type) return VariableType.Bytes; } - if (typeof(IEnumerable).IsAssignableFrom(type)) + if (typeof(IList).IsAssignableFrom(type)) { return VariableType.JsonArr; } diff --git a/sdk-go/littlehorse/lh_errors.go b/sdk-go/littlehorse/lh_errors.go new file mode 100644 index 000000000..4dd9610a0 --- /dev/null +++ b/sdk-go/littlehorse/lh_errors.go @@ -0,0 +1,9 @@ +package littlehorse + +import "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" + +type LHTaskException struct { + Name string + Message string + Content *lhproto.VariableValue +} diff --git a/sdk-go/littlehorse/task_worker_internal.go b/sdk-go/littlehorse/task_worker_internal.go index a928b9879..c2591bf18 100644 --- a/sdk-go/littlehorse/task_worker_internal.go +++ b/sdk-go/littlehorse/task_worker_internal.go @@ -5,6 +5,7 @@ import ( "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" "log" "reflect" + "runtime/debug" "strconv" "sync" "time" @@ -311,6 +312,7 @@ func (m *serverConnectionManager) submitTaskForExecution(task *lhproto.Scheduled } func (m *serverConnectionManager) doTask(taskToExec *taskExecutionInfo) { + defer m.recoverFromPanic(taskToExec) taskResult := m.doTaskHelper(taskToExec.task) _, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult) if err != nil { @@ -318,6 +320,34 @@ func (m *serverConnectionManager) doTask(taskToExec *taskExecutionInfo) { } } +func (m *serverConnectionManager) recoverFromPanic(taskToExec *taskExecutionInfo) { + if v := recover(); v != nil { + varVal, _ := InterfaceToVarVal(v) + taskResult := &lhproto.ReportTaskRun{ + TaskRunId: taskToExec.task.TaskRunId, + Time: timestamppb.Now(), + Status: lhproto.TaskStatus(lhproto.LHStatus_ERROR), + LogOutput: &lhproto.VariableValue{ + Value: &lhproto.VariableValue_Str{ + Str: string(debug.Stack()), + }, + }, + AttemptNumber: taskToExec.task.AttemptNumber, + Result: &lhproto.ReportTaskRun_Error{ + Error: &lhproto.LHTaskError{ + Type: lhproto.LHErrorType_TASK_FAILURE, + Message: "Task Worker Panic: " + varVal.GetStr(), + }, + }, + } + _, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult) + if err != nil { + log.Default().Print(err) + m.retryReportTask(context.Background(), taskResult, TOTAL_RETRIES) + } + } +} + func (m *serverConnectionManager) retryReportTask(ctx context.Context, taskResult *lhproto.ReportTaskRun, retries int) { log.Println("Retrying reportTask rpc on wfRun {}", taskResult.TaskRunId.WfRunId) @@ -399,16 +429,40 @@ func (m *serverConnectionManager) doTaskHelper(task *lhproto.ScheduledTask) *lhp errorReflect := invocationResults[len(invocationResults)-1] if errorReflect.Interface() != nil { - errorVarVal, err := InterfaceToVarVal(errorReflect.Interface()) - if err != nil { - log.Println("WARN: was unable to serialize error") + // Check if the error is an LHTaskException + if lhtErr, ok := errorReflect.Interface().(*LHTaskException); ok { + taskResult.Result = &lhproto.ReportTaskRun_Exception{ + Exception: &lhproto.LHTaskException{ + Name: lhtErr.Name, + Message: lhtErr.Message, + Content: lhtErr.Content, + }, + } } else { - taskResult.LogOutput = errorVarVal + // Otherwise, try to interpret the error + if err, ok := errorReflect.Interface().(error); ok { + taskResult.Result = &lhproto.ReportTaskRun_Error{ + Error: &lhproto.LHTaskError{ + Type: lhproto.LHErrorType_TASK_FAILURE, + Message: err.Error(), + }, + } + } else { + // If the error returned by the taskMethod does not match the error interface + taskResult.Result = &lhproto.ReportTaskRun_Error{ + Error: &lhproto.LHTaskError{ + Type: lhproto.LHErrorType_TASK_FAILURE, + Message: "Task Method error serialization failed.", + }, + } + } } + taskResult.Status = lhproto.TaskStatus_TASK_FAILED } } + taskResult.AttemptNumber = task.AttemptNumber taskResult.Time = timestamppb.Now() return taskResult