Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto wire up grafana #3397

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -280,25 +280,6 @@ debug *args:
dlv_pid=$!
wait "$dlv_pid"

# Run `ftl dev` with the given args after setting the necessary envar.
otel-dev *args:
#!/bin/bash
set -euo pipefail

export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:${OTEL_GRPC_PORT}"
export OTEL_METRIC_EXPORT_INTERVAL=${OTEL_METRIC_EXPORT_INTERVAL}
# Uncomment this line for much richer debug logs
# export FTL_O11Y_LOG_LEVEL="debug"
ftl dev {{args}}

# runs the otel-lgtm observability stack locallt which includes
# an otel collector, loki (for logs), prometheus metrics db (for metrics), tempo (trace storage) and grafana (for visualization)
observe:
docker compose up otel-lgtm

observe-stop:
docker compose down otel-lgtm

localstack:
docker compose up localstack -d --wait

Expand Down
8 changes: 7 additions & 1 deletion backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/TBD54566975/ftl/internal/localdebug"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
)

var _ scaling.RunnerScaling = &localScaling{}
Expand All @@ -42,6 +43,7 @@ type localScaling struct {
prevRunnerSuffix int
ideSupport optional.Option[localdebug.IDEIntegration]
registryConfig artefacts.RegistryConfig
enableOtel bool
}

