From 31b3f1f2c97025d2c23ded506fd6796dae36d509 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 3 Aug 2023 17:47:23 -0300 Subject: [PATCH 001/434] chore: wip work on otel backend. --- agent/backend/otel/otel.go | 103 +++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 agent/backend/otel/otel.go diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go new file mode 100644 index 000000000..b56d21764 --- /dev/null +++ b/agent/backend/otel/otel.go @@ -0,0 +1,103 @@ +package otel + +import ( + "context" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/orb-community/orb/agent/backend" + "github.com/orb-community/orb/agent/config" + "github.com/orb-community/orb/agent/policies" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + "time" +) + +var _ backend.Backend = (*openTelemetryBackend)(nil) + +type openTelemetryBackend struct { + logger *zap.Logger + startTime time.Time + + binaryPath string + + //policies + policyRepo policies.PolicyRepo + agentTags map[string]string + + // Context for controlling the context cancellation + startCtx context.Context + mainCancelFunction context.CancelFunc + + // MQTT Config for OTEL MQTT Exporter + mqttConfig config.MQTTConfig + + mqttClient *mqtt.Client + + otlpMetricsTopic string + otlpTracesTopic string + otlpLogsTopic string + + otelReceiverHost string + otelReceiverPort int + receiver receiver.Metrics + exporter exporter.Metrics +} + +// Configure initializes the backend with the given configuration +func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, configuration map[string]string, m2 map[string]interface{}) error { + o.logger = logger + o.policyRepo = repo + + var prs bool + if +} + +func (o openTelemetryBackend) SetCommsClient(s string, client *mqtt.Client, s2 string) { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) Version() (string, error) { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.CancelFunc) error { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) Stop(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) FullReset(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) GetStartTime() time.Time { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) GetCapabilities() (map[string]interface{}, error) { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string, error) { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) ApplyPolicy(data policies.PolicyData, updatePolicy bool) error { + //TODO implement me + panic("implement me") +} + +func (o openTelemetryBackend) RemovePolicy(data policies.PolicyData) error { + //TODO implement me + panic("implement me") +} From 0d0808f37035f7643eb550ff7c9cacd37e72a98b Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 7 Aug 2023 10:23:05 -0300 Subject: [PATCH 002/434] chore: use otel v.0.81.0 to not break the api. --- go.mod | 49 +++++++++++++++-------------- go.sum | 99 ++++++++++++++++++++++++++++++---------------------------- 2 files changed, 76 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index 8b3651a7f..65cdcc339 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/mainflux/mainflux v0.0.0-20220415135135-92d8fb99bf82 github.com/mattn/go-sqlite3 v1.14.16 github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 - github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.82.0 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.81.0 github.com/opentracing/opentracing-go v1.2.0 github.com/ory/dockertest/v3 v3.10.0 github.com/patrickmn/go-cache v2.1.0+incompatible @@ -35,15 +35,15 @@ require ( github.com/spf13/viper v1.15.0 github.com/stretchr/testify v1.8.4 github.com/uber/jaeger-client-go v2.30.0+incompatible - go.opentelemetry.io/collector/component v0.82.0 - go.opentelemetry.io/collector/config/confighttp v0.82.0 - go.opentelemetry.io/collector/config/configtelemetry v0.82.0 - go.opentelemetry.io/collector/config/configtls v0.82.0 - go.opentelemetry.io/collector/confmap v0.82.0 - go.opentelemetry.io/collector/consumer v0.82.0 - go.opentelemetry.io/collector/exporter v0.82.0 - go.opentelemetry.io/collector/processor v0.82.0 - go.opentelemetry.io/collector/receiver v0.82.0 + go.opentelemetry.io/collector/component v0.81.0 + go.opentelemetry.io/collector/config/confighttp v0.81.0 + go.opentelemetry.io/collector/config/configtelemetry v0.81.0 + go.opentelemetry.io/collector/config/configtls v0.81.0 + go.opentelemetry.io/collector/confmap v0.81.0 + go.opentelemetry.io/collector/consumer v0.81.0 + go.opentelemetry.io/collector/exporter v0.81.0 + go.opentelemetry.io/collector/processor v0.81.0 + go.opentelemetry.io/collector/receiver v0.81.0 go.opentelemetry.io/otel/sdk/metric v0.39.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 @@ -62,7 +62,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.3.0 github.com/prometheus/common v0.44.0 // indirect - go.opentelemetry.io/collector v0.82.0 + go.opentelemetry.io/collector v0.81.0 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 @@ -121,8 +121,9 @@ require ( github.com/nats-io/nats.go v1.27.0 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/observiq/ctimefmt v1.0.0 // indirect github.com/oklog/ulid/v2 v2.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.82.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.81.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc2 // indirect github.com/opencontainers/runc v1.1.5 // indirect @@ -144,14 +145,14 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/collector/config/configauth v0.82.0 // indirect - go.opentelemetry.io/collector/config/configcompression v0.82.0 // indirect - go.opentelemetry.io/collector/config/configgrpc v0.82.0 // indirect - go.opentelemetry.io/collector/config/confignet v0.82.0 // indirect - go.opentelemetry.io/collector/config/configopaque v0.82.0 // indirect - go.opentelemetry.io/collector/config/internal v0.82.0 // indirect - go.opentelemetry.io/collector/extension v0.82.0 // indirect - go.opentelemetry.io/collector/extension/auth v0.82.0 // indirect + go.opentelemetry.io/collector/config/configauth v0.81.0 // indirect + go.opentelemetry.io/collector/config/configcompression v0.81.0 // indirect + go.opentelemetry.io/collector/config/configgrpc v0.81.0 // indirect + go.opentelemetry.io/collector/config/confignet v0.81.0 // indirect + go.opentelemetry.io/collector/config/configopaque v0.81.0 // indirect + go.opentelemetry.io/collector/config/internal v0.81.0 // indirect + go.opentelemetry.io/collector/extension v0.81.0 // indirect + go.opentelemetry.io/collector/extension/auth v0.81.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.1-0.20230612162650-64be7e574a17 // indirect go.opentelemetry.io/otel v1.16.0 // indirect @@ -201,8 +202,8 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.82.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.82.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.81.0 github.com/orb-community/diode v0.0.0-20230419222319-4ec19ba89e9f github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect @@ -211,8 +212,8 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 github.com/xdg-go/stringprep v1.0.4 // indirect - go.opentelemetry.io/collector/receiver/otlpreceiver v0.82.0 - go.opentelemetry.io/collector/semconv v0.82.0 + go.opentelemetry.io/collector/receiver/otlpreceiver v0.81.0 + go.opentelemetry.io/collector/semconv v0.81.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect k8s.io/kube-openapi v0.0.0-20230303024457-afdc3dddf62d // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/go.sum b/go.sum index 9d5a72159..5e0a196fe 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,7 @@ github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF0 github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= +github.com/Mottl/ctimefmt v0.0.0-20190803144728-fd2ac23a585a/go.mod h1:eyj2WSIdoPMPs2eNTLpSmM6Nzqo4V80/d6jHpnJ1SAI= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -558,6 +559,8 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/observiq/ctimefmt v1.0.0 h1:r7vTJ+Slkrt9fZ67mkf+mA6zAdR5nGIJRMTzkUyvilk= +github.com/observiq/ctimefmt v1.0.0/go.mod h1:mxi62//WbSpG/roCO1c6MqZ7zQTvjVtYheqHN3eOjvc= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid/v2 v2.0.2 h1:r4fFzBm+bv0wNKNh5eXTwU7i85y5x+uwkxCUTNVQqLc= github.com/oklog/ulid/v2 v2.0.2/go.mod h1:mtBL0Qe/0HAx6/a4Z30qxVIAL1eQDweXq5lxOEiwQ68= @@ -571,14 +574,14 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= -github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.82.0 h1:0b6glbENAwPdasKKVOgpR/EaZG1sJhsUfXCRiwZ0drU= -github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.82.0/go.mod h1:MKnM9GFqPz4HY4NQDDao+dIjZz4BvThAijuJuPC8NOI= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.82.0 h1:2efL2SE/dndrTLPQcpFzrsIJpYw0i3bkFG0n40xnsQI= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.82.0/go.mod h1:tqP4R7pPk5M0v0j8nP5h2o1fUqofC2kSrirzkwQW7p0= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.82.0 h1:fKTXkXX+iMAAiTu4r1j1DbzKYvbd6CvFoWNWLhTOJjk= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.82.0/go.mod h1:1SM5fbDUmJHQUNO0T/lDzMVmGpn+z9UJHyjfGg6IQ0Q= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.82.0 h1:dXtEW53Rf09nSx3nc1pvUX1USsL3swTMzDw6rLA4cA0= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.82.0/go.mod h1:Z95mPIlMbXnOXBYNOgu26isHr09bcO4/ZxIrOVFcpsM= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0 h1:sPjCHuqjn5UYDJOai4FulMCfLP+7AbspjHfv0jAtmD0= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.81.0/go.mod h1:moQ6krtZ8dyziij2P+9eao5+gBfCJjiNDwN7n2MZZs4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.81.0 h1:Rb8e1O31dgjTEn6823RsPs2RaOwl7fVuFWz2qK9DRpY= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.81.0/go.mod h1:tnyFHqiWxeNUqAAaGuKDD7XDL0KwBMSqvRB9PsKCzng= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.81.0 h1:UCF4zWe24m1+yQeYGEQjzq5c9yNAf4d1lknbR7PAoH4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.81.0/go.mod h1:/HK52N8ufi1rKuShEanBZVLVpOygLoIT9zqs0azKQ/s= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.81.0 h1:sVmU1X/9txOvdQX0VbnqwetrKThnsPXlJO52dmaFng8= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.81.0/go.mod h1:x71DNfrdvoQSN3AiovAsoSi4aG74Rary8oJATXzzkMw= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc2 h1:2zx/Stx4Wc5pIPDvIxHXvXtQFW/7XWJGmnM7r3wg034= @@ -760,50 +763,50 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/collector v0.82.0 h1:MaKqWT0R4GCdkZDhYWOQkLfoJj9V7GsMbk1gsAuogaw= -go.opentelemetry.io/collector v0.82.0/go.mod h1:PMmDJkZzC1xpcViHlwMMEVeAnRRl3HYy3nXgD8KJwG0= -go.opentelemetry.io/collector/component v0.82.0 h1:ID9nOGKBf5G0avhuYQlTzmwAyIMvh9B+tlckLE/4qw4= -go.opentelemetry.io/collector/component v0.82.0/go.mod h1:jSdGG4L1Ger6ob6lWpr8jmKC2qqC+XZ/gOgu7GUA5xs= -go.opentelemetry.io/collector/config/configauth v0.82.0 h1:H5xrWyPMotSqajiiH/bay8bpVsT4aq6Vih4OuArXv4Q= -go.opentelemetry.io/collector/config/configauth v0.82.0/go.mod h1:P0ukmBIUk+HP0O7yfUOKRmPmffneAQgmEL9/iTOo1CU= -go.opentelemetry.io/collector/config/configcompression v0.82.0 h1:M6a7eiHdBUB8mIioDhWugJfNm7Sw85cvv/OXyTDhtY0= -go.opentelemetry.io/collector/config/configcompression v0.82.0/go.mod h1:xhHm1sEH7BTECAJo1xn64NMxeIvZGKdVGdSKUUc+YuM= -go.opentelemetry.io/collector/config/configgrpc v0.82.0 h1:taZWDbtVBm0OOcgnfpVA1X43pmU2oNhj39B2uV3COQk= -go.opentelemetry.io/collector/config/configgrpc v0.82.0/go.mod h1:NHXHRI40Q7TT/d38DKT30B7DOrVUkj7anEFOD59R9o8= -go.opentelemetry.io/collector/config/confighttp v0.82.0 h1:2LhyqVTd+Bsr8SgsCq6+q731F81uddK9GwvGhwD/Co0= -go.opentelemetry.io/collector/config/confighttp v0.82.0/go.mod h1:OHGx/aJqGJ9z2jaBXvaylwkAuiUwikg1/n+RRDpsfOo= -go.opentelemetry.io/collector/config/confignet v0.82.0 h1:zN9JaFTn7Dth3u5ot6KZJcBZACTEzGqFWYyO5qAlYfo= -go.opentelemetry.io/collector/config/confignet v0.82.0/go.mod h1:unOg7BZvpt6T5xsf+LyeOQvUhD8ld/2AbfOsmUZ/bPM= -go.opentelemetry.io/collector/config/configopaque v0.82.0 h1:0Ma63QTr4AkODzEABZHtgiU5Dig8SItpHOuB28UnVSw= -go.opentelemetry.io/collector/config/configopaque v0.82.0/go.mod h1:pM1oy6gasukw3H6jAvc9Q9OtFaaY2IbfeuwCPAjOgXc= -go.opentelemetry.io/collector/config/configtelemetry v0.82.0 h1:Zln2K4S5gBDcOpBNIzM0cZS5P6cohEYstHngVvIbGBY= -go.opentelemetry.io/collector/config/configtelemetry v0.82.0/go.mod h1:KEYQRiYJdx38iZkvcLKBZWH9fK4NeafxBwGRrRKMgyA= -go.opentelemetry.io/collector/config/configtls v0.82.0 h1:eE/8muTszLlviOGLy5N08BaXLCcYqDW3mKIoKyDDa8o= -go.opentelemetry.io/collector/config/configtls v0.82.0/go.mod h1:unBTmL1bdpkp9mYEDz7N+Ln4yEwh7Ug74I1HgZMplCk= -go.opentelemetry.io/collector/config/internal v0.82.0 h1:JnnDARkXrC3OJDsMfQkBgfI0Np4s+18zvoDqZ4OH0+I= -go.opentelemetry.io/collector/config/internal v0.82.0/go.mod h1:RKcLV1gQxhgwx+6rlPYsvGMq1RZNne3UeOUZkHxJnIg= -go.opentelemetry.io/collector/confmap v0.82.0 h1:s1Rd8jz21DGlLJfED0Py9VaEq2qPWmWwWy5MriDCX+4= -go.opentelemetry.io/collector/confmap v0.82.0/go.mod h1:IS/PoUYHETtxV6+fJammTkCxxa4LEwK2u4Cx/bVCH/s= -go.opentelemetry.io/collector/consumer v0.82.0 h1:vZecylW6bpaphetSTjCLgwXLxSYQ6oe/kzwkx4iF5oE= -go.opentelemetry.io/collector/consumer v0.82.0/go.mod h1:qrhd0i0Gp0RkihcEXb+7Rb584Kal2NmGH1eA4Zg6puA= -go.opentelemetry.io/collector/exporter v0.82.0 h1:BWsx4rWfVwlV+qNuevSMm+2Cv6uGZYYZ9CEFqq0q+F4= -go.opentelemetry.io/collector/exporter v0.82.0/go.mod h1:e3VPpLYVNRaF+G2HuKw6A5hTBMYZ4tgRYYzMusfwFJE= -go.opentelemetry.io/collector/extension v0.82.0 h1:DH4tqrTOz0HmGDJ6FT/jRD2woQf3ugqC6QqSiQdH3wg= -go.opentelemetry.io/collector/extension v0.82.0/go.mod h1:n7d0XTh7fdyorZWTc+gLpJh78FS7GjRqIjUiW1xdhe0= -go.opentelemetry.io/collector/extension/auth v0.82.0 h1:iaxwFslRj6mfzs1wVzbnj+gDU2G98IeXW4tcrq78p5s= -go.opentelemetry.io/collector/extension/auth v0.82.0/go.mod h1:O1xBcb06pKD8g3FadLDvMa1xKZwPGdHQp4CI8vW3RCM= +go.opentelemetry.io/collector v0.81.0 h1:pF+sB8xNXlg/W0a0QTLz4mUWyool1a9toVj8LmLoFqg= +go.opentelemetry.io/collector v0.81.0/go.mod h1:thuOTBMusXwcTPTwLbs3zwwCOLaaQX2g+Hjf8OObc/w= +go.opentelemetry.io/collector/component v0.81.0 h1:AKsl6bss/SRrW248GFpmGiiI/4kdemW92Ai/X82CCqY= +go.opentelemetry.io/collector/component v0.81.0/go.mod h1:+m6/yPiJ7O7Oc/OLfmgUB2mrY1xoUqRj4BsoOtIVpGs= +go.opentelemetry.io/collector/config/configauth v0.81.0 h1:NIiJuIGOdblN0EIJv64R2mvGhthcYfWuvyCnjk8HRN4= +go.opentelemetry.io/collector/config/configauth v0.81.0/go.mod h1:2KscbmU+8fIzwiSU9Kku0Tf4b4A1plqFIJXR1DWSaTw= +go.opentelemetry.io/collector/config/configcompression v0.81.0 h1:Q725pvVH7tR6BP3WK7Ro3pbqMeQdZEV3KeFVHchBxCc= +go.opentelemetry.io/collector/config/configcompression v0.81.0/go.mod h1:xhHm1sEH7BTECAJo1xn64NMxeIvZGKdVGdSKUUc+YuM= +go.opentelemetry.io/collector/config/configgrpc v0.81.0 h1:Q2xEE2SGbg79j3TdHT+781eUu/2uUIyrHVJAG9bLpVk= +go.opentelemetry.io/collector/config/configgrpc v0.81.0/go.mod h1:Frq/l2Ttbvm7cFH3hkxLdhl5TCNHcH6rnkpmi8U2kLY= +go.opentelemetry.io/collector/config/confighttp v0.81.0 h1:vIdiepUT7P/WtJRdfh8mjzvSqJRVF8/vl9GWtUNQlHQ= +go.opentelemetry.io/collector/config/confighttp v0.81.0/go.mod h1:I54THsffkpv//O7bUHw+0bXxjYdvyL6IHg5ksgYez8I= +go.opentelemetry.io/collector/config/confignet v0.81.0 h1:Eu8m3eX8GaGhOUc//YXvV4i3cEivxUSxkLnV1U9ydhg= +go.opentelemetry.io/collector/config/confignet v0.81.0/go.mod h1:unOg7BZvpt6T5xsf+LyeOQvUhD8ld/2AbfOsmUZ/bPM= +go.opentelemetry.io/collector/config/configopaque v0.81.0 h1:MkCAGh0WydRWydETB9FLnuCj9hDPDiz2g4Wxnl53I0w= +go.opentelemetry.io/collector/config/configopaque v0.81.0/go.mod h1:pM1oy6gasukw3H6jAvc9Q9OtFaaY2IbfeuwCPAjOgXc= +go.opentelemetry.io/collector/config/configtelemetry v0.81.0 h1:j3dhWbAcrfL1n0RmShRJf99X/xIMoPfEShN/5Z8bY0k= +go.opentelemetry.io/collector/config/configtelemetry v0.81.0/go.mod h1:KEYQRiYJdx38iZkvcLKBZWH9fK4NeafxBwGRrRKMgyA= +go.opentelemetry.io/collector/config/configtls v0.81.0 h1:2vt+yOZUvGq5ADqFAxL5ONm1ACuGXDSs87AWT54Ez4M= +go.opentelemetry.io/collector/config/configtls v0.81.0/go.mod h1:HMHTYBMMgqBpTvnNAhQYmjO7XuoBMe2T4qRHcKluB4Q= +go.opentelemetry.io/collector/config/internal v0.81.0 h1:wRV2PBnJygdmKpIdt/xfG7zdQvXvHz9L+z8MhGsOji4= +go.opentelemetry.io/collector/config/internal v0.81.0/go.mod h1:RKcLV1gQxhgwx+6rlPYsvGMq1RZNne3UeOUZkHxJnIg= +go.opentelemetry.io/collector/confmap v0.81.0 h1:AqweoBGdF3jGM2/KgP5GS6bmN+1aVrEiCy4nPf7IBE4= +go.opentelemetry.io/collector/confmap v0.81.0/go.mod h1:iCTnTqGgZZJumhJxpY7rrJz9UQ/0zjPmsJz2Z7Tp4RY= +go.opentelemetry.io/collector/consumer v0.81.0 h1:8R2iCrSzD7T0RtC2Wh4GXxDiqla2vNhDokGW6Bcrfas= +go.opentelemetry.io/collector/consumer v0.81.0/go.mod h1:jS7+gAKdOx3lD3SnaBztBjUVpUYL3ee7fpoqI4p/gT8= +go.opentelemetry.io/collector/exporter v0.81.0 h1:GLhB8WGrBx+zZSB1HIOx2ivFUMahGtAVO2CC5xbCUHQ= +go.opentelemetry.io/collector/exporter v0.81.0/go.mod h1:Di4RTzI8uRooVNATIeApNUgmGdNt8XiikUTQLabmZaA= +go.opentelemetry.io/collector/extension v0.81.0 h1:Ak7AzZzxTFJxGyVbEklsGzqHyOHW5USiifJilCcRyTU= +go.opentelemetry.io/collector/extension v0.81.0/go.mod h1:DU2bX8qulS5+OCJZGfvqIwIT/q3sFnEjI2HjJ2LDI/s= +go.opentelemetry.io/collector/extension/auth v0.81.0 h1:UzVQSG9naJh1hX7hh+HVcvB3n+rpCJXX2BBdUoL/Ybo= +go.opentelemetry.io/collector/extension/auth v0.81.0/go.mod h1:PaBFcFrzXV+UgM4VZKp6Kn1IiRC/MbEYWxTfIalcIwk= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 h1:C9o0mbP0MyygqFnKueVQK/v9jef6zvuttmTGlKaqhgw= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014/go.mod h1:0mE3mDLmUrOXVoNsuvj+7dV14h/9HFl/Fy9YTLoLObo= go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 h1:iT5qH0NLmkGeIdDtnBogYDx7L58t6CaWGL378DEo2QY= go.opentelemetry.io/collector/pdata v1.0.0-rcv0014/go.mod h1:BRvDrx43kiSoUx3mr7SoA7h9B8+OY99mUK+CZSQFWW4= -go.opentelemetry.io/collector/processor v0.82.0 h1:DoqVrrnGYThu/h1sOr6E0hR1Fj5nQT4VT0ptFZcltRk= -go.opentelemetry.io/collector/processor v0.82.0/go.mod h1:B0MtfLWCYNBJ+PXf9k77M2Yn08MKItNB2vuvwhqrtt0= -go.opentelemetry.io/collector/receiver v0.82.0 h1:bc6jc8jmSgc0/C9zqTqqWOGJFVx0AJ53jiToSmQs2SE= -go.opentelemetry.io/collector/receiver v0.82.0/go.mod h1:Uh6BgcTmmrA1Bm/GpKGRY6WwQyPio4yEDsYkUo0A5Gk= -go.opentelemetry.io/collector/receiver/otlpreceiver v0.82.0 h1:LzcmQ9d7NauTVEWfPNwRwqNd/NBQDi+JU0OHWearcEA= -go.opentelemetry.io/collector/receiver/otlpreceiver v0.82.0/go.mod h1:Qt9Ha/yWaU6ni0XwFslNCBX5zZBQHcnxma/sU1s7LH4= -go.opentelemetry.io/collector/semconv v0.82.0 h1:WUeT2a+uZjI6kLvwcBaJnGvo7KSQ/9dIFRcxOQdXucc= -go.opentelemetry.io/collector/semconv v0.82.0/go.mod h1:TlYPtzvsXyHOgr5eATi43qEMqwSmIziivJB2uctKswo= +go.opentelemetry.io/collector/processor v0.81.0 h1:ypyNV5R0bnN3XGMAsH/q5eNARF5vXtFgSOK9rBWzsLc= +go.opentelemetry.io/collector/processor v0.81.0/go.mod h1:ZDwO3DVg1VUSA92g0r/o0jYk+T7r9uxgZZ3LABJbC34= +go.opentelemetry.io/collector/receiver v0.81.0 h1:0c+YtIV7fmd9ev+zmwS9qjx5ASi8cw+gSypu4I7Gugc= +go.opentelemetry.io/collector/receiver v0.81.0/go.mod h1:q80JkMxVLnk0vWxoTRY2J7F4Qx9069Yy5yxDbZ4JVwk= +go.opentelemetry.io/collector/receiver/otlpreceiver v0.81.0 h1:ewVbfATnAeQkwFK3r0dpFKCXcTb8HJKX4AixUioRt+c= +go.opentelemetry.io/collector/receiver/otlpreceiver v0.81.0/go.mod h1:LGuSMVdOq5Zq+CEHF9YBHMaOIUZrzqW7DQGqo9g0dJA= +go.opentelemetry.io/collector/semconv v0.81.0 h1:lCYNNo3powDvFIaTPP2jDKIrBiV1T92NK4QgL/aHYXw= +go.opentelemetry.io/collector/semconv v0.81.0/go.mod h1:TlYPtzvsXyHOgr5eATi43qEMqwSmIziivJB2uctKswo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.1-0.20230612162650-64be7e574a17 h1:mdcNStUIXngF/mH3xxAo4nbR4g65IXqLL1SvYMjz7JQ= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.1-0.20230612162650-64be7e574a17/go.mod h1:N2Nw/UmmvQn0yCnaUzvsWzTWIeffYIdFteg6mxqCWII= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= From 54c6de17dd289d1a1932e6d57f1de67180bd119c Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 7 Aug 2023 10:49:13 -0300 Subject: [PATCH 003/434] feat(agent): wip otel backend. --- agent/backend/otel/otel.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index b56d21764..84a438e76 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -2,6 +2,7 @@ package otel import ( "context" + "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/orb-community/orb/agent/backend" "github.com/orb-community/orb/agent/config" @@ -9,11 +10,14 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + "strings" "time" ) var _ backend.Backend = (*openTelemetryBackend)(nil) +const DefaultBinary = "/usr/local/sbin/otelcol" + type openTelemetryBackend struct { logger *zap.Logger startTime time.Time @@ -22,7 +26,7 @@ type openTelemetryBackend struct { //policies policyRepo policies.PolicyRepo - agentTags map[string]string + agentTags map[string]string // Context for controlling the context cancellation startCtx context.Context @@ -49,12 +53,18 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy o.policyRepo = repo var prs bool - if + if o.binaryPath, prs = configuration["binary"]; !prs { + o.binaryPath = DefaultBinary + } + } -func (o openTelemetryBackend) SetCommsClient(s string, client *mqtt.Client, s2 string) { - //TODO implement me - panic("implement me") +func (o openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) { + o.mqttClient = client + otelBaseTopic := strings.Replace(baseTopic, "?", "otlp", 1) + o.otlpMetricsTopic = fmt.Sprintf("%s/m/%c", otelBaseTopic, agentID[0]) + o.otlpTracesTopic = fmt.Sprintf("%s/t/%c", otelBaseTopic, agentID[0]) + o.otlpLogsTopic = fmt.Sprintf("%s/l/%c", otelBaseTopic, agentID[0]) } func (o openTelemetryBackend) Version() (string, error) { From 08c1390dd8b319aa8b232139d4697f2deb03861d Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 7 Aug 2023 10:49:27 -0300 Subject: [PATCH 004/434] feat(agent): wip otel backend. --- agent/backend/otel/otel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 84a438e76..644790578 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -56,7 +56,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy if o.binaryPath, prs = configuration["binary"]; !prs { o.binaryPath = DefaultBinary } - + return nil } func (o openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) { From 02f596ef9b811f2759af3ea365860b2c33034ec1 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 7 Aug 2023 22:43:20 -0300 Subject: [PATCH 005/434] feat(agent): add makefile and started work to import and run executable within go, using new lib and go:embed to import binary into code. --- Makefile | 7 +++++++ agent/backend/otel/otel.go | 7 ++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 746989174..5a2760e85 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,8 @@ CGO_ENABLED ?= 0 GOARCH ?= $(shell dpkg-architecture -q DEB_BUILD_ARCH) ORB_VERSION = $(shell cat VERSION) COMMIT_HASH = $(shell git rev-parse --short HEAD) +OTEL_COLLECTOR_CONTRIB_VERSION = 0.81.0 +OTEL_CONTRIB_URL="https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v$(OTEL_COLLECTOR_CONTRIB_VERSION)/otelcol-contrib_$(OTEL_COLLECTOR_CONTRIB_VERSION)_linux_arm64.tar.gz" define compile_service echo "ORB_VERSION: $(ORB_VERSION)" @@ -284,3 +286,8 @@ ui: -f docker/Dockerfile . platform: dockers_dev agent ui + +pull-latest-otel-collector-contrib: + wget -O ./agent/backend/otel/otelcol_contrib.tar.gz $(OTEL_CONTRIB_URL) + tar -xvf ./agent/backend/otel/otelcol_contrib.tar.gz -C ./agent/backend/otel/ + rm ./agent/backend/otel/otelcol_contrib.tar.gz diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 644790578..1b23aeb2c 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -2,6 +2,7 @@ package otel import ( "context" + _ "embed" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/orb-community/orb/agent/backend" @@ -18,6 +19,9 @@ var _ backend.Backend = (*openTelemetryBackend)(nil) const DefaultBinary = "/usr/local/sbin/otelcol" +//go:embed otelcol-contrib +var openTelemtryContribBinary []byte + type openTelemetryBackend struct { logger *zap.Logger startTime time.Time @@ -48,7 +52,7 @@ type openTelemetryBackend struct { } // Configure initializes the backend with the given configuration -func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, configuration map[string]string, m2 map[string]interface{}) error { +func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, configuration map[string]string, openTelemetryConfiguration map[string]interface{}) error { o.logger = logger o.policyRepo = repo @@ -56,6 +60,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy if o.binaryPath, prs = configuration["binary"]; !prs { o.binaryPath = DefaultBinary } + return nil } From c9bfb7659c07886ad0eb246911c7f7a9faa6250b Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 8 Aug 2023 10:47:39 -0300 Subject: [PATCH 006/434] feat(agent): add gitignore to ignore the binary files in github. --- agent/backend/otel/.gitignore | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 agent/backend/otel/.gitignore diff --git a/agent/backend/otel/.gitignore b/agent/backend/otel/.gitignore new file mode 100644 index 000000000..35b22f118 --- /dev/null +++ b/agent/backend/otel/.gitignore @@ -0,0 +1,3 @@ +LICENSE +README.md +otelcol-contrib \ No newline at end of file From c6c156e5e7e9c3212c30bd5b6f75e39f02565763 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 8 Aug 2023 11:42:44 -0300 Subject: [PATCH 007/434] feat(agent): implement version method. --- agent/backend/otel/otel.go | 47 ++++++++++++++++++++++++++++++-------- go.mod | 1 + go.sum | 2 ++ 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 1b23aeb2c..955a973ef 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -1,9 +1,11 @@ package otel import ( + "bufio" "context" _ "embed" "fmt" + "github.com/amenzhinsky/go-memexec" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/orb-community/orb/agent/backend" "github.com/orb-community/orb/agent/config" @@ -26,8 +28,6 @@ type openTelemetryBackend struct { logger *zap.Logger startTime time.Time - binaryPath string - //policies policyRepo policies.PolicyRepo agentTags map[string]string @@ -56,11 +56,6 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy o.logger = logger o.policyRepo = repo - var prs bool - if o.binaryPath, prs = configuration["binary"]; !prs { - o.binaryPath = DefaultBinary - } - return nil } @@ -73,8 +68,42 @@ func (o openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client } func (o openTelemetryBackend) Version() (string, error) { - //TODO implement me - panic("implement me") + executable, err := memexec.New(openTelemtryContribBinary) + if err != nil { + return "", err + } + defer func(executable *memexec.Exec) { + err := executable.Close() + if err != nil { + o.logger.Error("error closing executable", zap.Error(err)) + } + }(executable) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cmd := executable.CommandContext(ctx, "--version") + if cmd.Err != nil { + return "", cmd.Err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return "", err + } + var versionOutput string + go func() { + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + o.logger.Info("DEBUG", zap.String("line", scanner.Text())) + versionOutput = scanner.Text() + } + }() + if err := cmd.Start(); err != nil { + return "", err + } + if err := cmd.Wait(); err != nil { + return "", err + } + o.logger.Info("running opentelemetry-contrib version", zap.String("version", versionOutput)) + return versionOutput, nil } func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.CancelFunc) error { diff --git a/go.mod b/go.mod index 65cdcc339..83ba0978a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/orb-community/orb go 1.19 require ( + github.com/amenzhinsky/go-memexec v0.7.1 github.com/andybalholm/brotli v1.0.5 github.com/aws/aws-sdk-go v1.44.232 github.com/benbjohnson/immutable v0.4.3 diff --git a/go.sum b/go.sum index 5e0a196fe..4d0b0e3b3 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/amenzhinsky/go-memexec v0.7.1 h1:DVm4cXzklaNWZoTJgZUi/dlXtelhC7QBtX4luKjl1qk= +github.com/amenzhinsky/go-memexec v0.7.1/go.mod h1:ApTO9/i2bcii7kvIXi74gum+/zYDzkiOXtuBZoYOKVE= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= From 763209ab57a449d47271dfb490d96e8656aedc7d Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 8 Aug 2023 15:53:58 -0300 Subject: [PATCH 008/434] feat(agent): implement add and start testing to control policies. --- agent/backend/otel/otel.go | 167 +++++++++++++++++++++++++++++++++---- 1 file changed, 152 insertions(+), 15 deletions(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 955a973ef..ab85bf31f 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + "os" "strings" "time" ) @@ -29,16 +30,17 @@ type openTelemetryBackend struct { startTime time.Time //policies - policyRepo policies.PolicyRepo - agentTags map[string]string + policyRepo policies.PolicyRepo + policyConfigDirectory string + agentTags map[string]string // Context for controlling the context cancellation - startCtx context.Context + mainContext context.Context + runningCollectors map[string]context.Context mainCancelFunction context.CancelFunc // MQTT Config for OTEL MQTT Exporter mqttConfig config.MQTTConfig - mqttClient *mqtt.Client otlpMetricsTopic string @@ -47,14 +49,41 @@ type openTelemetryBackend struct { otelReceiverHost string otelReceiverPort int - receiver receiver.Metrics - exporter exporter.Metrics + + metricsReceiver receiver.Metrics + metricsExporter exporter.Metrics + //tracesReceiver receiver.Traces + //tracesExporter exporter.Traces + //logsReceiver receiver.Logs + //logsExporter exporter.Logs } // Configure initializes the backend with the given configuration -func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, configuration map[string]string, openTelemetryConfiguration map[string]interface{}) error { +func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, configuration map[string]string, otelConfig map[string]interface{}) error { o.logger = logger o.policyRepo = repo + var err error + o.policyConfigDirectory, err = os.MkdirTemp("", "otel-policies") + if err != nil { + return err + } + if agentTags, ok := otelConfig["agent_tags"]; ok { + o.agentTags = agentTags.(map[string]string) + } + for k, v := range otelConfig { + switch k { + case "Host": + o.otelReceiverHost = v.(string) + case "Port": + o.otelReceiverPort = v.(int) + } + } + o.mqttConfig = config.MQTTConfig{ + Address: "", + Id: "", + Key: "", + ChannelID: "", + } return nil } @@ -78,7 +107,7 @@ func (o openTelemetryBackend) Version() (string, error) { o.logger.Error("error closing executable", zap.Error(err)) } }(executable) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(o.mainContext, 5*time.Second) defer cancel() cmd := executable.CommandContext(ctx, "--version") if cmd.Err != nil { @@ -107,13 +136,45 @@ func (o openTelemetryBackend) Version() (string, error) { } func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.CancelFunc) error { - //TODO implement me - panic("implement me") + // initialize otlpreceiver and mqttexporter for scraping + if o.policyRepo == nil { + return fmt.Errorf("backend not properly configured, call Configure() first") + } + o.runningCollectors = make(map[string]context.Context) + o.mainCancelFunction = cancelFunc + + currentVersion, err := o.Version() + if err != nil { + o.logger.Error("error during ") + } + o.logger.Info("starting open-telemetry backend using version", zap.String("version", currentVersion)) + + policiesData, err := o.policyRepo.GetAll() + if err != nil { + defer cancelFunc() + o.logger.Error("failed to start otel backend, policies are absent") + return err + } + for _, policyData := range policiesData { + if err := o.ApplyPolicy(policyData, true); err != nil { + o.logger.Error("failed to start otel backend, failed to apply policy", zap.Error(err)) + cancelFunc() + return err + } + o.logger.Info("policy applied successfully", zap.String("policy_id", policyData.ID)) + } + + return nil } func (o openTelemetryBackend) Stop(ctx context.Context) error { - //TODO implement me - panic("implement me") + o.logger.Info("stopping all running policies") + o.mainCancelFunction() + for policyID, policyCtx := range o.runningCollectors { + o.logger.Debug("stopping policy context", zap.String("policy_id", policyID)) + policyCtx.Done() + } + return nil } func (o openTelemetryBackend) FullReset(ctx context.Context) error { @@ -136,9 +197,85 @@ func (o openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string, panic("implement me") } -func (o openTelemetryBackend) ApplyPolicy(data policies.PolicyData, updatePolicy bool) error { - //TODO implement me - panic("implement me") +func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { + if !o.policyRepo.Exists(newPolicyData.ID) { + temporaryFile, err := os.CreateTemp(o.policyConfigDirectory, fmt.Sprintf("otel-%s-config.yml", newPolicyData.ID)) + if err != nil { + return err + } + err = o.addRunner(newPolicyData, temporaryFile.Name()) + if err != nil { + return err + } + } else { + currentPolicyData, err := o.policyRepo.Get(newPolicyData.ID) + if err != nil { + return err + } + if currentPolicyData.Version <= newPolicyData.Version { + dataAsByte := []byte(newPolicyData.Data.(string)) + currentPolicyPath := o.policyConfigDirectory + fmt.Sprintf("otel-%s-config.yml", currentPolicyData.ID) + o.logger.Info("new policy version received, updating", zap.String("policy_id", newPolicyData.ID), zap.Int32("version", newPolicyData.Version)) + err := os.WriteFile(currentPolicyPath, dataAsByte, os.ModeTemporary) + if err != nil { + return err + } + err = o.policyRepo.Update(newPolicyData) + if err != nil { + return err + } + } else { + o.logger.Info("current policy version is newer than the one being applied, skipping", + zap.String("policy_id", newPolicyData.ID), + zap.Int32("current_version", currentPolicyData.Version), + zap.Int32("incoming_version", newPolicyData.Version)) + } + } + + return nil +} + +func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error { + policyContext := context.WithValue(o.mainContext, "policy_id", policyData.ID) + executable, err := memexec.New(openTelemtryContribBinary) + if err != nil { + return err + } + defer func(executable *memexec.Exec) { + err := executable.Close() + if err != nil { + o.logger.Error("error closing executable", zap.Error(err)) + } + }(executable) + command := executable.CommandContext(policyContext, "--config", policyFilePath) + stderr, err := command.StderrPipe() + if err != nil { + return err + } + go func(ctx context.Context) { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + o.logger.Info("stderr output", + zap.String("policy_id", policyData.ID), + zap.String("line", scanner.Text())) + if command.Err != nil { + o.logger.Error("error running command", zap.Error(command.Err)) + ctx.Done() + return + } + } + }(policyContext) + + return nil +} + +func (o openTelemetryBackend) addPolicyControl(policyCtx context.Context, policyID string) { + o.runningCollectors[policyID] = policyCtx +} + +func (o openTelemetryBackend) removePolicyControl(policyID string) { + o.runningCollectors[policyID].Done() + delete(o.runningCollectors, policyID) } func (o openTelemetryBackend) RemovePolicy(data policies.PolicyData) error { From 75ee97caf1773894b8ded49508266c7f188bd069 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 10 Aug 2023 16:14:31 -0300 Subject: [PATCH 009/434] feat(agent): implement add and start testing to control policies. --- agent/backend/otel/otel.go | 41 +++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index ab85bf31f..e8d764211 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -20,8 +20,6 @@ import ( var _ backend.Backend = (*openTelemetryBackend)(nil) -const DefaultBinary = "/usr/local/sbin/otelcol" - //go:embed otelcol-contrib var openTelemtryContribBinary []byte @@ -46,6 +44,7 @@ type openTelemetryBackend struct { otlpMetricsTopic string otlpTracesTopic string otlpLogsTopic string + otelReceiverTaps []string otelReceiverHost string otelReceiverPort int @@ -59,9 +58,11 @@ type openTelemetryBackend struct { } // Configure initializes the backend with the given configuration -func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, configuration map[string]string, otelConfig map[string]interface{}) error { +func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, + _ map[string]string, otelConfig map[string]interface{}) error { o.logger = logger o.policyRepo = repo + o.otelReceiverTaps = []string{} var err error o.policyConfigDirectory, err = os.MkdirTemp("", "otel-policies") if err != nil { @@ -142,6 +143,7 @@ func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.Canc } o.runningCollectors = make(map[string]context.Context) o.mainCancelFunction = cancelFunc + o.startTime = time.Now() currentVersion, err := o.Version() if err != nil { @@ -183,18 +185,26 @@ func (o openTelemetryBackend) FullReset(ctx context.Context) error { } func (o openTelemetryBackend) GetStartTime() time.Time { - //TODO implement me - panic("implement me") + return o.startTime } -func (o openTelemetryBackend) GetCapabilities() (map[string]interface{}, error) { - //TODO implement me - panic("implement me") +// this will only print a default backend config +func (o openTelemetryBackend) GetCapabilities() (capabilities map[string]interface{}, err error) { + capabilities["taps"] = o.otelReceiverTaps + capabilities["version"], err = o.Version() + if err != nil { + return + } + return } +// cross reference the Processes using the os, with the policies and contexts func (o openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string, error) { - //TODO implement me - panic("implement me") + amountCollectors := len(o.runningCollectors) + if amountCollectors > 0 { + return backend.Running, fmt.Sprintf("opentelemetry backend running with %d policies", amountCollectors), nil + } + return backend.Offline, "opentelemetry backend offline, waiting for policy to come to start running", nil } func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { @@ -224,6 +234,7 @@ func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, upd if err != nil { return err } + o.otelReceiverTaps = append(o.otelReceiverTaps, newPolicyData.ID) } else { o.logger.Info("current policy version is newer than the one being applied, skipping", zap.String("policy_id", newPolicyData.ID), @@ -279,6 +290,12 @@ func (o openTelemetryBackend) removePolicyControl(policyID string) { } func (o openTelemetryBackend) RemovePolicy(data policies.PolicyData) error { - //TODO implement me - panic("implement me") + if o.policyRepo.Exists(data.ID) { + o.removePolicyControl(data.ID) + err := o.policyRepo.Remove(data.ID) + if err != nil { + return err + } + } + return nil } From 3997dea6b01ce7ec544de942714b654d6e0ce61f Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 14 Aug 2023 10:33:27 -0300 Subject: [PATCH 010/434] feat(agent): fix behaviors and implement missing features. --- agent/{agent_prof.go => agent.go} | 2 +- agent/backend/otel/comms.go | 15 ++++ agent/backend/otel/otel.go | 124 +++++------------------------ agent/backend/otel/otel_sample.go | 44 ++++++++++ agent/backend/otel/policy.go | 104 ++++++++++++++++++++++++ agent/backend/pktvisor/pktvisor.go | 1 + 6 files changed, 183 insertions(+), 107 deletions(-) rename agent/{agent_prof.go => agent.go} (99%) create mode 100644 agent/backend/otel/comms.go create mode 100644 agent/backend/otel/otel_sample.go create mode 100644 agent/backend/otel/policy.go diff --git a/agent/agent_prof.go b/agent/agent.go similarity index 99% rename from agent/agent_prof.go rename to agent/agent.go index 6a704344f..32f692db1 100644 --- a/agent/agent_prof.go +++ b/agent/agent.go @@ -44,7 +44,7 @@ type orbAgent struct { backendState map[string]*backend.State cancelFunction context.CancelFunc rpcFromCancelFunc context.CancelFunc - // TODO: look for a better way to do this, context shouldn't be inside structs + asyncContext context.Context hbTicker *time.Ticker diff --git a/agent/backend/otel/comms.go b/agent/backend/otel/comms.go new file mode 100644 index 000000000..cde5194ef --- /dev/null +++ b/agent/backend/otel/comms.go @@ -0,0 +1,15 @@ +package otel + +import ( + "fmt" + mqtt "github.com/eclipse/paho.mqtt.golang" + "strings" +) + +func (o openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) { + o.mqttClient = client + otelBaseTopic := strings.Replace(baseTopic, "?", "otlp", 1) + o.otlpMetricsTopic = fmt.Sprintf("%s/m/%c", otelBaseTopic, agentID[0]) + o.otlpTracesTopic = fmt.Sprintf("%s/t/%c", otelBaseTopic, agentID[0]) + o.otlpLogsTopic = fmt.Sprintf("%s/l/%c", otelBaseTopic, agentID[0]) +} diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index e8d764211..10b469d65 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "os" - "strings" "time" ) @@ -89,14 +88,6 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy return nil } -func (o openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) { - o.mqttClient = client - otelBaseTopic := strings.Replace(baseTopic, "?", "otlp", 1) - o.otlpMetricsTopic = fmt.Sprintf("%s/m/%c", otelBaseTopic, agentID[0]) - o.otlpTracesTopic = fmt.Sprintf("%s/t/%c", otelBaseTopic, agentID[0]) - o.otlpLogsTopic = fmt.Sprintf("%s/l/%c", otelBaseTopic, agentID[0]) -} - func (o openTelemetryBackend) Version() (string, error) { executable, err := memexec.New(openTelemtryContribBinary) if err != nil { @@ -143,6 +134,7 @@ func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.Canc } o.runningCollectors = make(map[string]context.Context) o.mainCancelFunction = cancelFunc + o.mainContext = ctx o.startTime = time.Now() currentVersion, err := o.Version() @@ -169,7 +161,7 @@ func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.Canc return nil } -func (o openTelemetryBackend) Stop(ctx context.Context) error { +func (o openTelemetryBackend) Stop(_ context.Context) error { o.logger.Info("stopping all running policies") o.mainCancelFunction() for policyID, policyCtx := range o.runningCollectors { @@ -179,9 +171,22 @@ func (o openTelemetryBackend) Stop(ctx context.Context) error { return nil } -func (o openTelemetryBackend) FullReset(ctx context.Context) error { - //TODO implement me - panic("implement me") +func (o openTelemetryBackend) FullReset(_ context.Context) error { + o.logger.Info("resetting all policies and restarting") + for policyID, policyCtx := range o.runningCollectors { + o.logger.Debug("stopping policy context", zap.String("policy_id", policyID)) + policyCtx.Done() + policy, err := o.policyRepo.Get(policyID) + if err != nil { + o.logger.Error("failed to get policy", zap.Error(err)) + return err + } + err = o.ApplyPolicy(policy, true) + if err != nil { + return err + } + } + return nil } func (o openTelemetryBackend) GetStartTime() time.Time { @@ -206,96 +211,3 @@ func (o openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string, } return backend.Offline, "opentelemetry backend offline, waiting for policy to come to start running", nil } - -func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { - if !o.policyRepo.Exists(newPolicyData.ID) { - temporaryFile, err := os.CreateTemp(o.policyConfigDirectory, fmt.Sprintf("otel-%s-config.yml", newPolicyData.ID)) - if err != nil { - return err - } - err = o.addRunner(newPolicyData, temporaryFile.Name()) - if err != nil { - return err - } - } else { - currentPolicyData, err := o.policyRepo.Get(newPolicyData.ID) - if err != nil { - return err - } - if currentPolicyData.Version <= newPolicyData.Version { - dataAsByte := []byte(newPolicyData.Data.(string)) - currentPolicyPath := o.policyConfigDirectory + fmt.Sprintf("otel-%s-config.yml", currentPolicyData.ID) - o.logger.Info("new policy version received, updating", zap.String("policy_id", newPolicyData.ID), zap.Int32("version", newPolicyData.Version)) - err := os.WriteFile(currentPolicyPath, dataAsByte, os.ModeTemporary) - if err != nil { - return err - } - err = o.policyRepo.Update(newPolicyData) - if err != nil { - return err - } - o.otelReceiverTaps = append(o.otelReceiverTaps, newPolicyData.ID) - } else { - o.logger.Info("current policy version is newer than the one being applied, skipping", - zap.String("policy_id", newPolicyData.ID), - zap.Int32("current_version", currentPolicyData.Version), - zap.Int32("incoming_version", newPolicyData.Version)) - } - } - - return nil -} - -func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error { - policyContext := context.WithValue(o.mainContext, "policy_id", policyData.ID) - executable, err := memexec.New(openTelemtryContribBinary) - if err != nil { - return err - } - defer func(executable *memexec.Exec) { - err := executable.Close() - if err != nil { - o.logger.Error("error closing executable", zap.Error(err)) - } - }(executable) - command := executable.CommandContext(policyContext, "--config", policyFilePath) - stderr, err := command.StderrPipe() - if err != nil { - return err - } - go func(ctx context.Context) { - scanner := bufio.NewScanner(stderr) - for scanner.Scan() { - o.logger.Info("stderr output", - zap.String("policy_id", policyData.ID), - zap.String("line", scanner.Text())) - if command.Err != nil { - o.logger.Error("error running command", zap.Error(command.Err)) - ctx.Done() - return - } - } - }(policyContext) - - return nil -} - -func (o openTelemetryBackend) addPolicyControl(policyCtx context.Context, policyID string) { - o.runningCollectors[policyID] = policyCtx -} - -func (o openTelemetryBackend) removePolicyControl(policyID string) { - o.runningCollectors[policyID].Done() - delete(o.runningCollectors, policyID) -} - -func (o openTelemetryBackend) RemovePolicy(data policies.PolicyData) error { - if o.policyRepo.Exists(data.ID) { - o.removePolicyControl(data.ID) - err := o.policyRepo.Remove(data.ID) - if err != nil { - return err - } - } - return nil -} diff --git a/agent/backend/otel/otel_sample.go b/agent/backend/otel/otel_sample.go new file mode 100644 index 000000000..abf9c572d --- /dev/null +++ b/agent/backend/otel/otel_sample.go @@ -0,0 +1,44 @@ +package otel + +import ( + "github.com/orb-community/orb/agent/policies" + "time" +) + +var policyData = ` +receivers: + httpcheck: + targets: + - endpoint: http://localhost:8000/health + method: GET + - endpoint: http://localhost:8000/health + method: GET + collection_interval: 10s + +exporters: + otlphttp: + endpoint: http://localhost: + +processors: # this collection value may be supported (tbd) + +extensions: # tbd + +service: # tbd + metrics: + exporters: +` + +var policy = policies.PolicyData{ + ID: "default", + Datasets: nil, + GroupIds: nil, + Name: "opentelemetry-default", + Backend: "otel", + Version: 0, + Data: nil, + State: 0, + BackendErr: "", + LastScrapeBytes: 0, + LastScrapeTS: time.Time{}, + PreviousPolicyData: nil, +} diff --git a/agent/backend/otel/policy.go b/agent/backend/otel/policy.go new file mode 100644 index 000000000..05175ce38 --- /dev/null +++ b/agent/backend/otel/policy.go @@ -0,0 +1,104 @@ +package otel + +import ( + "bufio" + "context" + "fmt" + "github.com/amenzhinsky/go-memexec" + "github.com/orb-community/orb/agent/policies" + "go.uber.org/zap" + "os" +) + +func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { + if !o.policyRepo.Exists(newPolicyData.ID) { + temporaryFile, err := os.CreateTemp(o.policyConfigDirectory, fmt.Sprintf("otel-%s-config.yml", newPolicyData.ID)) + if err != nil { + return err + } + err = o.addRunner(newPolicyData, temporaryFile.Name()) + if err != nil { + return err + } + } else { + currentPolicyData, err := o.policyRepo.Get(newPolicyData.ID) + if err != nil { + return err + } + if currentPolicyData.Version <= newPolicyData.Version { + dataAsByte := []byte(newPolicyData.Data.(string)) + currentPolicyPath := o.policyConfigDirectory + fmt.Sprintf("otel-%s-config.yml", currentPolicyData.ID) + o.logger.Info("new policy version received, updating", zap.String("policy_id", newPolicyData.ID), zap.Int32("version", newPolicyData.Version)) + err := os.WriteFile(currentPolicyPath, dataAsByte, os.ModeTemporary) + if err != nil { + return err + } + err = o.policyRepo.Update(newPolicyData) + if err != nil { + return err + } + o.otelReceiverTaps = append(o.otelReceiverTaps, newPolicyData.ID) + } else { + o.logger.Info("current policy version is newer than the one being applied, skipping", + zap.String("policy_id", newPolicyData.ID), + zap.Int32("current_version", currentPolicyData.Version), + zap.Int32("incoming_version", newPolicyData.Version)) + } + } + + return nil +} + +func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error { + policyContext := context.WithValue(o.mainContext, "policy_id", policyData.ID) + executable, err := memexec.New(openTelemtryContribBinary) + if err != nil { + return err + } + defer func(executable *memexec.Exec) { + err := executable.Close() + if err != nil { + o.logger.Error("error closing executable", zap.Error(err)) + } + }(executable) + command := executable.CommandContext(policyContext, "--config", policyFilePath) + stderr, err := command.StderrPipe() + if err != nil { + return err + } + go func(ctx context.Context) { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + o.logger.Info("stderr output", + zap.String("policy_id", policyData.ID), + zap.String("line", scanner.Text())) + if command.Err != nil { + o.logger.Error("error running command", zap.Error(command.Err)) + ctx.Done() + return + } + } + }(policyContext) + + return nil +} + +func (o openTelemetryBackend) addPolicyControl(policyCtx context.Context, policyID string) { + o.runningCollectors[policyID] = policyCtx +} + +func (o openTelemetryBackend) removePolicyControl(policyID string) { + o.runningCollectors[policyID].Done() + delete(o.runningCollectors, policyID) +} + +func (o openTelemetryBackend) RemovePolicy(data policies.PolicyData) error { + if o.policyRepo.Exists(data.ID) { + o.removePolicyControl(data.ID) + err := o.policyRepo.Remove(data.ID) + if err != nil { + return err + } + } + return nil +} diff --git a/agent/backend/pktvisor/pktvisor.go b/agent/backend/pktvisor/pktvisor.go index 0e8b70894..4bfdd9e69 100644 --- a/agent/backend/pktvisor/pktvisor.go +++ b/agent/backend/pktvisor/pktvisor.go @@ -316,6 +316,7 @@ func (p *pktvisorBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo p.otelReceiverPort = v.(int) } } + p.logger.Info("configured otel receiver host", zap.String("host", p.otelReceiverHost), zap.Int("port", p.otelReceiverPort)) return nil } From ec49822e60622cfbb06c888113004b04ef114873 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 14 Aug 2023 10:37:47 -0300 Subject: [PATCH 011/434] feat(agent): add otel registered backend. --- agent/backend/otel/otel.go | 5 +++++ cmd/agent/main.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 10b469d65..3695e4f8b 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -189,6 +189,11 @@ func (o openTelemetryBackend) FullReset(_ context.Context) error { return nil } +func Register() bool { + backend.Register("otel", &openTelemetryBackend{}) + return true +} + func (o openTelemetryBackend) GetStartTime() time.Time { return o.startTime } diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 416c42b22..39a4a0304 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -7,6 +7,7 @@ package main import ( "context" "fmt" + "github.com/orb-community/orb/agent/backend/otel" "os" "os/signal" "strings" @@ -34,6 +35,7 @@ var ( func init() { pktvisor.Register() + otel.Register() diode.Register() } From aaad3b46d91eb9f982a8e94fe11764729e45f872 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 14 Aug 2023 14:58:12 -0300 Subject: [PATCH 012/434] feat(agent): fix configuration pass-through. --- agent/agent.go | 7 ++++++ agent/backend/otel/otel.go | 8 +++---- agent/backend/otel/otel_sample.go | 2 +- agent/backend/otel/policy.go | 8 ++++++- cmd/agent/main.go | 26 ++++++++++----------- cmd/agent/otel.yaml | 38 +++++++++++++++++++++++++++++++ 6 files changed, 70 insertions(+), 19 deletions(-) create mode 100644 cmd/agent/otel.yaml diff --git a/agent/agent.go b/agent/agent.go index 32f692db1..1dda48395 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -91,6 +91,11 @@ func New(logger *zap.Logger, c config.Config) (Agent, error) { pm, err := manager.New(logger, c, db) if err != nil { + logger.Error("error during create policy manager, exiting", zap.Error(err)) + return nil, err + } + if pm.GetRepo() == nil { + logger.Error("policy manager failed to get repository", zap.Error(err)) return nil, err } return &orbAgent{logger: logger, config: c, policyManager: pm, db: db, groupsInfos: make(map[string]GroupInfo)}, nil @@ -112,6 +117,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error { configuration := structs.Map(a.config.OrbAgent.Otel) configuration["agent_tags"] = a.config.OrbAgent.Tags if err := be.Configure(a.logger, a.policyManager.GetRepo(), configurationEntry, configuration); err != nil { + a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err)) return err } backendCtx := context.WithValue(agentCtx, "routine", name) @@ -121,6 +127,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error { backendCtx = context.WithValue(backendCtx, "agent_id", "auto-provisioning-without-id") } if err := be.Start(context.WithCancel(backendCtx)); err != nil { + a.logger.Info("failed to start backend", zap.String("backend", name), zap.Error(err)) return err } a.backends[name] = be diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 3695e4f8b..01d9d861c 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -20,7 +20,7 @@ import ( var _ backend.Backend = (*openTelemetryBackend)(nil) //go:embed otelcol-contrib -var openTelemtryContribBinary []byte +var openTelemetryContribBinary []byte type openTelemetryBackend struct { logger *zap.Logger @@ -58,7 +58,7 @@ type openTelemetryBackend struct { // Configure initializes the backend with the given configuration func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, - _ map[string]string, otelConfig map[string]interface{}) error { + configuration map[string]string, otelConfig map[string]interface{}) error { o.logger = logger o.policyRepo = repo o.otelReceiverTaps = []string{} @@ -79,7 +79,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy } } o.mqttConfig = config.MQTTConfig{ - Address: "", + Address: configuration["cloud"], Id: "", Key: "", ChannelID: "", @@ -89,7 +89,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy } func (o openTelemetryBackend) Version() (string, error) { - executable, err := memexec.New(openTelemtryContribBinary) + executable, err := memexec.New(openTelemetryContribBinary) if err != nil { return "", err } diff --git a/agent/backend/otel/otel_sample.go b/agent/backend/otel/otel_sample.go index abf9c572d..3517ebdc9 100644 --- a/agent/backend/otel/otel_sample.go +++ b/agent/backend/otel/otel_sample.go @@ -17,7 +17,7 @@ receivers: exporters: otlphttp: - endpoint: http://localhost: + endpoint: http://localhost:0 processors: # this collection value may be supported (tbd) diff --git a/agent/backend/otel/policy.go b/agent/backend/otel/policy.go index 05175ce38..340474915 100644 --- a/agent/backend/otel/policy.go +++ b/agent/backend/otel/policy.go @@ -51,7 +51,7 @@ func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, upd func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error { policyContext := context.WithValue(o.mainContext, "policy_id", policyData.ID) - executable, err := memexec.New(openTelemtryContribBinary) + executable, err := memexec.New(openTelemetryContribBinary) if err != nil { return err } @@ -67,6 +67,12 @@ func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFi return err } go func(ctx context.Context) { + err := command.Start() + if err != nil { + o.logger.Error("error starting command", zap.Error(err)) + ctx.Done() + return + } scanner := bufio.NewScanner(stderr) for scanner.Scan() { o.logger.Info("stderr output", diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 39a4a0304..d34105cea 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -49,21 +49,21 @@ func Run(cmd *cobra.Command, args []string) { initConfig() // configuration - var config config.Config - err := viper.Unmarshal(&config) + var configData config.Config + err := viper.Unmarshal(&configData) if err != nil { - cobra.CheckErr(fmt.Errorf("agent start up error (config): %w", err)) + cobra.CheckErr(fmt.Errorf("agent start up error (configData): %w", err)) os.Exit(1) } // include pktvisor backend by default if binary is at default location _, err = os.Stat(pktvisor.DefaultBinary) - if err == nil && config.OrbAgent.Backends == nil { - config.OrbAgent.Backends = make(map[string]map[string]string) - config.OrbAgent.Backends["pktvisor"] = make(map[string]string) - config.OrbAgent.Backends["pktvisor"]["binary"] = pktvisor.DefaultBinary + if err == nil && configData.OrbAgent.Backends == nil { + configData.OrbAgent.Backends = make(map[string]map[string]string) + configData.OrbAgent.Backends["pktvisor"] = make(map[string]string) + configData.OrbAgent.Backends["pktvisor"]["binary"] = pktvisor.DefaultBinary if len(cfgFiles) > 0 { - config.OrbAgent.Backends["pktvisor"]["config_file"] = cfgFiles[0] + configData.OrbAgent.Backends["pktvisor"]["config_file"] = cfgFiles[0] } } @@ -88,7 +88,7 @@ func Run(cmd *cobra.Command, args []string) { }(logger) // new agent - a, err := agent.New(logger, config) + a, err := agent.New(logger, configData) if err != nil { logger.Error("agent start up error", zap.Error(err)) os.Exit(1) @@ -151,10 +151,10 @@ func mergeOrError(path string) { v.SetDefault("orb.otel.port", 0) v.SetDefault("orb.debug.enable", Debug) - v.SetDefault("orb.backends.pktvisor.binary", "/usr/local/sbin/pktvisord") - v.SetDefault("orb.backends.pktvisor.config_file", "/opt/orb/agent.yaml") - v.SetDefault("orb.backends.pktvisor.api_host", "localhost") - v.SetDefault("orb.backends.pktvisor.api_port", "10853") + //v.SetDefault("orb.backends.pktvisor.binary", "/usr/local/sbin/pktvisord") + //v.SetDefault("orb.backends.pktvisor.config_file", "/opt/orb/agent.yaml") + //v.SetDefault("orb.backends.pktvisor.api_host", "localhost") + //v.SetDefault("orb.backends.pktvisor.api_port", "10853") if len(path) > 0 { cobra.CheckErr(v.ReadInConfig()) diff --git a/cmd/agent/otel.yaml b/cmd/agent/otel.yaml new file mode 100644 index 000000000..2402782ba --- /dev/null +++ b/cmd/agent/otel.yaml @@ -0,0 +1,38 @@ +version: "1.0" + +# this section is used by pktvisor +# see https://github.com/orb-community/pktvisor/blob/develop/RFCs/2021-04-16-75-taps.md +visor: + taps: + default_pcap: + input_type: pcap + config: + iface: "auto" + +# this section is used orb-agent +# most sections and keys are optional +orb: + # these are arbitrary key value pairs used for dynamically define a group of agents by matching against agent group tags + tags: + region: LA + cloud: + config: + # optionally specify an agent name to use during auto provisioning + # hostname will be used if it's not specified here + agent_name: dev-otel-1 + auto_provision: false + api: + address: https://kubernetes.docker.internal + mqtt: + address: tls://kubernetes.docker.internal:8883 + id: "7cdfe20e-9a58-42ab-9fc0-e6b004a96aed" + key: "f29ebb11-7784-43cf-82d2-bc6192551d77" + channel_id: "dd0a1803-4109-4496-807c-bf624d5350c0" + tls: + verify: false + backends: + otel: + config_file: "/opt/orb/agent.yaml" + otel: + enable: true + From 489d9b9d9f0e11518d11e2774738d6d2fd13e3ef Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 14 Aug 2023 15:12:07 -0300 Subject: [PATCH 013/434] feat(agent): fix configuration pass-through. --- agent/backend/otel/otel.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 01d9d861c..bc022d58a 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -61,8 +61,13 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy configuration map[string]string, otelConfig map[string]interface{}) error { o.logger = logger o.policyRepo = repo + pols, err := repo.GetAll() + if err != nil { + o.logger.Error("error getting policies", zap.Error(err)) + return err + } + o.logger.Info("configuring OpenTelemetry backend", zap.Any("policies", pols)) o.otelReceiverTaps = []string{} - var err error o.policyConfigDirectory, err = os.MkdirTemp("", "otel-policies") if err != nil { return err From 218206f522966468bca6cdaeed8ca031f8149533 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 14 Aug 2023 16:06:44 -0300 Subject: [PATCH 014/434] feat(agent): add sample policy. --- agent/backend/otel/otel.go | 28 +++++++++++----------------- agent/backend/otel/otel_sample.go | 9 +++++---- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index bc022d58a..1bf8e3463 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -61,12 +61,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy configuration map[string]string, otelConfig map[string]interface{}) error { o.logger = logger o.policyRepo = repo - pols, err := repo.GetAll() - if err != nil { - o.logger.Error("error getting policies", zap.Error(err)) - return err - } - o.logger.Info("configuring OpenTelemetry backend", zap.Any("policies", pols)) + var err error o.otelReceiverTaps = []string{} o.policyConfigDirectory, err = os.MkdirTemp("", "otel-policies") if err != nil { @@ -83,12 +78,6 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy o.otelReceiverPort = v.(int) } } - o.mqttConfig = config.MQTTConfig{ - Address: configuration["cloud"], - Id: "", - Key: "", - ChannelID: "", - } return nil } @@ -133,15 +122,20 @@ func (o openTelemetryBackend) Version() (string, error) { } func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.CancelFunc) error { - // initialize otlpreceiver and mqttexporter for scraping - if o.policyRepo == nil { - return fmt.Errorf("backend not properly configured, call Configure() first") - } o.runningCollectors = make(map[string]context.Context) o.mainCancelFunction = cancelFunc o.mainContext = ctx o.startTime = time.Now() - + // apply sample policy - remove after POC + err := o.ApplyPolicy(samplePolicy, false) + if err != nil { + o.logger.Error("error updating policies", zap.Error(err)) + return err + } + // initialize otlpreceiver and mqttexporter for scraping + if o.policyRepo == nil { + return fmt.Errorf("backend not properly configured, call Configure() first") + } currentVersion, err := o.Version() if err != nil { o.logger.Error("error during ") diff --git a/agent/backend/otel/otel_sample.go b/agent/backend/otel/otel_sample.go index 3517ebdc9..7276be8df 100644 --- a/agent/backend/otel/otel_sample.go +++ b/agent/backend/otel/otel_sample.go @@ -5,7 +5,7 @@ import ( "time" ) -var policyData = ` +var samplePolicyData = ` receivers: httpcheck: targets: @@ -25,17 +25,18 @@ extensions: # tbd service: # tbd metrics: - exporters: + exporters: [otlphttp] + receivers: [httpcheck] ` -var policy = policies.PolicyData{ +var samplePolicy = policies.PolicyData{ ID: "default", Datasets: nil, GroupIds: nil, Name: "opentelemetry-default", Backend: "otel", Version: 0, - Data: nil, + Data: samplePolicyData, State: 0, BackendErr: "", LastScrapeBytes: 0, From c0c90f0c73b05d8ec9b38248a0ebbaf1bceff9b0 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Mon, 14 Aug 2023 16:29:31 -0300 Subject: [PATCH 015/434] feat(agent): fix apply policy behaviour. --- agent/backend/otel/policy.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/agent/backend/otel/policy.go b/agent/backend/otel/policy.go index 340474915..0575ba291 100644 --- a/agent/backend/otel/policy.go +++ b/agent/backend/otel/policy.go @@ -11,7 +11,7 @@ import ( ) func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { - if !o.policyRepo.Exists(newPolicyData.ID) { + if !updatePolicy || !o.policyRepo.Exists(newPolicyData.ID) { temporaryFile, err := os.CreateTemp(o.policyConfigDirectory, fmt.Sprintf("otel-%s-config.yml", newPolicyData.ID)) if err != nil { return err @@ -85,6 +85,7 @@ func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFi } } }(policyContext) + o.addPolicyControl(policyContext, policyData.ID) return nil } From 96ed0c792d8baf7be3fa522c0afa5ce4e5c3b87a Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 15 Aug 2023 13:52:19 -0300 Subject: [PATCH 016/434] feat(agent): add default configurations to inside pktvisor code, removing from the cmd/main.go. --- agent/backend/pktvisor/pktvisor.go | 12 ++++++++---- cmd/agent/main.go | 5 ----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/agent/backend/pktvisor/pktvisor.go b/agent/backend/pktvisor/pktvisor.go index 4bfdd9e69..5ff774f56 100644 --- a/agent/backend/pktvisor/pktvisor.go +++ b/agent/backend/pktvisor/pktvisor.go @@ -37,6 +37,9 @@ const ( VersionTimeout = 2 ScrapeTimeout = 5 TapsTimeout = 5 + DefaultConfigPath = "/opt/orb/agent.yaml" + DefaultAPIHost = "localhost" + DefaultAPIPort = "10853" ) // AppInfo represents server application information @@ -282,22 +285,23 @@ func (p *pktvisorBackend) Stop(ctx context.Context) error { return nil } +// Configure this will set configurations, but if not set, will use the following defaults func (p *pktvisorBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, config map[string]string, otelConfig map[string]interface{}) error { p.logger = logger p.policyRepo = repo var prs bool if p.binary, prs = config["binary"]; !prs { - return errors.New("you must specify pktvisor binary") + p.binary = DefaultBinary } if p.configFile, prs = config["config_file"]; !prs { - p.configFile = "" + p.configFile = DefaultConfigPath } if p.adminAPIHost, prs = config["api_host"]; !prs { - return errors.New("you must specify pktvisor admin API host") + p.adminAPIHost = DefaultAPIHost } if p.adminAPIPort, prs = config["api_port"]; !prs { - return errors.New("you must specify pktvisor admin API port") + p.adminAPIPort = DefaultAPIPort } if agentTags, ok := otelConfig["agent_tags"]; ok { p.agentTags = agentTags.(map[string]string) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index d34105cea..f73a56522 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -151,11 +151,6 @@ func mergeOrError(path string) { v.SetDefault("orb.otel.port", 0) v.SetDefault("orb.debug.enable", Debug) - //v.SetDefault("orb.backends.pktvisor.binary", "/usr/local/sbin/pktvisord") - //v.SetDefault("orb.backends.pktvisor.config_file", "/opt/orb/agent.yaml") - //v.SetDefault("orb.backends.pktvisor.api_host", "localhost") - //v.SetDefault("orb.backends.pktvisor.api_port", "10853") - if len(path) > 0 { cobra.CheckErr(v.ReadInConfig()) } From aa7482dcdfb9f449fdfbca8d063d671f099e5c73 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Tue, 15 Aug 2023 15:11:33 -0300 Subject: [PATCH 017/434] feat(agent): wip try to fix startup. --- agent/backend/otel/otel.go | 8 +++----- agent/backend/otel/policy.go | 10 +++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 1bf8e3463..a269283b7 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -58,13 +58,15 @@ type openTelemetryBackend struct { // Configure initializes the backend with the given configuration func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, - configuration map[string]string, otelConfig map[string]interface{}) error { + _ map[string]string, otelConfig map[string]interface{}) error { o.logger = logger + o.logger.Info("configuring OpenTelemetry backend") o.policyRepo = repo var err error o.otelReceiverTaps = []string{} o.policyConfigDirectory, err = os.MkdirTemp("", "otel-policies") if err != nil { + o.logger.Error("failed to create temporary directory for policy configs", zap.Error(err)) return err } if agentTags, ok := otelConfig["agent_tags"]; ok { @@ -132,10 +134,6 @@ func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.Canc o.logger.Error("error updating policies", zap.Error(err)) return err } - // initialize otlpreceiver and mqttexporter for scraping - if o.policyRepo == nil { - return fmt.Errorf("backend not properly configured, call Configure() first") - } currentVersion, err := o.Version() if err != nil { o.logger.Error("error during ") diff --git a/agent/backend/otel/policy.go b/agent/backend/otel/policy.go index 0575ba291..d63fc50d1 100644 --- a/agent/backend/otel/policy.go +++ b/agent/backend/otel/policy.go @@ -66,25 +66,25 @@ func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFi if err != nil { return err } - go func(ctx context.Context) { + go func(ctx context.Context, logger *zap.Logger) { err := command.Start() if err != nil { - o.logger.Error("error starting command", zap.Error(err)) + fmt.Printf("error starting command: %s", err) ctx.Done() return } scanner := bufio.NewScanner(stderr) for scanner.Scan() { - o.logger.Info("stderr output", + logger.Info("stderr output", zap.String("policy_id", policyData.ID), zap.String("line", scanner.Text())) if command.Err != nil { - o.logger.Error("error running command", zap.Error(command.Err)) + logger.Error("error running command", zap.Error(command.Err)) ctx.Done() return } } - }(policyContext) + }(policyContext, o.logger) o.addPolicyControl(policyContext, policyData.ID) return nil From 2567e651f0454f33456c1e1d00d9365d2bcaeb83 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 16 Aug 2023 17:38:06 -0300 Subject: [PATCH 018/434] feat(agent): wip try to fix startup. --- agent/agent.go | 2 ++ agent/backend/otel/comms.go | 2 +- agent/backend/otel/otel.go | 17 +++++++++-------- agent/backend/otel/policy.go | 10 +++++----- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 1dda48395..15a554289 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -116,6 +116,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error { be := backend.GetBackend(name) configuration := structs.Map(a.config.OrbAgent.Otel) configuration["agent_tags"] = a.config.OrbAgent.Tags + a.logger.Info("DEBUG pointer", zap.Reflect("be", be)) if err := be.Configure(a.logger, a.policyManager.GetRepo(), configurationEntry, configuration); err != nil { a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err)) return err @@ -126,6 +127,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error { } else { backendCtx = context.WithValue(backendCtx, "agent_id", "auto-provisioning-without-id") } + a.logger.Info("DEBUG pointer", zap.Reflect("be", be)) if err := be.Start(context.WithCancel(backendCtx)); err != nil { a.logger.Info("failed to start backend", zap.String("backend", name), zap.Error(err)) return err diff --git a/agent/backend/otel/comms.go b/agent/backend/otel/comms.go index cde5194ef..33a5f397b 100644 --- a/agent/backend/otel/comms.go +++ b/agent/backend/otel/comms.go @@ -6,7 +6,7 @@ import ( "strings" ) -func (o openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) { +func (o *openTelemetryBackend) SetCommsClient(agentID string, client *mqtt.Client, baseTopic string) { o.mqttClient = client otelBaseTopic := strings.Replace(baseTopic, "?", "otlp", 1) o.otlpMetricsTopic = fmt.Sprintf("%s/m/%c", otelBaseTopic, agentID[0]) diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index a269283b7..d8d5a5d6d 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -57,10 +57,11 @@ type openTelemetryBackend struct { } // Configure initializes the backend with the given configuration -func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, +func (o *openTelemetryBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo, _ map[string]string, otelConfig map[string]interface{}) error { o.logger = logger o.logger.Info("configuring OpenTelemetry backend") + logger.Info("debug pointer", zap.Reflect("pointer", &o)) o.policyRepo = repo var err error o.otelReceiverTaps = []string{} @@ -84,7 +85,7 @@ func (o openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Policy return nil } -func (o openTelemetryBackend) Version() (string, error) { +func (o *openTelemetryBackend) Version() (string, error) { executable, err := memexec.New(openTelemetryContribBinary) if err != nil { return "", err @@ -123,7 +124,7 @@ func (o openTelemetryBackend) Version() (string, error) { return versionOutput, nil } -func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.CancelFunc) error { +func (o *openTelemetryBackend) Start(ctx context.Context, cancelFunc context.CancelFunc) error { o.runningCollectors = make(map[string]context.Context) o.mainCancelFunction = cancelFunc o.mainContext = ctx @@ -158,7 +159,7 @@ func (o openTelemetryBackend) Start(ctx context.Context, cancelFunc context.Canc return nil } -func (o openTelemetryBackend) Stop(_ context.Context) error { +func (o *openTelemetryBackend) Stop(_ context.Context) error { o.logger.Info("stopping all running policies") o.mainCancelFunction() for policyID, policyCtx := range o.runningCollectors { @@ -168,7 +169,7 @@ func (o openTelemetryBackend) Stop(_ context.Context) error { return nil } -func (o openTelemetryBackend) FullReset(_ context.Context) error { +func (o *openTelemetryBackend) FullReset(_ context.Context) error { o.logger.Info("resetting all policies and restarting") for policyID, policyCtx := range o.runningCollectors { o.logger.Debug("stopping policy context", zap.String("policy_id", policyID)) @@ -191,12 +192,12 @@ func Register() bool { return true } -func (o openTelemetryBackend) GetStartTime() time.Time { +func (o *openTelemetryBackend) GetStartTime() time.Time { return o.startTime } // this will only print a default backend config -func (o openTelemetryBackend) GetCapabilities() (capabilities map[string]interface{}, err error) { +func (o *openTelemetryBackend) GetCapabilities() (capabilities map[string]interface{}, err error) { capabilities["taps"] = o.otelReceiverTaps capabilities["version"], err = o.Version() if err != nil { @@ -206,7 +207,7 @@ func (o openTelemetryBackend) GetCapabilities() (capabilities map[string]interfa } // cross reference the Processes using the os, with the policies and contexts -func (o openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string, error) { +func (o *openTelemetryBackend) GetRunningStatus() (backend.RunningStatus, string, error) { amountCollectors := len(o.runningCollectors) if amountCollectors > 0 { return backend.Running, fmt.Sprintf("opentelemetry backend running with %d policies", amountCollectors), nil diff --git a/agent/backend/otel/policy.go b/agent/backend/otel/policy.go index d63fc50d1..6b4a6af3e 100644 --- a/agent/backend/otel/policy.go +++ b/agent/backend/otel/policy.go @@ -10,7 +10,7 @@ import ( "os" ) -func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { +func (o *openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { if !updatePolicy || !o.policyRepo.Exists(newPolicyData.ID) { temporaryFile, err := os.CreateTemp(o.policyConfigDirectory, fmt.Sprintf("otel-%s-config.yml", newPolicyData.ID)) if err != nil { @@ -49,7 +49,7 @@ func (o openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, upd return nil } -func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error { +func (o *openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error { policyContext := context.WithValue(o.mainContext, "policy_id", policyData.ID) executable, err := memexec.New(openTelemetryContribBinary) if err != nil { @@ -90,16 +90,16 @@ func (o openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFi return nil } -func (o openTelemetryBackend) addPolicyControl(policyCtx context.Context, policyID string) { +func (o *openTelemetryBackend) addPolicyControl(policyCtx context.Context, policyID string) { o.runningCollectors[policyID] = policyCtx } -func (o openTelemetryBackend) removePolicyControl(policyID string) { +func (o *openTelemetryBackend) removePolicyControl(policyID string) { o.runningCollectors[policyID].Done() delete(o.runningCollectors, policyID) } -func (o openTelemetryBackend) RemovePolicy(data policies.PolicyData) error { +func (o *openTelemetryBackend) RemovePolicy(data policies.PolicyData) error { if o.policyRepo.Exists(data.ID) { o.removePolicyControl(data.ID) err := o.policyRepo.Remove(data.ID) From db02c744dfc52cfc6358e9784b516559de7f16ba Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 16 Aug 2023 17:39:25 -0300 Subject: [PATCH 019/434] feat(agent): wip try to fix startup. --- agent/agent.go | 2 -- agent/backend/otel/otel.go | 1 - 2 files changed, 3 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 15a554289..1dda48395 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -116,7 +116,6 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error { be := backend.GetBackend(name) configuration := structs.Map(a.config.OrbAgent.Otel) configuration["agent_tags"] = a.config.OrbAgent.Tags - a.logger.Info("DEBUG pointer", zap.Reflect("be", be)) if err := be.Configure(a.logger, a.policyManager.GetRepo(), configurationEntry, configuration); err != nil { a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err)) return err @@ -127,7 +126,6 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error { } else { backendCtx = context.WithValue(backendCtx, "agent_id", "auto-provisioning-without-id") } - a.logger.Info("DEBUG pointer", zap.Reflect("be", be)) if err := be.Start(context.WithCancel(backendCtx)); err != nil { a.logger.Info("failed to start backend", zap.String("backend", name), zap.Error(err)) return err diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index d8d5a5d6d..680e2dffc 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -61,7 +61,6 @@ func (o *openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Polic _ map[string]string, otelConfig map[string]interface{}) error { o.logger = logger o.logger.Info("configuring OpenTelemetry backend") - logger.Info("debug pointer", zap.Reflect("pointer", &o)) o.policyRepo = repo var err error o.otelReceiverTaps = []string{} From 2891677764bcbf1b120514e834ced885c6563584 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Thu, 17 Aug 2023 11:43:16 -0300 Subject: [PATCH 020/434] feat(agent): fix startup and multiple routines working the same executable. --- .idea/runConfigurations/orb_agent.xml | 2 +- Makefile | 2 +- agent/backend/otel/otel.go | 46 ++++++++++++++++----------- agent/backend/otel/otel_sample.go | 17 ++++++---- agent/backend/otel/policy.go | 15 ++------- agent/logging.go | 4 +-- 6 files changed, 44 insertions(+), 42 deletions(-) diff --git a/.idea/runConfigurations/orb_agent.xml b/.idea/runConfigurations/orb_agent.xml index 9f9c71328..4d08b7c75 100644 --- a/.idea/runConfigurations/orb_agent.xml +++ b/.idea/runConfigurations/orb_agent.xml @@ -2,7 +2,7 @@ - +