Skip to content

Commit

Permalink
Merge branch 'add_logo_and_diagram' of https://github.com/koralium/fl…
Browse files Browse the repository at this point in the history
…owtide into add_logo_and_diagram
  • Loading branch information
Ulimo committed Feb 8, 2025
2 parents e0cbacc + 23b0e8e commit 93e2119
Show file tree
Hide file tree
Showing 186 changed files with 3,103 additions and 1,364 deletions.
13 changes: 13 additions & 0 deletions docs/docs/deployment/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
sidebar_position: 4
---

import DocCardList from '@theme/DocCardList';



# Deployment

This section covers information that can be useful when deploying flowtide.

<DocCardList />
73 changes: 73 additions & 0 deletions docs/docs/deployment/pauseresume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
sidebar_position: 1
---

# Pause And Resume

It is possible to pause and resume a flowtide stream, this is useful when there might be maintainance work on a source system, or as a panic button to stop
the execution.

A pause does not stop the stream, but stops the traversal of all events inside of the stream. This means that resuming a paused stream is quick and does
not need to download any state from persistent storage.

## Pause and stop using IConfiguration

One of the easier ways to add pause and resume is to utilize `IConfiguration` in .NET. Flowtide uses an `IOptionsMonitor<FlowtidePauseOptions>` to check
for changes on the configuration and pauses and resumes the stream based on the value.

The class looks as follows:

```
public class FlowtidePauseOptions
{
public bool IsPaused { get; set; }
}
```

So to utilize this when using `FlowtideDotNet.AspNetCore` or `FlowtideDotNet.DependencyInjection` you add the following:

```
var builder = WebApplication.CreateBuilder(args);
...
// Map flowtide pause options to enable pausing and resuming from configuration
builder.Services.AddOptions<FlowtidePauseOptions>()
.Bind(builder.Configuration.GetSection("your_section"));
...
// Add the stream as normal
builder.Services.AddFlowtideStream("stream_name")
...
```

This is best fitted with an `IConfiguration` provider that supports loading changes dynamically such as Hashicorp Vault or Azure Key Vault.
Pausing and resuming using `IConfiguration`provider is dependent on the update frequency of the provider, so to utilize this fully this interval should be
kept quite low.

## Pause and stop using API endpoint

When using `FlowtideDotNet.AspNetCore` you can also map API endpoints to allow for pause and resume.

Example:

```
var builder = WebApplication.CreateBuilder(args);
...
// Add the stream as normal
builder.Services.AddFlowtideStream("stream_name")
...
var app = builder.Build();
...
// Map pause and resume endpoints
app.UseFlowtidePauseResumeEndpoints("base_path");
```

Two endpoints are registered under the base path, `/pause` and `/resume`.
32 changes: 32 additions & 0 deletions docs/docs/internal/storage/objectstate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
sidebar_position: 2
---

# Object State

The object state is the simplest type of state for an operator, it saves a C# object using JSON serialization to store it in persistent storage.

An object state can be fetched from the `IStateManagerClient` using `GetOrCreateObjectStateAsync<T>(string name)`.

Example:

```csharp
protected override async Task InitializeOrRestore(long restoreTime, IStateManagerClient stateManagerClient)
{
_state = await stateManagerClient.GetOrCreateObjectStateAsync<MyState>("my_state");
}

protected override async Task OnCheckpoint()
{
// Commit any changes made to the state
await _state.Commit();
}

public void OtherMethod()
{
_state.Value.Test = "hello";
}
```

The object state saves an internal copy of the value that was last commited and checkpointed, if the value has not changed nothing will be written to persistent storage.
So a user of the object state does not have to handle conditional calls to `Commit` to reduce the number of writes to persistent storage.
26 changes: 10 additions & 16 deletions docs/docs/statepersistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,24 +196,18 @@ A data page is fetched using the following logic:
## Compression

It is possible to compress pages in the state.
This is done by providing two functions to state serialize options, a compress function and a decompress function.
The option that exist today is to compress pages with Zstd. Most storage backends add zstd compression by default to save on network throughput and storage size.

Example using ZLib compression:
To set compression, it is set under add storage:

```csharp
builder.WithStateOptions(() => new StateManagerOptions()
{
builder.AddStorage(b => {
...
SerializeOptions = new StateSerializeOptions()
{
CompressFunc = (stream) =>
{
return new System.IO.Compression.ZLibStream(stream, CompressionMode.Compress);
},
DecompressFunc = (stream) =>
{
return new System.IO.Compression.ZLibStream(stream, CompressionMode.Decompress);
}
}
})

// Use zstd page compression
b.ZstdPageCompression();

// Use no compression even if the storage medium added compression
b.NoCompression();
});
```
8 changes: 4 additions & 4 deletions samples/MonitoringAzureMonitor/DummyReadOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public override IStreamIngressVertex CreateSource(ReadRelation readRelation, IFu
}
}

