Merge branch 'master' into 628-searchwfrun-duplicates-with-variablematch
eduwercamacaro committed Jan 31, 2024
2 parents 05de712 + fc0fe12 commit a8cd56f
Showing 115 changed files with 2,633 additions and 1,655 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -9,7 +9,7 @@ permissions:
contents: read
jobs:
publish-docker:
uses: ./.github/workflows/publish-docker.yaml
uses: ./.github/workflows/publish-docker.yml

sdk-java:
runs-on: ubuntu-latest
39 changes: 21 additions & 18 deletions dashboard/apps/web/littlehorse-public-api/acls.ts
@@ -1,6 +1,7 @@
/* eslint-disable */
import * as _m0 from "protobufjs/minimal";
import { Timestamp } from "./google/protobuf/timestamp";
import { PrincipalId, TenantId } from "./object_id";

export const protobufPackage = "littlehorse";

@@ -157,7 +158,7 @@ export interface Principal {
* the id is retrieved by looking at the claims on the request. In mTLS, the
 * id is retrieved by looking at the Subject Name of the client certificate.
*/
id: string;
id: PrincipalId | undefined;
createdAt:
| string
| undefined;
@@ -177,7 +178,9 @@ export interface Principal_PerTenantAclsEntry {

/** This is a GlobalGetable */
export interface Tenant {
id: string;
id:
| TenantId
| undefined;
/** Future versions will include quotas on a per-Tenant basis. */
createdAt: string | undefined;
}
@@ -214,13 +217,13 @@ export interface PutTenantRequest {
}

function createBasePrincipal(): Principal {
return { id: "", createdAt: undefined, perTenantAcls: {}, globalAcls: undefined };
return { id: undefined, createdAt: undefined, perTenantAcls: {}, globalAcls: undefined };
}

export const Principal = {
encode(message: Principal, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.id !== "") {
writer.uint32(10).string(message.id);
if (message.id !== undefined) {
PrincipalId.encode(message.id, writer.uint32(10).fork()).ldelim();
}
if (message.createdAt !== undefined) {
Timestamp.encode(toTimestamp(message.createdAt), writer.uint32(18).fork()).ldelim();
@@ -246,7 +249,7 @@ export const Principal = {
break;
}

message.id = reader.string();
message.id = PrincipalId.decode(reader, reader.uint32());
continue;
case 2:
if (tag !== 18) {
@@ -283,7 +286,7 @@ export const Principal = {

fromJSON(object: any): Principal {
return {
id: isSet(object.id) ? globalThis.String(object.id) : "",
id: isSet(object.id) ? PrincipalId.fromJSON(object.id) : undefined,
createdAt: isSet(object.createdAt) ? globalThis.String(object.createdAt) : undefined,
perTenantAcls: isObject(object.perTenantAcls)
? Object.entries(object.perTenantAcls).reduce<{ [key: string]: ServerACLs }>((acc, [key, value]) => {
@@ -297,8 +300,8 @@ export const Principal = {

toJSON(message: Principal): unknown {
const obj: any = {};
if (message.id !== "") {
obj.id = message.id;
if (message.id !== undefined) {
obj.id = PrincipalId.toJSON(message.id);
}
if (message.createdAt !== undefined) {
obj.createdAt = message.createdAt;
@@ -323,7 +326,7 @@ export const Principal = {
},
fromPartial<I extends Exact<DeepPartial<Principal>, I>>(object: I): Principal {
const message = createBasePrincipal();
message.id = object.id ?? "";
message.id = (object.id !== undefined && object.id !== null) ? PrincipalId.fromPartial(object.id) : undefined;
message.createdAt = object.createdAt ?? undefined;
message.perTenantAcls = Object.entries(object.perTenantAcls ?? {}).reduce<{ [key: string]: ServerACLs }>(
(acc, [key, value]) => {
@@ -418,13 +421,13 @@ export const Principal_PerTenantAclsEntry = {
};

function createBaseTenant(): Tenant {
return { id: "", createdAt: undefined };
return { id: undefined, createdAt: undefined };
}

export const Tenant = {
encode(message: Tenant, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.id !== "") {
writer.uint32(10).string(message.id);
if (message.id !== undefined) {
TenantId.encode(message.id, writer.uint32(10).fork()).ldelim();
}
if (message.createdAt !== undefined) {
Timestamp.encode(toTimestamp(message.createdAt), writer.uint32(18).fork()).ldelim();
@@ -444,7 +447,7 @@ export const Tenant = {
break;
}

message.id = reader.string();
message.id = TenantId.decode(reader, reader.uint32());
continue;
case 2:
if (tag !== 18) {
@@ -464,15 +467,15 @@ export const Tenant = {

fromJSON(object: any): Tenant {
return {
id: isSet(object.id) ? globalThis.String(object.id) : "",
id: isSet(object.id) ? TenantId.fromJSON(object.id) : undefined,
createdAt: isSet(object.createdAt) ? globalThis.String(object.createdAt) : undefined,
};
},

toJSON(message: Tenant): unknown {
const obj: any = {};
if (message.id !== "") {
obj.id = message.id;
if (message.id !== undefined) {
obj.id = TenantId.toJSON(message.id);
}
if (message.createdAt !== undefined) {
obj.createdAt = message.createdAt;
@@ -485,7 +488,7 @@ export const Tenant = {
},
fromPartial<I extends Exact<DeepPartial<Tenant>, I>>(object: I): Tenant {
const message = createBaseTenant();
message.id = object.id ?? "";
message.id = (object.id !== undefined && object.id !== null) ? TenantId.fromPartial(object.id) : undefined;
message.createdAt = object.createdAt ?? undefined;
return message;
},
117 changes: 93 additions & 24 deletions docs/SERVER_CONFIGURATIONS.md
@@ -35,15 +35,21 @@
- [`LHS_SHOULD_CREATE_TOPICS`](#lhs_should_create_topics)
- [`LHS_REPLICATION_FACTOR`](#lhs_replication_factor)
- [`LHS_CLUSTER_PARTITIONS`](#lhs_cluster_partitions)
- [`LHS_STREAMS_NUM_THREADS`](#lhs_streams_num_threads)
- [`LHS_CORE_STREAM_THREADS`](#lhs_core_stream_threads)
- [`LHS_TIMER_STREAM_THREADS`](#lhs_timer_stream_threads)
- [`LHS_NUM_NETWORK_THREADS`](#lhs_num_network_threads)
- [`LHS_STREAMS_SESSION_TIMEOUT`](#lhs_streams_session_timeout)
- [`LHS_STREAMS_COMMIT_INTERVAL`](#lhs_streams_commit_interval)
- [`LHS_CORE_STREAMS_COMMIT_INTERVAL`](#lhs_core_streams_commit_interval)
- [`LHS_TIMER_STREAMS_COMMIT_INTERVAL`](#lhs_timer_streams_commit_interval)
- [`LHS_STATE_DIR`](#lhs_state_dir)
- [`LHS_STREAMS_NUM_STANDBY_REPLICAS`](#lhs_streams_num_standby_replicas)
- [`LHS_STREAMS_NUM_WARMUP_REPLICAS`](#lhs_streams_num_warmup_replicas)
- [`LHS_DEFAULT_WFRUN_RETENTION_HOURS`](#lhs_default_wfrun_retention_hours)
- [`LHS_DEFAULT_EXTERNAL_EVENT_RETENTION_HOURS`](#lhs_default_external_event_retention_hours)
- [`LHS_CORE_MEMTABLE_SIZE_BYTES`](#lhs_core_memtable_size_bytes)
- [`LHS_TIMER_MEMTABLE_SIZE_BYTES`](#lhs_timer_memtable_size_bytes)
- [`LHS_CORE_STATESTORE_CACHE_BYTES`](#lhs_core_statestore_cache_bytes)
- [`LHS_TIMER_STATESTORE_CACHE_BYTES`](#lhs_timer_statestore_cache_bytes)
- [`LHS_ROCKSDB_TOTAL_BLOCK_CACHE_BYTES`](#lhs_rocksdb_total_block_cache_bytes)
- [`LHS_ROCKSDB_TOTAL_MEMTABLE_BYTES`](#lhs_rocksdb_total_memtable_bytes)
- [Monitoring](#monitoring)
- [`LHS_HEALTH_SERVICE_PORT`](#lhs_health_service_port)
- [`LHS_HEALTH_PATH_METRICS`](#lhs_health_path_metrics)
@@ -433,10 +439,13 @@ A unique identifier of the consumer instance provided by the end user. [Kafka Of

### `LHS_RACK_ID`

Provides rack awareness to the cluster. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_rack.aware.assignment.tags).
Provides rack awareness to the cluster. Used in two ways:

* To ensure that standby tasks are scheduled in different racks than their active tasks ([Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_rack.aware.assignment.tags)).
* To enable follower fetching for standby tasks, which reduces networking costs without impacting application performance.

- **Type:** string
- **Default:** unset-rack-id
- **Default:** null
- **Importance:** medium
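
As a minimal sketch in the same `KEY=VALUE` format as the config files under `local-dev/configs/` (the zone name below is purely illustrative, and the `#` comments assume properties-style parsing):

```
# Hypothetical example: tag this server with the availability zone it runs in,
# so its standby tasks are placed in a different zone and consumers can fetch
# from in-zone followers instead of crossing zones.
LHS_RACK_ID=us-east-1a
```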

---
@@ -471,11 +480,21 @@ The number of partitions in each internal kafka topic. Necessary whether or not

---

### `LHS_STREAMS_NUM_THREADS`
### `LHS_CORE_STREAM_THREADS`

The number of threads to execute stream processing. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.stream.threads).
The number of threads to execute stream processing in the Core Topology. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.stream.threads). For a server with `N` cores, we recommend setting this to `N * 0.6`.

- **Type:** int
- **Type:** int, >= 1
- **Default:** 1
- **Importance:** medium

---

### `LHS_TIMER_STREAM_THREADS`

The number of threads to execute stream processing in the Timer Topology. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.stream.threads). For a server with `N` cores, we recommend setting this to `N * 0.4`.

- **Type:** int, >= 1
- **Default:** 1
- **Importance:** medium
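
For example, applying the `N * 0.6` / `N * 0.4` guideline to a hypothetical 8-core server gives roughly 5 Core threads and 3 Timer threads (the exact rounding is a judgment call, not something the server enforces):

```
# Hypothetical sizing for an 8-core server, following the guideline above:
# 8 * 0.6 ≈ 5 Core threads, 8 * 0.4 ≈ 3 Timer threads.
LHS_CORE_STREAM_THREADS=5
LHS_TIMER_STREAM_THREADS=3
```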

Expand All @@ -502,13 +521,23 @@ The timeout used to detect client failures when using Kafka's group management f

---

### `LHS_STREAMS_COMMIT_INTERVAL`
### `LHS_CORE_STREAMS_COMMIT_INTERVAL`

The frequency in milliseconds with which to commit processing progress. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_commit.interval.ms).
The frequency in milliseconds with which to commit processing progress on the Core Topology. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_commit.interval.ms). For the Core Topology, we recommend setting it to 5000 milliseconds. A large enough value along with a large value for `LHS_CORE_STATESTORE_CACHE_BYTES` will result in fewer records emitted to the Kafka Streams Changelog topics.

- **Type:** int
- **Default:** 100
- **Importance:** low
- **Default:** 5000
- **Importance:** medium

---

### `LHS_TIMER_STREAMS_COMMIT_INTERVAL`

The frequency in milliseconds with which to commit processing progress on the Timer Topology. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_commit.interval.ms). For the Timer Topology, we recommend setting it to 30000 milliseconds. A large enough value along with a large value for `LHS_TIMER_STATESTORE_CACHE_BYTES` will result in fewer records emitted to the Kafka Streams Changelog topics.

- **Type:** int
- **Default:** 30000
- **Importance:** medium
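
A hypothetical tuning sketch that leans on the interaction described above; the commit intervals are the recommended values, while the 128MB cache sizes are illustrative only (see `LHS_CORE_STATESTORE_CACHE_BYTES` and `LHS_TIMER_STATESTORE_CACHE_BYTES` below):

```
# Recommended commit intervals (milliseconds) paired with enlarged state store
# caches (illustrative 128MB values) so fewer records hit the changelog topics.
LHS_CORE_STREAMS_COMMIT_INTERVAL=5000
LHS_CORE_STATESTORE_CACHE_BYTES=134217728
LHS_TIMER_STREAMS_COMMIT_INTERVAL=30000
LHS_TIMER_STATESTORE_CACHE_BYTES=134217728
```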

---

@@ -524,7 +553,7 @@ Directory location for state store. This path must be unique for each streams in

### `LHS_STREAMS_NUM_STANDBY_REPLICAS`

The number of standby replicas for each task. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.standby.replicas).
The number of standby replicas for each task. Applies to both Core and Timer topologies. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_num.standby.replicas).

- **Type:** int
- **Default:** 0
@@ -536,32 +565,72 @@ The number of standby replicas for each task. [Kafka Official](https://kafka.apa

The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping the task available on one instance while it is warming up on another instance it has been reassigned to. [Kafka Official](https://kafka.apache.org/documentation/#streamsconfigs_max.warmup.replicas).

The same config is used by both the Core and Timer Topologies. Note that if you set `LHS_STREAMS_NUM_WARMUP_REPLICAS = N`, then there can be up to `2 * N` warmup replicas scheduled.

- **Type:** int
- **Default:** 12
- **Importance:** medium

---

### `LHS_DEFAULT_WFRUN_RETENTION_HOURS`
### `LHS_CORE_MEMTABLE_SIZE_BYTES`

Default total hours of life that a `WfRun` will live in the system in case this value was not set when
creating the `WfSpec`.
The size of a RocksDB Memtable (aka Write Buffer) for the Core Topology.

- **Type:** int
- **Default:** 168
- **Type:** long
- **Default:** 67108864 (64MB)
- **Importance:** low

---

### `LHS_DEFAULT_EXTERNAL_EVENT_RETENTION_HOURS`
### `LHS_TIMER_MEMTABLE_SIZE_BYTES`

Default total hours of life that a `ExternalEvent` will live in the system in case this value was not set when
creating the `ExternalEventDef`.
The size of a RocksDB Memtable (aka Write Buffer) for the Timer Topology.

- **Type:** int
- **Default:** 168
- **Type:** long
- **Default:** 33554432 (32MB)
- **Importance:** low

---

### `LHS_CORE_STATESTORE_CACHE_BYTES`

The size of the Kafka Streams State Store Cache on the Core Topology. This cache is put in front of RocksDB (i.e. before any writes to the Memtable) and is flushed on every Streams Commit (`LHS_CORE_STREAMS_COMMIT_INTERVAL`). This cache is shared by all Core Topology state stores on a server. A large enough value will result in fewer records emitted to the Kafka Streams Changelog topic.

- **Type:** long
- **Default:** 33554432 (32MB)
- **Importance:** high

---

### `LHS_TIMER_STATESTORE_CACHE_BYTES`

The size of the Kafka Streams State Store Cache on the Timer Topology. This cache is put in front of RocksDB (i.e. before any writes to the Memtable) and is flushed on every Streams Commit (`LHS_TIMER_STREAMS_COMMIT_INTERVAL`). A large enough value will result in fewer records emitted to the Kafka Streams Changelog topic.

- **Type:** long
- **Default:** 67108864 (64MB)
- **Importance:** high

---

### `LHS_ROCKSDB_TOTAL_BLOCK_CACHE_BYTES`

The size of the shared Block Cache for reads from RocksDB. Memory used by this cache is allocated off-heap. If not set, there is no shared limit and the Kafka Streams default applies: each RocksDB instance gets its own 50 MB block cache.

- **Type:** long
- **Default:** null
- **Importance:** low

---

### `LHS_ROCKSDB_TOTAL_MEMTABLE_BYTES`

The capacity of the RocksDB Write Buffer Manager. Memory used by the Write Buffer Manager is allocated off-heap. If not set, there is no limit on off-heap memory allocated to memtables.

- **Type:** long
- **Default:** null
- **Importance:** high
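
As a rough budgeting sketch: the two RocksDB limits above are allocated off-heap, the state store caches add their own footprint (in stock Kafka Streams that cache is typically accounted against the JVM heap), and the container or host memory limit has to cover their sum plus the heap itself. The `local-dev/configs/larger-memory.config` file added in this commit sets all four to 4GiB, i.e. roughly 16GiB of cache memory before counting the rest of the heap:

```
# Values from local-dev/configs/larger-memory.config (4294967296 bytes = 4GiB each).
# 4GiB block cache + 4GiB memtables + 4GiB Core cache + 4GiB Timer cache ≈ 16GiB
# of cache memory, on top of whatever JVM heap the server is given.
LHS_ROCKSDB_TOTAL_BLOCK_CACHE_BYTES=4294967296
LHS_ROCKSDB_TOTAL_MEMTABLE_BYTES=4294967296
LHS_CORE_STATESTORE_CACHE_BYTES=4294967296
LHS_TIMER_STATESTORE_CACHE_BYTES=4294967296
```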

## Monitoring

### `LHS_HEALTH_SERVICE_PORT`
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
version=0.7.0
version=0.7.1
group=io.littlehorse
kafkaVersion=3.6.0
lombokVersion=1.18.28
2 changes: 1 addition & 1 deletion lhctl/cmd/version.go
@@ -14,7 +14,7 @@ var versionCmd = &cobra.Command{
Use: "version",
Short: "Print Client and Server Version Information.",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("lhctl version: 0.7.0")
fmt.Println("lhctl version: 0.7.2")

resp, err := getGlobalClient(cmd).GetServerVersion(requestContext(), &emptypb.Empty{})
if err != nil {
12 changes: 12 additions & 0 deletions local-dev/configs/larger-memory.config
@@ -0,0 +1,12 @@
LHS_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
LHS_CLUSTER_ID=server
LHS_INSTANCE_ID=server1
LHS_CLUSTER_PARTITIONS=12
LHS_SHOULD_CREATE_TOPICS=true
LHS_CORE_STREAM_THREADS=2
LHS_TIMER_STREAM_THREADS=2

LHS_ROCKSDB_TOTAL_MEMTABLE_BYTES=4294967296
LHS_ROCKSDB_TOTAL_BLOCK_CACHE_BYTES=4294967296
LHS_CORE_STATESTORE_CACHE_BYTES=4294967296
LHS_TIMER_STATESTORE_CACHE_BYTES=4294967296
7 changes: 4 additions & 3 deletions schemas/acls.proto
@@ -2,18 +2,19 @@ syntax = "proto3";
package littlehorse;

import "google/protobuf/timestamp.proto";
import "object_id.proto";

option go_package = ".;model";
option java_multiple_files = true;
option java_package = "io.littlehorse.common.proto";
option java_package = "io.littlehorse.sdk.common.proto";
option csharp_namespace = "LittleHorse.Common.Proto";

// This is a GlobalGetable.
message Principal {
// Principals are agnostic of the Authentication protocol that you use. In OAuth,
// the id is retrieved by looking at the claims on the request. In mTLS, the
// id is retrieved by looking at the Subject Name of the client certificate.
string id = 1;
PrincipalId id = 1;
google.protobuf.Timestamp created_at = 2;

// Maps a Tenant ID to a list of ACL's that the Principal has permission to
@@ -28,7 +29,7 @@ message Principal {

// This is a GlobalGetable
message Tenant {
string id = 1;
TenantId id = 1;
google.protobuf.Timestamp created_at = 2;
// Future versions will include quotas on a per-Tenant basis.
}