diff --git a/Justfile b/Justfile index cd455dda95..68cc81f4a5 100644 --- a/Justfile +++ b/Justfile @@ -280,25 +280,6 @@ debug *args: dlv_pid=$! wait "$dlv_pid" -# Run `ftl dev` with the given args after setting the necessary envar. -otel-dev *args: - #!/bin/bash - set -euo pipefail - - export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:${OTEL_GRPC_PORT}" - export OTEL_METRIC_EXPORT_INTERVAL=${OTEL_METRIC_EXPORT_INTERVAL} - # Uncomment this line for much richer debug logs - # export FTL_O11Y_LOG_LEVEL="debug" - ftl dev {{args}} - -# runs the otel-lgtm observability stack locallt which includes -# an otel collector, loki (for logs), prometheus metrics db (for metrics), tempo (trace storage) and grafana (for visualization) -observe: - docker compose up otel-lgtm - -observe-stop: - docker compose down otel-lgtm - localstack: docker compose up localstack -d --wait diff --git a/backend/controller/scaling/localscaling/local_scaling.go b/backend/controller/scaling/localscaling/local_scaling.go index 9867bb472b..adb5603970 100644 --- a/backend/controller/scaling/localscaling/local_scaling.go +++ b/backend/controller/scaling/localscaling/local_scaling.go @@ -22,6 +22,7 @@ import ( "github.com/TBD54566975/ftl/internal/localdebug" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/observability" ) var _ scaling.RunnerScaling = &localScaling{} @@ -42,6 +43,7 @@ type localScaling struct { prevRunnerSuffix int ideSupport optional.Option[localdebug.IDEIntegration] registryConfig artefacts.RegistryConfig + enableOtel bool } func (l *localScaling) Start(ctx context.Context, endpoint url.URL, leaser leases.Leaser) error { @@ -82,7 +84,7 @@ type runnerInfo struct { port string } -func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool, registryConfig artefacts.RegistryConfig) (scaling.RunnerScaling, error) { +func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool, registryConfig artefacts.RegistryConfig, enableOtel bool) (scaling.RunnerScaling, error) { cacheDir, err := os.UserCacheDir() if err != nil { @@ -97,6 +99,7 @@ func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*u prevRunnerSuffix: -1, debugPorts: map[string]*localdebug.DebugInfo{}, registryConfig: registryConfig, + enableOtel: enableOtel, } if enableIDEIntegration && configPath != "" { local.ideSupport = optional.Ptr(localdebug.NewIDEIntegration(configPath)) @@ -201,6 +204,9 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in Deployment: deploymentKey, DebugPort: debugPort, Registry: l.registryConfig, + ObservabilityConfig: observability.Config{ + ExportOTEL: observability.ExportOTELFlag(l.enableOtel), + }, } simpleName := fmt.Sprintf("runner%d", keySuffix) diff --git a/backend/runner/runner.go b/backend/runner/runner.go index ba081bc5cd..5bec578f3a 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -27,6 +27,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/TBD54566975/ftl" "github.com/TBD54566975/ftl/backend/controller/artefacts" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" @@ -57,11 +58,16 @@ type Config struct { Deployment string `help:"The deployment this runner is for." env:"FTL_DEPLOYMENT"` DebugPort int `help:"The port to use for debugging." env:"FTL_DEBUG_PORT"` Registry artefacts.RegistryConfig `embed:"" prefix:"oci-"` + ObservabilityConfig ftlobservability.Config `embed:"" prefix:"o11y-"` } func Start(ctx context.Context, config Config) error { ctx, doneFunc := context.WithCancel(ctx) defer doneFunc() + err := ftlobservability.Init(ctx, false, "", "ftl-runner", ftl.Version, config.ObservabilityConfig) + if err != nil { + return fmt.Errorf("failed to initialise observability: %w", err) + } hostname, err := os.Hostname() if err != nil { observability.Runner.StartupFailed(ctx) diff --git a/cmd/ftl-runner/main.go b/cmd/ftl-runner/main.go index 1e9b784921..b75bf402cf 100644 --- a/cmd/ftl-runner/main.go +++ b/cmd/ftl-runner/main.go @@ -11,14 +11,12 @@ import ( "github.com/TBD54566975/ftl/backend/runner" _ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota. "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/observability" ) var cli struct { - Version kong.VersionFlag `help:"Show version."` - LogConfig log.Config `prefix:"log-" embed:""` - ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"` - RunnerConfig runner.Config `embed:""` + Version kong.VersionFlag `help:"Show version."` + LogConfig log.Config `prefix:"log-" embed:""` + RunnerConfig runner.Config `embed:""` } func main() { @@ -44,8 +42,6 @@ and route to user code. }) logger := log.Configure(os.Stderr, cli.LogConfig) ctx := log.ContextWithLogger(context.Background(), logger) - err = observability.Init(ctx, false, "", "ftl-runner", ftl.Version, cli.ObservabilityConfig) - kctx.FatalIfErrorf(err, "failed to initialize observability") err = runner.Start(ctx, cli.RunnerConfig) kctx.FatalIfErrorf(err) } diff --git a/docker-compose.yml b/docker-compose.yml index 824c220118..5b8cfc668d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,8 +19,6 @@ services: retries: 60 start_period: 80s otel-lgtm: - profiles: - - infra image: grafana/otel-lgtm platform: linux/amd64 ports: diff --git a/frontend/cli/cmd_box_run.go b/frontend/cli/cmd_box_run.go index e53d668c72..0c56932195 100644 --- a/frontend/cli/cmd_box_run.go +++ b/frontend/cli/cmd_box_run.go @@ -60,7 +60,7 @@ func (b *boxRunCmd) Run( if err != nil { return fmt.Errorf("failed to create runner port allocator: %w", err) } - runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, []*url.URL{b.Bind}, "", false, b.Registry) + runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, []*url.URL{b.Bind}, "", false, b.Registry, false) if err != nil { return fmt.Errorf("failed to create runner autoscaler: %w", err) } diff --git a/frontend/cli/cmd_schema_import.go b/frontend/cli/cmd_schema_import.go index 917967b46e..18073fb55d 100644 --- a/frontend/cli/cmd_schema_import.go +++ b/frontend/cli/cmd_schema_import.go @@ -156,7 +156,7 @@ func (s *schemaImportCmd) setup(ctx context.Context) error { return err } - err = container.Run(ctx, "ollama/ollama", ollamaContainerName, s.OllamaPort, 11434, optional.Some(ollamaVolume)) + err = container.Run(ctx, "ollama/ollama", ollamaContainerName, map[int]int{s.OllamaPort: 11434}, optional.Some(ollamaVolume)) if err != nil { return err } diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index 2c2aaae89f..a2b515baea 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -51,6 +51,8 @@ type serveCmd struct { ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"` DatabaseImage string `help:"The container image to start for the database" default:"postgres:15.8" env:"FTL_DATABASE_IMAGE" hidden:""` RegistryImage string `help:"The container image to start for the image registry" default:"registry:2" env:"FTL_REGISTRY_IMAGE" hidden:""` + GrafanaImage string `help:"The container image to start for the automatic Grafana instance" default:"grafana/otel-lgtm" env:"FTL_GRAFANA_IMAGE" hidden:""` + DisableGrafana bool `help:"Disable the automatic Grafana that is started if no telemetry collector is specified." default:"false"` controller.CommonConfig provisioner.CommonProvisionerConfig } @@ -116,6 +118,15 @@ func (s *serveCmd) run( logger.Debugf("Starting FTL with %d controller(s)", s.Controllers) } + if !s.DisableGrafana && !bool(s.ObservabilityConfig.ExportOTEL) { + err := dev.SetupGrafana(ctx, s.GrafanaImage) + if err != nil { + return fmt.Errorf("failed to setup grafana image: %w", err) + } + os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + os.Setenv("OTEL_METRIC_EXPORT_INTERVAL", "1000") + s.ObservabilityConfig.ExportOTEL = true + } err := observability.Init(ctx, false, "", "ftl-serve", ftl.Version, s.ObservabilityConfig) if err != nil { return fmt.Errorf("observability init failed: %w", err) @@ -174,7 +185,7 @@ func (s *serveCmd) run( provisionerAddresses = append(provisionerAddresses, bind) } - runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, controllerAddresses, projConfig.Path, devMode && !projConfig.DisableIDEIntegration, registry) + runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, controllerAddresses, projConfig.Path, devMode && !projConfig.DisableIDEIntegration, registry, bool(s.ObservabilityConfig.ExportOTEL)) if err != nil { return err } diff --git a/internal/container/container.go b/internal/container/container.go index 8211f4f8b5..a18bdc0634 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -77,7 +77,7 @@ func Pull(ctx context.Context, imageName string) error { } // Run starts a new detached container with the given image, name, port map, and (optional) volume mount. -func Run(ctx context.Context, image, name string, hostPort, containerPort int, volume optional.Option[string]) error { +func Run(ctx context.Context, image, name string, hostToContainerPort map[int]int, volume optional.Option[string], env ...string) error { cli, err := dockerClient.Get(ctx) if err != nil { return err @@ -96,21 +96,23 @@ func Run(ctx context.Context, image, name string, hostPort, containerPort int, v } config := container.Config{ - Image: image, + Image: image, + Env: env, + ExposedPorts: map[nat.Port]struct{}{}, + } + bindings := nat.PortMap{} + for k, v := range hostToContainerPort { + containerNatPort := nat.Port(fmt.Sprintf("%d/tcp", v)) + bindings[containerNatPort] = []nat.PortBinding{{HostPort: strconv.Itoa(k)}} + config.ExposedPorts[containerNatPort] = struct{}{} } - containerNatPort := nat.Port(fmt.Sprintf("%d/tcp", containerPort)) hostConfig := container.HostConfig{ + PublishAllPorts: true, RestartPolicy: container.RestartPolicy{ Name: container.RestartPolicyAlways, }, - PortBindings: nat.PortMap{ - containerNatPort: []nat.PortBinding{ - { - HostPort: strconv.Itoa(hostPort), - }, - }, - }, + PortBindings: bindings, } if v, ok := volume.Get(); ok { hostConfig.Binds = []string{v} @@ -132,6 +134,7 @@ func Run(ctx context.Context, image, name string, hostPort, containerPort int, v // RunDB runs a new detached postgres container with the given name and exposed port. func RunDB(ctx context.Context, name string, port int, image string) error { cli, err := dockerClient.Get(ctx) + if err != nil { return err } diff --git a/internal/dev/grafana.go b/internal/dev/grafana.go new file mode 100644 index 0000000000..8a0b603f18 --- /dev/null +++ b/internal/dev/grafana.go @@ -0,0 +1,56 @@ +package dev + +import ( + "context" + "fmt" + "net" + + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/internal/container" + "github.com/TBD54566975/ftl/internal/log" +) + +const ftlGrafanaName = "ftl-otel-lgtm-1" + +func SetupGrafana(ctx context.Context, image string) error { + logger := log.FromContext(ctx) + + exists, err := container.DoesExist(ctx, ftlGrafanaName, optional.Some(image)) + if err != nil { + return fmt.Errorf("failed to check if container exists: %w", err) + } + + if !exists { + logger.Debugf("Creating docker container '%s' for grafana", ftlGrafanaName) + // check if port is already in use + ports := []int{3000, 4317, 4318} + for _, port := range ports { + if l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)); err != nil { + return fmt.Errorf("port %d is already in use", port) + } else if err = l.Close(); err != nil { + return fmt.Errorf("failed to close listener: %w", err) + } + } + err = container.Run(ctx, image, ftlGrafanaName, map[int]int{3000: 3000, 4317: 4317, 4318: 4318}, optional.None[string](), "ENABLE_LOGS_ALL=true", "GF_PATHS_DATA=/data/grafana") + if err != nil { + return fmt.Errorf("failed to run grafana container: %w", err) + } + + } else { + // Start the existing container + err = container.Start(ctx, ftlGrafanaName) + if err != nil { + return fmt.Errorf("failed to start existing registry container: %w", err) + } + + logger.Debugf("Reusing existing docker container %s for grafana", ftlGrafanaName) + } + + err = WaitForPortReady(ctx, 3000) + if err != nil { + return fmt.Errorf("registry container failed to be healthy: %w", err) + } + + return nil +} diff --git a/internal/dev/registry.go b/internal/dev/registry.go index e3a38f360f..43cbbf1395 100644 --- a/internal/dev/registry.go +++ b/internal/dev/registry.go @@ -33,7 +33,7 @@ func SetupRegistry(ctx context.Context, image string, port int) error { return fmt.Errorf("failed to close listener: %w", err) } - err = container.Run(ctx, image, ftlRegistryName, port, 5000, optional.None[string]()) + err = container.Run(ctx, image, ftlRegistryName, map[int]int{port: 5000}, optional.None[string]()) if err != nil { return fmt.Errorf("failed to run registry container: %w", err) } @@ -54,7 +54,7 @@ func SetupRegistry(ctx context.Context, image string, port int) error { logger.Debugf("Reusing existing docker container %s on port %d for image registry", ftlRegistryName, port) } - err = WaitForRegistryReady(ctx, port) + err = WaitForPortReady(ctx, port) if err != nil { return fmt.Errorf("registry container failed to be healthy: %w", err) } @@ -62,16 +62,16 @@ func SetupRegistry(ctx context.Context, image string, port int) error { return nil } -func WaitForRegistryReady(ctx context.Context, port int) error { +func WaitForPortReady(ctx context.Context, port int) error { timeout := time.After(10 * time.Minute) retry := time.NewTicker(5 * time.Millisecond) for { select { case <-ctx.Done(): - return fmt.Errorf("context cancelled waiting for registry") + return fmt.Errorf("context cancelled waiting for container") case <-timeout: - return fmt.Errorf("timed out waiting for registry to be healthy") + return fmt.Errorf("timed out waiting for container to be healthy") case <-retry.C: url := fmt.Sprintf("http://127.0.0.1:%d", port)