public class DummyReadOperator : ReadBaseOperator<object>
public class DummyReadOperator : ReadBaseOperator
{
public DummyReadOperator(DataflowBlockOptions options) : base(options)
{
Expand All @@ -56,14 +56,14 @@ protected override Task<IReadOnlySet<string>> GetWatermarkNames()
return Task.FromResult<IReadOnlySet<string>>(new HashSet<string>() { "dummy" });
}

protected override Task InitializeOrRestore(long restoreTime, object? state, IStateManagerClient stateManagerClient)
protected override Task InitializeOrRestore(long restoreTime, IStateManagerClient stateManagerClient)
{
return Task.CompletedTask;
}

protected override Task<object> OnCheckpoint(long checkpointTime)
protected override Task OnCheckpoint(long checkpointTime)
{
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override async Task SendInitial(IngressOutput<StreamEventBatch> output)
Expand Down
8 changes: 4 additions & 4 deletions samples/MonitoringAzureMonitor/DummyWriteOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public override IStreamEgressVertex CreateSink(WriteRelation writeRelation, IFun
}
}

public class DummyWriteOperator : WriteBaseOperator<object>
public class DummyWriteOperator : WriteBaseOperator
{
public DummyWriteOperator(ExecutionDataflowBlockOptions executionDataflowBlockOptions) : base(executionDataflowBlockOptions)
{
Expand All @@ -51,14 +51,14 @@ public override Task DeleteAsync()
return Task.CompletedTask;
}

protected override Task InitializeOrRestore(long restoreTime, object? state, IStateManagerClient stateManagerClient)
protected override Task InitializeOrRestore(long restoreTime, IStateManagerClient stateManagerClient)
{
return Task.CompletedTask;
}

protected override Task<object> OnCheckpoint(long checkpointTime)
protected override Task OnCheckpoint(long checkpointTime)
{
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override Task OnRecieve(StreamEventBatch msg, long time)
Expand Down
8 changes: 4 additions & 4 deletions samples/MonitoringPrometheus/DummyReadOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public override IStreamIngressVertex CreateSource(ReadRelation readRelation, IFu
}
}

public class DummyReadOperator : ReadBaseOperator<object>
public class DummyReadOperator : ReadBaseOperator
{
public DummyReadOperator(DataflowBlockOptions options) : base(options)
{
Expand All @@ -59,14 +59,14 @@ protected override Task<IReadOnlySet<string>> GetWatermarkNames()
return Task.FromResult<IReadOnlySet<string>>(new HashSet<string>() { "dummy" });
}

protected override Task InitializeOrRestore(long restoreTime, object? state, IStateManagerClient stateManagerClient)
protected override Task InitializeOrRestore(long restoreTime, IStateManagerClient stateManagerClient)
{
return Task.CompletedTask;
}

protected override Task<object> OnCheckpoint(long checkpointTime)
protected override Task OnCheckpoint(long checkpointTime)
{
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override async Task SendInitial(IngressOutput<StreamEventBatch> output)
Expand Down
8 changes: 4 additions & 4 deletions samples/MonitoringPrometheus/DummyWriteOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public override IStreamEgressVertex CreateSink(WriteRelation writeRelation, IFun
}
}

public class DummyWriteOperator : WriteBaseOperator<object>
public class DummyWriteOperator : WriteBaseOperator
{
public DummyWriteOperator(ExecutionDataflowBlockOptions executionDataflowBlockOptions) : base(executionDataflowBlockOptions)
{
Expand All @@ -51,14 +51,14 @@ public override Task DeleteAsync()
return Task.CompletedTask;
}

protected override Task InitializeOrRestore(long restoreTime, object? state, IStateManagerClient stateManagerClient)
protected override Task InitializeOrRestore(long restoreTime, IStateManagerClient stateManagerClient)
{
return Task.CompletedTask;
}

protected override Task<object> OnCheckpoint(long checkpointTime)
protected override Task OnCheckpoint(long checkpointTime)
{
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override Task OnRecieve(StreamEventBatch msg, long time)
Expand Down
6 changes: 3 additions & 3 deletions samples/SqlSampleWithUI/DummyReadOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public override IStreamIngressVertex CreateSource(ReadRelation readRelation, IFu
}
}

public class DummyReadOperator : ReadBaseOperator<object>
public class DummyReadOperator : ReadBaseOperator
{
public DummyReadOperator(DataflowBlockOptions options) : base(options)
{
Expand All @@ -59,12 +59,12 @@ protected override Task<IReadOnlySet<string>> GetWatermarkNames()
return Task.FromResult<IReadOnlySet<string>>(new HashSet<string>() { "dummy" });
}

protected override Task InitializeOrRestore(long restoreTime, object? state, IStateManagerClient stateManagerClient)
protected override Task InitializeOrRestore(long restoreTime, IStateManagerClient stateManagerClient)
{
return Task.CompletedTask;
}

protected override Task<object> OnCheckpoint(long checkpointTime)
protected override Task OnCheckpoint(long checkpointTime)
{
return Task.FromResult(new object());
}
Expand Down
8 changes: 4 additions & 4 deletions samples/SqlSampleWithUI/DummyWriteOperator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public override IStreamEgressVertex CreateSink(WriteRelation writeRelation, IFun
}
}

public class DummyWriteOperator : WriteBaseOperator<object>
public class DummyWriteOperator : WriteBaseOperator
{
public DummyWriteOperator(ExecutionDataflowBlockOptions executionDataflowBlockOptions) : base(executionDataflowBlockOptions)
{
Expand All @@ -51,14 +51,14 @@ public override Task DeleteAsync()
return Task.CompletedTask;
}

protected override Task InitializeOrRestore(long restoreTime, object? state, IStateManagerClient stateManagerClient)
protected override Task InitializeOrRestore(long restoreTime, IStateManagerClient stateManagerClient)
{
return Task.CompletedTask;
}

protected override Task<object> OnCheckpoint(long checkpointTime)
protected override Task OnCheckpoint(long checkpointTime)
{
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override Task OnRecieve(StreamEventBatch msg, long time)
Expand Down
6 changes: 5 additions & 1 deletion samples/SqlSampleWithUI/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
using FlowtideDotNet.Core.Sources.Generic;
using OpenTelemetry.Metrics;
using FlowtideDotNet.Core.Sinks;
using FlowtideDotNet.Base;

var builder = WebApplication.CreateBuilder(args);

// Map flowtide pause options to enable pausing and resuming from configuration
builder.Services.AddOptions<FlowtidePauseOptions>()
.Bind(builder.Configuration.GetSection("flowtide"));

var sqlText = @"
CREATE TABLE testtable (
val any
Expand Down Expand Up @@ -69,5 +74,4 @@ LEFT JOIN other o
app.UseHealthChecks("/health");
app.UseFlowtideUI("/");


app.Run();
5 changes: 4 additions & 1 deletion samples/SqlSampleWithUI/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
"AllowedHosts": "*",
"flowtide": {
"isPaused": false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
using FlowtideDotNet.AspNetCore.Internal.TimeSeries.Middleware;
using FlowtideDotNet.AspNetCore.TimeSeries;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using System.Text.Json;

namespace FlowtideDotNet.AspNetCore.Extensions
{
Expand Down Expand Up @@ -48,5 +50,34 @@ public static IApplicationBuilder UseFlowtideUI(this IApplicationBuilder app, Fl
return app.UseMiddleware<UiMiddleware>(new UiMiddlewareState(new DiagnosticsEndpoint(dataflowStream), new ReactEndpoint(path), path));
}

public static IApplicationBuilder UseFlowtidePauseResumeEndpoints(this IApplicationBuilder app, string basePath = "/ui/stream")
{
if (basePath == "/")
{
basePath = string.Empty;
}
app.Map($"{basePath}/pause", appBuilder =>
{
appBuilder.Use((HttpContext context,Func<Task> next) =>
{
context.RequestServices.GetRequiredService<FlowtideDotNet.Base.Engine.DataflowStream>().Pause();
context.Response.StatusCode = 200;
context.Response.ContentType = "application/json";
return context.Response.BodyWriter.WriteAsync(JsonSerializer.SerializeToUtf8Bytes(new { message = "Stream paused" })).AsTask();
});
});

app.Map($"{basePath}/resume", appBuilder =>
{
appBuilder.Use((HttpContext context, Func<Task> next) =>
{
context.RequestServices.GetRequiredService<FlowtideDotNet.Base.Engine.DataflowStream>().Resume();
context.Response.StatusCode = 200;
context.Response.ContentType = "application/json";
return context.Response.BodyWriter.WriteAsync(JsonSerializer.SerializeToUtf8Bytes(new { message = "Stream resumed" })).AsTask();
});
});
return app;
}
}
}
Loading

0 comments on commit 93e2119

Please sign in to comment.