Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Dec 18, 2024
2 parents e42182b + 641743d commit 745d67e
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 83 deletions.
2 changes: 0 additions & 2 deletions canary/src/main/java/io/littlehorse/canary/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ private static void initialize(final String[] args) throws IOException {
lhConfig.getApiBootstrapHost(),
lhConfig.getApiBootstrapPort(),
lhClient.getServerVersion(),
canaryConfig.getMetronomeServerId(),
canaryConfig.getMetronomeServerDataplaneId(),
canaryConfig.getTopicName(),
canaryConfig.toKafkaConfig().toMap(),
canaryConfig.getMetronomeBeatExtraTags());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ private static List<Tag> toTags(final MetricKey key) {
final List<Tag> tags = new ArrayList<>();
tags.add(Tag.of("server", "%s:%s".formatted(key.getServerHost(), key.getServerPort())));
tags.add(Tag.of("server_version", key.getServerVersion()));
tags.add(Tag.of("server_id", key.getServerId()));
tags.add(Tag.of("dataplane_id", key.getDataplaneId()));
tags.addAll(key.getTagsList().stream()
.map(tag -> Tag.of(tag.getKey(), tag.getValue()))
.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ private static MetricKey buildMetricKey(final BeatKey key, final String id) {
.setServerVersion(key.getServerVersion())
.setServerPort(key.getServerPort())
.setServerHost(key.getServerHost())
.setServerId(key.getServerId())
.setDataplaneId(key.getDataplaneId())
.setId("canary_%s".formatted(id));

if (key.hasStatus() && !Strings.isNullOrEmpty(key.getStatus())) {
Expand Down Expand Up @@ -206,8 +204,6 @@ private static BeatKey removeWfId(final BeatKey key, final BeatValue value) {
.setServerHost(key.getServerHost())
.setServerPort(key.getServerPort())
.setStatus(key.getStatus())
.setServerId(key.getServerId())
.setDataplaneId(key.getDataplaneId())
.addAllTags(key.getTagsList())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class CanaryConfig implements Config {
public static final String METRONOME_GET_RETRIES = "metronome.get.retries";
public static final String METRONOME_WORKER_ENABLE = "metronome.worker.enable";
public static final String METRONOME_DATA_PATH = "metronome.data.path";
public static final String METRONOME_SERVER_ID = "metronome.server.id";
public static final String METRONOME_SERVER_DATAPLANE_ID = "metronome.server.dataplane.id";
public static final String METRONOME_BEAT_EXTRA_TAGS = "metronome.beat.extra.tags";
public static final String METRONOME_BEAT_EXTRA_TAGS_PREFIX = "%s.".formatted(METRONOME_BEAT_EXTRA_TAGS);

Expand Down Expand Up @@ -72,7 +70,7 @@ public KafkaConfig toKafkaConfig() {
return new KafkaConfig(configs);
}

private String getConfig(final String configName) {
public String getConfig(final String configName) {
final Object value = configs.get(configName);
if (value == null) {
throw new IllegalArgumentException("Configuration 'lh.canary." + configName + "' not found");
Expand Down Expand Up @@ -187,12 +185,4 @@ public int getWorkflowVersion() {
public String getMetronomeDataPath() {
return getConfig(METRONOME_DATA_PATH);
}

public String getMetronomeServerId() {
return getConfig(METRONOME_SERVER_ID);
}

public String getMetronomeServerDataplaneId() {
return getConfig(METRONOME_SERVER_DATAPLANE_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,17 @@ public class BeatProducer {
private final int lhServerPort;
private final String lhServerVersion;
private final String topicName;
private final String lhServerId;
private final String dataplaneId;

public BeatProducer(
final String lhServerHost,
final int lhServerPort,
final String lhServerVersion,
final String lhServerId,
final String dataplaneId,
final String topicName,
final Map<String, Object> producerConfig,
final Map<String, String> extraTags) {
this.lhServerHost = lhServerHost;
this.lhServerPort = lhServerPort;
this.lhServerVersion = lhServerVersion;
this.lhServerId = lhServerId;
this.dataplaneId = dataplaneId;
this.topicName = topicName;
this.extraTags = extraTags;

Expand Down Expand Up @@ -95,8 +89,6 @@ private BeatKey buildKey(final String id, final BeatType type, final String stat
.setServerHost(lhServerHost)
.setServerPort(lhServerPort)
.setServerVersion(lhServerVersion)
.setServerId(lhServerId)
.setDataplaneId(dataplaneId)
.setId(id)
.setType(type);

Expand Down
2 changes: 0 additions & 2 deletions canary/src/main/proto/beats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ message BeatKey {
optional string status = 5;
optional string id = 6;
repeated Tag tags = 7;
string server_id = 8;
string dataplane_id = 9;
}

message BeatValue {
Expand Down
2 changes: 0 additions & 2 deletions canary/src/main/proto/metrics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ message MetricKey {
string server_version = 3;
string id = 4;
repeated Tag tags = 5;
string server_id = 6;
string dataplane_id = 7;
}

message MetricValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void shouldScrapeSimpleMetric() throws InterruptedException {

assertThat(prometheusRegistry.scrape())
.contains(
"my_metric{custom_tag=\"custom_value\",dataplane_id=\"my-dataplane\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0");
"my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_version=\"test\"} 1.0");
}

private static MetricKey createMetricsKey(List<Tag> tags) {
Expand All @@ -92,8 +92,6 @@ private static MetricKey createMetricsKey(String host, List<Tag> tags) {
.setServerPort(2023)
.setServerVersion("test")
.setId("my_metric")
.setServerId("my_server")
.setDataplaneId("my-dataplane")
.addAllTags(tags)
.build();
}
Expand Down Expand Up @@ -126,7 +124,7 @@ void printMetricsWithTwoDifferentServers() throws InterruptedException {
assertThat(prometheusRegistry.scrape())
.isEqualTo(
"# HELP my_metric \n" + "# TYPE my_metric gauge\n"
+ "my_metric{custom_tag=\"custom_value\",dataplane_id=\"my-dataplane\",server=\"localhost2:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n"
+ "my_metric{custom_tag=\"custom_value\",dataplane_id=\"my-dataplane\",server=\"localhost:2023\",server_id=\"my_server\",server_version=\"test\"} 1.0\n");
+ "my_metric{custom_tag=\"custom_value\",server=\"localhost2:2023\",server_version=\"test\"} 1.0\n"
+ "my_metric{custom_tag=\"custom_value\",server=\"localhost:2023\",server_version=\"test\"} 1.0\n");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ class MetricsTopologyTest {

public static final String HOST_2 = "localhost2";
public static final int PORT_2 = 2024;
public static final String SERVER_ID = "LH";
public static final String DATAPLANE_ID = "DP";

private TopologyTestDriver testDriver;
private TestInputTopic<BeatKey, BeatValue> inputTopic;
Expand Down Expand Up @@ -62,12 +60,8 @@ private static MetricKey newMetricKey(String id, String status, Map<String, Stri
}

private static MetricKey newMetricKey(String host, int port, String id, String status, Map<String, String> tags) {
MetricKey.Builder builder = MetricKey.newBuilder()
.setServerHost(host)
.setServerPort(port)
.setId(id)
.setServerId(SERVER_ID)
.setDataplaneId(DATAPLANE_ID);
MetricKey.Builder builder =
MetricKey.newBuilder().setServerHost(host).setServerPort(port).setId(id);

if (status != null) {
builder.addTags(Tag.newBuilder().setKey("status").setValue(status).build());
Expand Down Expand Up @@ -111,8 +105,6 @@ private static TestRecord<BeatKey, BeatValue> newBeat(
.setServerHost(host)
.setServerPort(port)
.setType(type)
.setServerId(SERVER_ID)
.setDataplaneId(DATAPLANE_ID)
.setId(id);
BeatValue.Builder valueBuilder = BeatValue.newBuilder().setTime(Timestamps.now());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ void throwsExceptionIfConfigurationIsNotFound() {
CanaryConfig canaryConfig = new CanaryConfig(Map.of());

IllegalArgumentException result =
assertThrows(IllegalArgumentException.class, canaryConfig::getMetronomeServerId);
assertThrows(IllegalArgumentException.class, () -> canaryConfig.getConfig("my.config"));

assertThat(result.getMessage()).isEqualTo("Configuration 'lh.canary.metronome.server.id' not found");
assertThat(result.getMessage()).isEqualTo("Configuration 'lh.canary.my.config' not found");
}
}
23 changes: 3 additions & 20 deletions docs/CANARY_CONFIGURATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
* [`lh.canary.metronome.get.retries`](#lhcanarymetronomegetretries)
* [`lh.canary.metronome.data.path`](#lhcanarymetronomedatapath)
* [`lh.canary.metronome.beat.extra.tags.<additional tag>`](#lhcanarymetronomebeatextratagsadditional-tag)
* [`lh.canary.metronome.server.id`](#lhcanarymetronomeserverid)
* [`lh.canary.metronome.server.dataplane.id`](#lhcanarymetronomeserverdataplaneid)
* [Kafka Configurations](#kafka-configurations)
* [LH Client Configurations](#lh-client-configurations)
* [Task Worker](#task-worker)
Expand Down Expand Up @@ -168,28 +166,13 @@ For example: `lh.canary.metronome.beat.extra.tags.my_tag=my-value`.
- **Default:** null
- **Importance:** low

#### `lh.canary.metronome.server.id`

Add the tag server id the prometheus metrics (**mandatory**).
For example: `lh.canary.metronome.server.id=lh`.

- **Type:** string
- **Default:** null
- **Importance:** high

#### `lh.canary.metronome.server.dataplane.id`

Add the tag dataplane id the prometheus metrics (**mandatory**).
For example: `lh.canary.metronome.server.dataplane.id=my-cluster-aws`.

- **Type:** string
- **Default:** null
- **Importance:** high
Limitations: Please note that it is not recommended to change the tag set once the canary has been started.
For more information check https://github.com/prometheus/client_java/issues/696.

### Kafka Configurations

LH Canary supports all kafka configurations. Use the prefix `lh.canary.kafka` and append the kafka config.
Examples
Examples:

- For `security.protocol`, use `lh.canary.kafka.security.protocol`.
- For `bootstrap.servers`, use `lh.canary.kafka.bootstrap.servers`.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/05-developer-guide/05-task-worker-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ The Go SDK currently (as of `0.11.0`) does not yet support throwing `LHTaskExcep
<TabItem value="python" label="Python">

```python
from littlehorse.exceptions import LHTaskExceptio
from littlehorse.exceptions import LHTaskException

async def ship_item(item_sku: str) -> str:
if is_out_of_stock():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void LHHelper_WithSystemBytesVariableType_ShouldReturnLHVariableBytesType
}

[Fact]
public void LHHelper_WithSystemArrayObjectVariableType_ShouldReturnLHVariableJsonArrType()
public void LHHelper_WithIlistObjectType_ShouldReturnLHVariableJsonArrType()
{
var test_allowed_types = new List<Type>() { typeof(List<object>), typeof(List<string>), typeof(List<int>)};

Expand All @@ -85,7 +85,8 @@ public void LHHelper_WithSystemArrayObjectVariableType_ShouldReturnLHVariableJso
[Fact]
public void LHHelper_WithNotAllowedSystemVariableTypes_ShouldReturnLHJsonObj()
{
var test_not_allowed_types = new List<Type>() { typeof(decimal), typeof(char), typeof(void) };
var test_not_allowed_types = new List<Type>() { typeof(decimal), typeof(char), typeof(void),
typeof(Dictionary<string, string>) };

foreach (var type in test_not_allowed_types)
{
Expand Down
37 changes: 30 additions & 7 deletions sdk-dotnet/LittleHorse.Sdk.Tests/Worker/VariableMappingTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void VariableMapping_WithValidLHTypes_ShouldBeBuiltSuccessfully()
var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type);
TaskDef? taskDef = getTaskDefForTest(variableType);

var result = new VariableMapping(taskDef, position, type, paramName);
var result = new VariableMapping(taskDef!, position, type, paramName);

Assert.True(result is not null);
}
Expand All @@ -56,7 +56,7 @@ public void VariableMapping_WithMismatchTypesInt_ShouldThrowException()
TaskDef? taskDef = getTaskDefForTest(variableType);

var exception = Assert.Throws<LHTaskSchemaMismatchException>(
() => new VariableMapping(taskDef, 0, type2, "any param name"));
() => new VariableMapping(taskDef!, 0, type2, "any param name"));

Assert.Contains($"TaskDef provides INT, func accepts", exception.Message);
}
Expand All @@ -70,7 +70,7 @@ public void VariableMapping_WithMismatchTypeDouble_ShouldThrowException()
TaskDef? taskDef = getTaskDefForTest(variableType);

var exception = Assert.Throws<LHTaskSchemaMismatchException>(
() => new VariableMapping(taskDef, 0, type2, "any param name"));
() => new VariableMapping(taskDef!, 0, type2, "any param name"));

Assert.Contains($"TaskDef provides DOUBLE, func accepts", exception.Message);
}
Expand All @@ -84,7 +84,7 @@ public void VariableMapping_WithMismatchTypeString_ShouldThrowException()
TaskDef? taskDef = getTaskDefForTest(variableType);

var exception = Assert.Throws<LHTaskSchemaMismatchException>(
() => new VariableMapping(taskDef, 0, type2, "any param name"));
() => new VariableMapping(taskDef!, 0, type2, "any param name"));

Assert.Contains($"TaskDef provides STRING, func accepts", exception.Message);
}
Expand All @@ -98,7 +98,7 @@ public void VariableMapping_WithMismatchTypeBool_ShouldThrowException()
TaskDef? taskDef = getTaskDefForTest(variableType);

var exception = Assert.Throws<LHTaskSchemaMismatchException>(
() => new VariableMapping(taskDef, 0, type2, "any param name"));
() => new VariableMapping(taskDef!, 0, type2, "any param name"));

Assert.Contains($"TaskDef provides BOOL, func accepts", exception.Message);
}
Expand All @@ -112,7 +112,7 @@ public void VariableMapping_WithMismatchTypeBytes_ShouldThrowException()
TaskDef? taskDef = getTaskDefForTest(variableType);

var exception = Assert.Throws<LHTaskSchemaMismatchException>(
() => new VariableMapping(taskDef, 0, type2, "any param name"));
() => new VariableMapping(taskDef!, 0, type2, "any param name"));

Assert.Contains($"TaskDef provides BYTES, func accepts", exception.Message);
}
Expand Down Expand Up @@ -280,6 +280,29 @@ public void VariableMapping_WithAssignArrayObjectValue_ShouldReturnArrayObject()
Assert.Equal(expectedList[0].Cars!.Count, actualList[0].Cars!.Count);
}

[Fact]
public void VariableMapping_WithAssignJsonObjectValue_ShouldReturnDictionaryObject()
{
string value = "{\"FirstName\":\"Test\",\"Age\":\"35\",\"Address\":\"NA-Street\"}";
Type type = typeof(Dictionary<string, string>);
int position = 0;
string paramName = "param_test";
var variableMapping = getVariableMappingForTest(type, paramName, position);
VariableValue variableValue = new VariableValue { JsonObj = value};
ScheduledTask taskInstance = getScheduledTaskForTest(variableValue, paramName);
var mockWorkerContext = new Mock<LHWorkerContext>(taskInstance, new DateTime());

var result = variableMapping.Assign(taskInstance, mockWorkerContext.Object);

var expectedList = (Dictionary<string, string>)JsonConvert.DeserializeObject(value, type)!;
var actualList = (Dictionary<string, string>)result!;

Assert.Equal(expectedList.Count, actualList.Count);
Assert.Equal(expectedList["FirstName"], actualList["FirstName"]);
Assert.Equal(expectedList["Age"], actualList["Age"]);
Assert.Equal(expectedList["Address"], actualList["Address"]);
}

[Fact]
public void VariableMapping_WithAssignJsonStringValue_ShouldReturnCustomObject()
{
Expand Down Expand Up @@ -319,7 +342,7 @@ private VariableMapping getVariableMappingForTest(Type type, string paramName, i
var variableType = LHMappingHelper.MapDotNetTypeToLHVariableType(type);
TaskDef? taskDef = getTaskDefForTest(variableType);

var variableMapping = new VariableMapping(taskDef, position, type, paramName);
var variableMapping = new VariableMapping(taskDef!, position, type, paramName);

return variableMapping;
}
Expand Down
2 changes: 1 addition & 1 deletion sdk-dotnet/LittleHorse.Sdk/Helper/LHMappingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static VariableType MapDotNetTypeToLHVariableType(Type type)
return VariableType.Bytes;
}

if (typeof(IEnumerable).IsAssignableFrom(type))
if (typeof(IList).IsAssignableFrom(type))
{
return VariableType.JsonArr;
}
Expand Down
9 changes: 9 additions & 0 deletions sdk-go/littlehorse/lh_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package littlehorse

import "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"

type LHTaskException struct {
Name string
Message string
Content *lhproto.VariableValue
}
Loading

0 comments on commit 745d67e

Please sign in to comment.