func (l *localScaling) Start(ctx context.Context, endpoint url.URL, leaser leases.Leaser) error {
Expand Down Expand Up @@ -82,7 +84,7 @@ type runnerInfo struct {
port string
}

func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool, registryConfig artefacts.RegistryConfig) (scaling.RunnerScaling, error) {
func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool, registryConfig artefacts.RegistryConfig, enableOtel bool) (scaling.RunnerScaling, error) {

cacheDir, err := os.UserCacheDir()
if err != nil {
Expand All @@ -97,6 +99,7 @@ func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*u
prevRunnerSuffix: -1,
debugPorts: map[string]*localdebug.DebugInfo{},
registryConfig: registryConfig,
enableOtel: enableOtel,
}
if enableIDEIntegration && configPath != "" {
local.ideSupport = optional.Ptr(localdebug.NewIDEIntegration(configPath))
Expand Down Expand Up @@ -201,6 +204,9 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
Deployment: deploymentKey,
DebugPort: debugPort,
Registry: l.registryConfig,
ObservabilityConfig: observability.Config{
ExportOTEL: observability.ExportOTELFlag(l.enableOtel),
},
}

simpleName := fmt.Sprintf("runner%d", keySuffix)
Expand Down
6 changes: 6 additions & 0 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
Expand Down Expand Up @@ -57,11 +58,16 @@ type Config struct {
Deployment string `help:"The deployment this runner is for." env:"FTL_DEPLOYMENT"`
DebugPort int `help:"The port to use for debugging." env:"FTL_DEBUG_PORT"`
Registry artefacts.RegistryConfig `embed:"" prefix:"oci-"`
ObservabilityConfig ftlobservability.Config `embed:"" prefix:"o11y-"`
}

func Start(ctx context.Context, config Config) error {
ctx, doneFunc := context.WithCancel(ctx)
defer doneFunc()
err := ftlobservability.Init(ctx, false, "", "ftl-runner", ftl.Version, config.ObservabilityConfig)
if err != nil {
return fmt.Errorf("failed to initialise observability: %w", err)
}
hostname, err := os.Hostname()
if err != nil {
observability.Runner.StartupFailed(ctx)
Expand Down
10 changes: 3 additions & 7 deletions cmd/ftl-runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ import (
"github.com/TBD54566975/ftl/backend/runner"
_ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/observability"
)

var cli struct {
Version kong.VersionFlag `help:"Show version."`
LogConfig log.Config `prefix:"log-" embed:""`
ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"`
RunnerConfig runner.Config `embed:""`
Version kong.VersionFlag `help:"Show version."`
LogConfig log.Config `prefix:"log-" embed:""`
RunnerConfig runner.Config `embed:""`
}

func main() {
Expand All @@ -44,8 +42,6 @@ and route to user code.
})
logger := log.Configure(os.Stderr, cli.LogConfig)
ctx := log.ContextWithLogger(context.Background(), logger)
err = observability.Init(ctx, false, "", "ftl-runner", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")
err = runner.Start(ctx, cli.RunnerConfig)
kctx.FatalIfErrorf(err)
}
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ services:
retries: 60
start_period: 80s
otel-lgtm:
profiles:
- infra
image: grafana/otel-lgtm
platform: linux/amd64
ports:
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_box_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (b *boxRunCmd) Run(
if err != nil {
return fmt.Errorf("failed to create runner port allocator: %w", err)
}
runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, []*url.URL{b.Bind}, "", false, b.Registry)
runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, []*url.URL{b.Bind}, "", false, b.Registry, false)
if err != nil {
return fmt.Errorf("failed to create runner autoscaler: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_schema_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *schemaImportCmd) setup(ctx context.Context) error {
return err
}

err = container.Run(ctx, "ollama/ollama", ollamaContainerName, s.OllamaPort, 11434, optional.Some(ollamaVolume))
err = container.Run(ctx, "ollama/ollama", ollamaContainerName, map[int]int{s.OllamaPort: 11434}, optional.Some(ollamaVolume))
if err != nil {
return err
}
Expand Down
13 changes: 12 additions & 1 deletion frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type serveCmd struct {
ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"`
DatabaseImage string `help:"The container image to start for the database" default:"postgres:15.8" env:"FTL_DATABASE_IMAGE" hidden:""`
RegistryImage string `help:"The container image to start for the image registry" default:"registry:2" env:"FTL_REGISTRY_IMAGE" hidden:""`
GrafanaImage string `help:"The container image to start for the automatic Grafana instance" default:"grafana/otel-lgtm" env:"FTL_GRAFANA_IMAGE" hidden:""`
DisableGrafana bool `help:"Disable the automatic Grafana that is started if no telemetry collector is specified." default:"false"`
controller.CommonConfig
provisioner.CommonProvisionerConfig
}
Expand Down Expand Up @@ -116,6 +118,15 @@ func (s *serveCmd) run(
logger.Debugf("Starting FTL with %d controller(s)", s.Controllers)
}

if !s.DisableGrafana && !bool(s.ObservabilityConfig.ExportOTEL) {
err := dev.SetupGrafana(ctx, s.GrafanaImage)
if err != nil {
return fmt.Errorf("failed to setup grafana image: %w", err)
}
os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
os.Setenv("OTEL_METRIC_EXPORT_INTERVAL", "1000")
s.ObservabilityConfig.ExportOTEL = true
}
err := observability.Init(ctx, false, "", "ftl-serve", ftl.Version, s.ObservabilityConfig)
if err != nil {
return fmt.Errorf("observability init failed: %w", err)
Expand Down Expand Up @@ -174,7 +185,7 @@ func (s *serveCmd) run(
provisionerAddresses = append(provisionerAddresses, bind)
}

runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, controllerAddresses, projConfig.Path, devMode && !projConfig.DisableIDEIntegration, registry)
runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, controllerAddresses, projConfig.Path, devMode && !projConfig.DisableIDEIntegration, registry, bool(s.ObservabilityConfig.ExportOTEL))
if err != nil {
return err
}
Expand Down
23 changes: 13 additions & 10 deletions internal/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Pull(ctx context.Context, imageName string) error {
}

// Run starts a new detached container with the given image, name, port map, and (optional) volume mount.
func Run(ctx context.Context, image, name string, hostPort, containerPort int, volume optional.Option[string]) error {
func Run(ctx context.Context, image, name string, hostToContainerPort map[int]int, volume optional.Option[string], env ...string) error {
cli, err := dockerClient.Get(ctx)
if err != nil {
return err
Expand All @@ -96,21 +96,23 @@ func Run(ctx context.Context, image, name string, hostPort, containerPort int, v
}

config := container.Config{
Image: image,
Image: image,
Env: env,
ExposedPorts: map[nat.Port]struct{}{},
}
bindings := nat.PortMap{}
for k, v := range hostToContainerPort {
containerNatPort := nat.Port(fmt.Sprintf("%d/tcp", v))
bindings[containerNatPort] = []nat.PortBinding{{HostPort: strconv.Itoa(k)}}
config.ExposedPorts[containerNatPort] = struct{}{}
}

containerNatPort := nat.Port(fmt.Sprintf("%d/tcp", containerPort))
hostConfig := container.HostConfig{
PublishAllPorts: true,
RestartPolicy: container.RestartPolicy{
Name: container.RestartPolicyAlways,
},
PortBindings: nat.PortMap{
containerNatPort: []nat.PortBinding{
{
HostPort: strconv.Itoa(hostPort),
},
},
},
PortBindings: bindings,
}
if v, ok := volume.Get(); ok {
hostConfig.Binds = []string{v}
Expand All @@ -132,6 +134,7 @@ func Run(ctx context.Context, image, name string, hostPort, containerPort int, v
// RunDB runs a new detached postgres container with the given name and exposed port.
func RunDB(ctx context.Context, name string, port int, image string) error {
cli, err := dockerClient.Get(ctx)

if err != nil {
return err
}
Expand Down
56 changes: 56 additions & 0 deletions internal/dev/grafana.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package dev

import (
"context"
"fmt"
"net"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/container"
"github.com/TBD54566975/ftl/internal/log"
)

const ftlGrafanaName = "ftl-otel-lgtm-1"

func SetupGrafana(ctx context.Context, image string) error {
logger := log.FromContext(ctx)

exists, err := container.DoesExist(ctx, ftlGrafanaName, optional.Some(image))
if err != nil {
return fmt.Errorf("failed to check if container exists: %w", err)
}

if !exists {
logger.Debugf("Creating docker container '%s' for grafana", ftlGrafanaName)
// check if port is already in use
ports := []int{3000, 4317, 4318}
for _, port := range ports {
if l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)); err != nil {
return fmt.Errorf("port %d is already in use", port)
} else if err = l.Close(); err != nil {
return fmt.Errorf("failed to close listener: %w", err)
}
}
err = container.Run(ctx, image, ftlGrafanaName, map[int]int{3000: 3000, 4317: 4317, 4318: 4318}, optional.None[string](), "ENABLE_LOGS_ALL=true", "GF_PATHS_DATA=/data/grafana")
if err != nil {
return fmt.Errorf("failed to run grafana container: %w", err)
}

} else {
// Start the existing container
err = container.Start(ctx, ftlGrafanaName)
if err != nil {
return fmt.Errorf("failed to start existing registry container: %w", err)
}

logger.Debugf("Reusing existing docker container %s for grafana", ftlGrafanaName)
}

err = WaitForPortReady(ctx, 3000)
if err != nil {
return fmt.Errorf("registry container failed to be healthy: %w", err)
}

return nil
}
10 changes: 5 additions & 5 deletions internal/dev/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func SetupRegistry(ctx context.Context, image string, port int) error {
return fmt.Errorf("failed to close listener: %w", err)
}

err = container.Run(ctx, image, ftlRegistryName, port, 5000, optional.None[string]())
err = container.Run(ctx, image, ftlRegistryName, map[int]int{port: 5000}, optional.None[string]())
if err != nil {
return fmt.Errorf("failed to run registry container: %w", err)
}
Expand All @@ -54,24 +54,24 @@ func SetupRegistry(ctx context.Context, image string, port int) error {
logger.Debugf("Reusing existing docker container %s on port %d for image registry", ftlRegistryName, port)
}

err = WaitForRegistryReady(ctx, port)
err = WaitForPortReady(ctx, port)
if err != nil {
return fmt.Errorf("registry container failed to be healthy: %w", err)
}

return nil
}

func WaitForRegistryReady(ctx context.Context, port int) error {
stuartwdouglas marked this conversation as resolved.
Show resolved Hide resolved
func WaitForPortReady(ctx context.Context, port int) error {

timeout := time.After(10 * time.Minute)
retry := time.NewTicker(5 * time.Millisecond)
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled waiting for registry")
return fmt.Errorf("context cancelled waiting for container")
case <-timeout:
return fmt.Errorf("timed out waiting for registry to be healthy")
return fmt.Errorf("timed out waiting for container to be healthy")
case <-retry.C:
url := fmt.Sprintf("http://127.0.0.1:%d", port)

Expand Down
Loading