From 16789daf21f06b9f25031a7234fd3e004d8170b6 Mon Sep 17 00:00:00 2001 From: Rajesh Samala Date: Wed, 31 Jul 2024 14:44:31 +0530 Subject: [PATCH] Otel dist metrics change (#96) * Occ config logging (#394) * occ configurations logging * cal event success * adding cal data * added TODOs * Remove log_occ_confogs.go * Remove testing files * source of configs - files * whitelist format change * code clean up * code review changes-1 * CR fixes * CR fixes * Delete tests/unittest/config_logging/main_test.go * clean up * Merge branch 'occ-config-logging' of /Users/simmidisetty/Documents/GitHub/OpenSourceHera/src/github.com/paypal/hera with conflicts. * test for config logging * removing test changes * tests for all cases * test * making minor changes for logging feature specific data * changes for incorporate review comments --------- Co-authored-by: simmidisetty Co-authored-by: Rajesh S * changes for otel integration in hera --------- Co-authored-by: satyakamala03 <128077872+satyakamala03@users.noreply.github.com> Co-authored-by: simmidisetty Co-authored-by: Rajesh S --- lib/statelog.go | 3 +- tests/unittest/otel_basic/main_test.go | 8 +- tests/unittest/testutil/main.go | 6 +- utility/logger/otel/defs.go | 84 ++---- utility/logger/otel/logger.go | 118 +++++++- utility/logger/otel/state_logger.go | 251 ++++-------------- utility/logger/otel/test/state_logger_test.go | 17 +- 7 files changed, 218 insertions(+), 269 deletions(-) diff --git a/lib/statelog.go b/lib/statelog.go index 717f2679..89d54c5b 100644 --- a/lib/statelog.go +++ b/lib/statelog.go @@ -19,6 +19,7 @@ package lib import ( "bytes" + "context" "errors" "fmt" otel_logger "github.com/paypal/hera/utility/logger/otel" @@ -571,7 +572,7 @@ func (sl *StateLog) init() error { if otelconfig.OTelConfigData.Enabled { // Initialize statelog_metrics to send metrics information currently we are ignoring registration object returned from this call - stateStartErr := otel_logger.StartMetricsCollection(totalWorkersCount, + stateStartErr := otel_logger.StartMetricsCollection(context.Background(), totalWorkersCount, otel_logger.WithMetricProvider(otel.GetMeterProvider()), otel_logger.WithAppName(otelconfig.OTelConfigData.PoolName)) diff --git a/tests/unittest/otel_basic/main_test.go b/tests/unittest/otel_basic/main_test.go index 7bfa327b..f8877584 100644 --- a/tests/unittest/otel_basic/main_test.go +++ b/tests/unittest/otel_basic/main_test.go @@ -28,9 +28,11 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { appcfg["rac_sql_interval"] = "0" appcfg["child.executable"] = "mysqlworker" appcfg["enable_otel"] = "true" - appcfg["otel_resolution_time_in_sec"] = "1" + appcfg["otel_resolution_time_in_sec"] = "10" opscfg := make(map[string]string) opscfg["opscfg.default.server.max_connections"] = "3" + appcfg["cfg_from_tns"] = "false" + appcfg["num_standby_dbs"] = "0" opscfg["opscfg.default.server.log_level"] = "5" os.Setenv("AVAILABILITY_ZONE", "test-dev") os.Setenv("ENVIRONMENT", "dev") @@ -108,9 +110,9 @@ func TestOTELMetricsBasic(t *testing.T) { if count < 1 { t.Fatalf("OTEL event should contain application as hera-test") } - initCount := testutil.RegexCountFile("\"name\":\"pp.occ.init_connection.cnt\"", logFilePath) + initCount := testutil.RegexCountFile("\"name\":\"pp.occ.init_connection\"", logFilePath) if initCount < 1 { - t.Fatalf("OTEL event should contain metric name pp.occ.init_connection.cnt") + t.Fatalf("OTEL event should contain metric name pp.occ.init_connection") } tagsCount := testutil.RegexCountFile("{\"key\":\"InstanceId\",\"value\":{\"intValue\":\"0\"}},{\"key\":\"ShardId\",\"value\":{\"intValue\":\"0\"}},{\"key\":\"WorkerType\",\"value\":{\"intValue\":\"0\"}", logFilePath) diff --git a/tests/unittest/testutil/main.go b/tests/unittest/testutil/main.go index 3e08310e..e3150abe 100644 --- a/tests/unittest/testutil/main.go +++ b/tests/unittest/testutil/main.go @@ -23,9 +23,9 @@ func setup(cfg cfgFunc) error { if appcfg["enable_otel"] == "true" { err = mx.StartOTelAgent() } - if err != nil { - return err - } + //if err != nil { + // return err + //} err = mx.StartServer() return err } diff --git a/utility/logger/otel/defs.go b/utility/logger/otel/defs.go index ca8a7152..4819e39d 100644 --- a/utility/logger/otel/defs.go +++ b/utility/logger/otel/defs.go @@ -9,30 +9,19 @@ import ( // Following Metric Names will get instrumented as part of StateLogMetrics const ( // Worker States - InitConnGuageMetric = "init_connection.cnt" - AccptConnGuageMetric = "accept_connection.cnt" - WaitConnGuageMetric = "wait_connection.cnt" - BusyConnGuageMetric = "busy_connection.cnt" - ScheduledConnGuageMetric = "scheduled_connection.cnt" - FinishedConnGuageMetric = "finished_connection.cnt" - QuiescedConnGuageMetric = "quiesced_connection.cnt" + InitConnMetric = "init_connection" + AccptConnMetric = "accept_connection" + WaitConnMetric = "wait_connection" + BusyConnMetric = "busy_connection" + ScheduledConnMetric = "scheduled_connection" + FinishedConnMetric = "finished_connection" + QuiescedConnMetric = "quiesced_connection" // Connection States - AssignedConnGuageMetric = "assigned_connection.cnt" - IdleConnGuageMetric = "idle_connection.cnt" - BacklogConnGuageMetric = "backlog_connection.cnt" - StrdConnGuageMetric = "stranded_connection.cnt" - - InitMaxGuageMetric = "init_connection.cnt.max" - AcceptMinGuageMetric = "accept_connection.cnt.min" - WaitMaxGuageMetric = "wait_connection.cnt.max" - BusyMaxGuageMetric = "busy_connection.cnt.max" - SchdMaxGuageMetric = "scheduled_connection.cnt.max" - QuiescedMaxGuageMetric = "quiesced_connection.cnt.max" - - IdleMaxGuageMetric = "idle_connection.cnt.max" - BacklogMaxGuageMetric = "backlog_connection.cnt.max" - StrdMaxGuageMetric = "stranded_connection.cnt.max" + AssignedConnMetric = "assigned_connection" + IdleConnMetric = "idle_connection" + BacklogConnMetric = "backlog_connection" + StrdConnMetric = "stranded_connection" ) const ( @@ -51,6 +40,9 @@ const ( ContainerHostDimName = string("container_host") ) +var StatelogBucket = []float64{0, 5, 10, 15, 20, 25, 30, 40, 50, 60, 80, 100, 120, 160, 200} +var ConnectionStateBucket = []float64{0, 25, 50, 75, 100, 150, 200, 300, 400, 500, 600, 700, 800, 1200, 2400, 4800, 9600, 19200, 39400, 65536} + const OtelInstrumentationVersion string = "v1.0" // DEFAULT_OTEL_COLLECTOR_PROTOCOL default OTEL configurations point to QA collector @@ -84,18 +76,6 @@ type ( ServerType int ) -// StateData Represents stats by a worker -type StateData struct { - Name string - Value float64 - Dimensions metric.MeasurementOption -} - -type DataPoint struct { - attr metric.MeasurementOption - data int64 -} - // StateLogMetrics state_log_metrics reports workers states type StateLogMetrics struct { @@ -114,31 +94,17 @@ type StateLogMetrics struct { stateLock sync.Mutex - registration metric.Registration - - initState metric.Int64ObservableGauge - acptState metric.Int64ObservableGauge - waitState metric.Int64ObservableGauge - busyState metric.Int64ObservableGauge - schdState metric.Int64ObservableGauge - fnshState metric.Int64ObservableGauge - quceState metric.Int64ObservableGauge - asgnState metric.Int64ObservableGauge - idleState metric.Int64ObservableGauge - bklgState metric.Int64ObservableGauge - strdState metric.Int64ObservableGauge - - initStateMax metric.Int64ObservableGauge - waitStateMax metric.Int64ObservableGauge - busyStateMax metric.Int64ObservableGauge - schdStateMax metric.Int64ObservableGauge - quceStateMax metric.Int64ObservableGauge - - idleStateMax metric.Int64ObservableGauge - bklgStateMax metric.Int64ObservableGauge - strdStateMax metric.Int64ObservableGauge - - acptStateMin metric.Int64ObservableGauge + initState metric.Int64Histogram + acptState metric.Int64Histogram + waitState metric.Int64Histogram + busyState metric.Int64Histogram + schdState metric.Int64Histogram + fnshState metric.Int64Histogram + quceState metric.Int64Histogram + asgnState metric.Int64Histogram + idleState metric.Int64Histogram + bklgState metric.Int64Histogram + strdState metric.Int64Histogram } // Object represents the workers states data for worker belongs to specific shardId and workperType with flat-map diff --git a/utility/logger/otel/logger.go b/utility/logger/otel/logger.go index 4e88b3d4..2785659c 100644 --- a/utility/logger/otel/logger.go +++ b/utility/logger/otel/logger.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" @@ -116,15 +117,130 @@ func newMeterProvider(ctx context.Context) (*metric.MeterProvider, error) { logger.GetLogger().Log(logger.Alert, "failed to initialize metric exporter, error %v", err) return nil, err } - + metricViews := getStateLogMetricsViews() meterProvider := metric.NewMeterProvider( metric.WithResource(getResourceInfo(config.OTelConfigData.PoolName)), metric.WithReader(metric.NewPeriodicReader(metricExporter, metric.WithInterval(time.Duration(config.OTelConfigData.ResolutionTimeInSec)*time.Second))), + metric.WithView(metricViews...), ) return meterProvider, nil } +func getStateLogMetricsViews() []metric.View { + initView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(InitConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + acptStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(AccptConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + waitStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(WaitConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + busyStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(BusyConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + schdStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(ScheduledConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + fnshStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(FinishedConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + quceStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(QuiescedConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + asgnStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(AssignedConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + idleStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(IdleConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + bklgStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(BacklogConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + + strdStateView := metric.NewView( + metric.Instrument{ + Name: config.OTelConfigData.PopulateMetricNamePrefix(StrdConnMetric), + Scope: instrumentation.Scope{Name: StateLogMeterName}, + }, + metric.Stream{ + Aggregation: metric.AggregationBase2ExponentialHistogram{MaxSize: 32, MaxScale: 20}, + }, + ) + return []metric.View{initView, acptStateView, waitStateView, busyStateView, schdStateView, + fnshStateView, quceStateView, asgnStateView, idleStateView, bklgStateView, strdStateView} +} + // getMetricExporter Initialize metric exporter based protocol selected by user. func getMetricExporter(ctx context.Context) (metric.Exporter, error) { if config.OTelConfigData.OtelMetricGRPC { diff --git a/utility/logger/otel/state_logger.go b/utility/logger/otel/state_logger.go index 6295f2e1..f2d4922a 100644 --- a/utility/logger/otel/state_logger.go +++ b/utility/logger/otel/state_logger.go @@ -57,7 +57,7 @@ func newConfig(opts ...StateLogOption) stateLogMetricsConfig { } // StartMetricsCollection initializes reporting of stateLogMetrics using the supplied config. -func StartMetricsCollection(totalWorkersCount int, opt ...StateLogOption) error { +func StartMetricsCollection(ctx context.Context, totalWorkersCount int, opt ...StateLogOption) error { stateLogMetricsConfig := newConfig(opt...) //Verification of config data @@ -86,6 +86,11 @@ func StartMetricsCollection(totalWorkersCount int, opt ...StateLogOption) error doneCh: make(chan struct{}), } err = metricsStateLogger.register() + if err != nil { + logger.GetLogger().Log(logger.Alert, "Failed to register state metrics collector", err) + } else { + go metricsStateLogger.startStateLogMetricsPoll(ctx) + } }) return err } @@ -108,6 +113,13 @@ func AddDataPointToOTELStateDataChan(dataPoint *WorkersStateData) { return case <-time.After(time.Millisecond * 100): logger.GetLogger().Log(logger.Alert, "timeout occurred while adding record to stats data channel") + default: + select { + case metricsStateLogger.mStateDataChan <- dataPoint: + return + default: + logger.GetLogger().Log(logger.Alert, "metricsStateLogger.mStateData channel closed or full while sending data") + } } } @@ -116,195 +128,93 @@ func (stateLogMetrics *StateLogMetrics) register() error { //"init", "acpt", "wait", "busy", "schd", "fnsh", "quce", "asgn", "idle", "bklg", "strd", "cls" var err error - if stateLogMetrics.initState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(InitConnGuageMetric), + if stateLogMetrics.initState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(InitConnMetric), metric.WithDescription("Number of workers in init state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for init state", err) return err } - if stateLogMetrics.acptState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(AccptConnGuageMetric), + if stateLogMetrics.acptState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(AccptConnMetric), metric.WithDescription("Number of workers in accept state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for accept state", err) return err } - if stateLogMetrics.waitState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(WaitConnGuageMetric), + if stateLogMetrics.waitState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(WaitConnMetric), metric.WithDescription("Number of workers in wait state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for wait state", err) return err } - if stateLogMetrics.busyState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(BusyConnGuageMetric), + if stateLogMetrics.busyState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(BusyConnMetric), metric.WithDescription("Number of workers in busy state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for busy state", err) return err } - if stateLogMetrics.schdState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(ScheduledConnGuageMetric), + if stateLogMetrics.schdState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(ScheduledConnMetric), metric.WithDescription("Number of workers in scheduled state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for scheduled state", err) return err } - if stateLogMetrics.fnshState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(FinishedConnGuageMetric), + if stateLogMetrics.fnshState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(FinishedConnMetric), metric.WithDescription("Number of workers in finished state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for finished state", err) return err } - if stateLogMetrics.quceState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(QuiescedConnGuageMetric), + if stateLogMetrics.quceState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(QuiescedConnMetric), metric.WithDescription("Number of workers in quiesced state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for quiesced state", err) return err } - if stateLogMetrics.asgnState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(AssignedConnGuageMetric), + if stateLogMetrics.asgnState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(AssignedConnMetric), metric.WithDescription("Number of workers in assigned state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for assigned state", err) return err } - if stateLogMetrics.idleState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(IdleConnGuageMetric), + if stateLogMetrics.idleState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(IdleConnMetric), metric.WithDescription("Number of workers in idle state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for idle state", err) return err } - if stateLogMetrics.bklgState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(BacklogConnGuageMetric), + if stateLogMetrics.bklgState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(BacklogConnMetric), metric.WithDescription("Number of workers in backlog state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for backlog state", err) return err } - if stateLogMetrics.strdState, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(StrdConnGuageMetric), + if stateLogMetrics.strdState, err = stateLogMetrics.meter.Int64Histogram( + otelconfig.OTelConfigData.PopulateMetricNamePrefix(StrdConnMetric), metric.WithDescription("Number of connections in stranded state"), ); err != nil { logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for stranded state", err) return err } - //Initialize max metrics - if stateLogMetrics.initStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(InitMaxGuageMetric), - metric.WithDescription("Maximum Number of workers in init state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for init max state", err) - return err - } - - if stateLogMetrics.waitStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(WaitMaxGuageMetric), - metric.WithDescription("Maximum Number of workers in wait state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for max wait state", err) - return err - } - - if stateLogMetrics.busyStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(BusyMaxGuageMetric), - metric.WithDescription("Maximum Number of workers in busy state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for max busy state", err) - return err - } - - if stateLogMetrics.schdStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(SchdMaxGuageMetric), - metric.WithDescription("Maximum Number of workers in scheduled state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for scheduled state", err) - return err - } - - if stateLogMetrics.quceStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(QuiescedMaxGuageMetric), - metric.WithDescription("Maximum Number of workers in quiesced state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for quiesced state", err) - return err - } - - if stateLogMetrics.idleStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(IdleMaxGuageMetric), - metric.WithDescription("Maximum Number of client connections in idle state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for max idle state", err) - return err - } - - if stateLogMetrics.bklgStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(BacklogMaxGuageMetric), - metric.WithDescription("Maximum Number of client connections in backlog state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for max backlog state", err) - return err - } - - if stateLogMetrics.strdStateMax, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(StrdMaxGuageMetric), - metric.WithDescription("Maximum Number of client connections in idle state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for max stranded state", err) - return err - } - - //Initialize min for accpet - if stateLogMetrics.acptStateMin, err = stateLogMetrics.meter.Int64ObservableGauge( - otelconfig.OTelConfigData.PopulateMetricNamePrefix(AcceptMinGuageMetric), - metric.WithDescription("Minimum Number of workers in accept state within resolution time"), - ); err != nil { - logger.GetLogger().Log(logger.Alert, "Failed to register guage metric for min accept state", err) - return err - } - - stateLogMetrics.registration, err = stateLogMetrics.meter.RegisterCallback( - func(ctx context.Context, observer metric.Observer) error { - return stateLogMetrics.asyncStateLogMetricsPoll(observer) - }, - []metric.Observable{ - stateLogMetrics.initState, - stateLogMetrics.acptState, - stateLogMetrics.waitState, - stateLogMetrics.busyState, - stateLogMetrics.schdState, - stateLogMetrics.fnshState, - stateLogMetrics.quceState, - stateLogMetrics.asgnState, - stateLogMetrics.idleState, - stateLogMetrics.bklgState, - stateLogMetrics.strdState, - - stateLogMetrics.initStateMax, //Max - stateLogMetrics.waitStateMax, - stateLogMetrics.busyStateMax, - stateLogMetrics.schdStateMax, - stateLogMetrics.quceStateMax, - stateLogMetrics.idleStateMax, - stateLogMetrics.bklgStateMax, - stateLogMetrics.strdStateMax, - - stateLogMetrics.acptStateMin, //Min - }...) if err != nil { return err @@ -313,19 +223,16 @@ func (stateLogMetrics *StateLogMetrics) register() error { } /* - * AasyncStatelogMetricsPoll poll operation involved periodically by OTEL collector based-on its polling interval + * asyncStateLogMetricsPoll poll operation involved periodically by OTEL collector based-on its polling interval * it poll metrics from channel do aggregation or compute max based combination of shardId + workerType + InstanceId */ -func (stateLogMetrics *StateLogMetrics) asyncStateLogMetricsPoll(observer metric.Observer) (err error) { - stateLogMetrics.stateLock.Lock() - defer stateLogMetrics.stateLock.Unlock() - stateLogsData := make(map[string]map[string]int64) - var stateLogTitle string - //Infinite loop read through the channel and send metrics +func (stateLogMetrics *StateLogMetrics) startStateLogMetricsPoll(ctx context.Context) { mainloop: for { select { case workersState, more := <-stateLogMetrics.mStateDataChan: + stateLogsData := make(map[string]map[string]int64) + var stateLogTitle string if !more { logger.GetLogger().Log(logger.Info, "Statelog metrics data channel 'mStateDataChan' has been closed.") break mainloop @@ -341,52 +248,28 @@ mainloop: stateLogsData[keyName][WorkerType] = int64(workersState.WorkerType) stateLogsData[keyName][InstanceId] = int64(workersState.InstanceId) stateLogsData[keyName][Datapoints] += 1 - for key, value := range workersState.StateData { - if key == "req" || key == "resp" { - stateLogsData[keyName][key] += value - } else { - maxKey := key + "Max" - minKey := key + "Min" - stateLogsData[keyName][key] = value - //check max update max value - _, keyPresent := stateLogsData[keyName][maxKey] - if !keyPresent { - stateLogsData[keyName][maxKey] = value - } - if stateLogsData[keyName][maxKey] < value { - stateLogsData[keyName][maxKey] = value - } - //Min value - _, keyPresent = stateLogsData[keyName][minKey] - if !keyPresent { - stateLogsData[keyName][minKey] = value - } - if stateLogsData[keyName][minKey] > value { - stateLogsData[keyName][minKey] = value - } - } + stateLogsData[keyName][key] = value + } + if len(stateLogsData) > 0 { + stateLogMetrics.sendMetricsDataToCollector(ctx, &stateLogTitle, stateLogsData) } case <-stateLogMetrics.doneCh: logger.GetLogger().Log(logger.Info, "received stopped signal for processing statelog metric. "+ - "so unregistering callback for sending data and closing data channel") + "so stop sending data and closing data channel") close(stateLogMetrics.mStateDataChan) - stateLogMetrics.registration.Unregister() - default: break mainloop + case <-time.After(1000 * time.Millisecond): + logger.GetLogger().Log(logger.Info, "timeout on waiting for statelog metrics data") + continue mainloop } } - //Process metrics data - if len(stateLogsData) > 0 { - err = stateLogMetrics.sendMetricsDataToCollector(observer, &stateLogTitle, stateLogsData) - } - return err } /* * Send metrics datat data-points to collector */ -func (stateLogMetrics *StateLogMetrics) sendMetricsDataToCollector(observer metric.Observer, stateLogTitle *string, stateLogsData map[string]map[string]int64) (err error) { +func (stateLogMetrics *StateLogMetrics) sendMetricsDataToCollector(ctx context.Context, stateLogTitle *string, stateLogsData map[string]map[string]int64) { for key, aggStatesData := range stateLogsData { logger.GetLogger().Log(logger.Info, fmt.Sprintf("publishing metric with calculated max value and aggregation of gauge for shardid-workertype-instanceId: %s using datapoints size: %d", key, aggStatesData[Datapoints])) commonLabels := []attribute.KeyValue{ @@ -398,34 +281,18 @@ func (stateLogMetrics *StateLogMetrics) sendMetricsDataToCollector(observer metr } //Observe states data //1. Worker States - observer.ObserveInt64(stateLogMetrics.initState, aggStatesData["init"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.acptState, aggStatesData["acpt"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.waitState, aggStatesData["wait"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.busyState, aggStatesData["busy"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.schdState, aggStatesData["schd"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.fnshState, aggStatesData["fnsh"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.quceState, aggStatesData["quce"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.initState.Record(ctx, aggStatesData["init"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.acptState.Record(ctx, aggStatesData["acpt"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.waitState.Record(ctx, aggStatesData["wait"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.busyState.Record(ctx, aggStatesData["busy"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.schdState.Record(ctx, aggStatesData["schd"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.fnshState.Record(ctx, aggStatesData["fnsh"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.quceState.Record(ctx, aggStatesData["quce"], metric.WithAttributes(commonLabels...)) //2. Connection States - observer.ObserveInt64(stateLogMetrics.asgnState, aggStatesData["asgn"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.idleState, aggStatesData["idle"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.bklgState, aggStatesData["bklg"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.strdState, aggStatesData["strd"], metric.WithAttributes(commonLabels...)) - - //3. Worker States Max values - observer.ObserveInt64(stateLogMetrics.initStateMax, aggStatesData["initMax"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.waitStateMax, aggStatesData["waitMax"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.busyStateMax, aggStatesData["busyMax"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.schdStateMax, aggStatesData["schdMax"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.quceStateMax, aggStatesData["quceMax"], metric.WithAttributes(commonLabels...)) - - //4. Connection States Max values - observer.ObserveInt64(stateLogMetrics.idleStateMax, aggStatesData["idleMax"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.bklgStateMax, aggStatesData["bklgMax"], metric.WithAttributes(commonLabels...)) - observer.ObserveInt64(stateLogMetrics.strdStateMax, aggStatesData["strdMax"], metric.WithAttributes(commonLabels...)) - - //5. Min accept state - observer.ObserveInt64(stateLogMetrics.acptStateMin, aggStatesData["acptMin"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.asgnState.Record(ctx, aggStatesData["asgn"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.idleState.Record(ctx, aggStatesData["idle"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.bklgState.Record(ctx, aggStatesData["bklg"], metric.WithAttributes(commonLabels...)) + stateLogMetrics.strdState.Record(ctx, aggStatesData["strd"], metric.WithAttributes(commonLabels...)) } - return nil } diff --git a/utility/logger/otel/test/state_logger_test.go b/utility/logger/otel/test/state_logger_test.go index f994738c..87c455e8 100644 --- a/utility/logger/otel/test/state_logger_test.go +++ b/utility/logger/otel/test/state_logger_test.go @@ -93,7 +93,7 @@ func TestVerifyStateLogMetricsInitilization(t *testing.T) { t.Fail() } - err = otellogger.StartMetricsCollection(5, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) + err = otellogger.StartMetricsCollection(context.Background(), 5, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) if err != nil { logger.GetLogger().Log(logger.Alert, "Failed to initialize Metric Collection service") @@ -116,7 +116,7 @@ func TestVerifyStateLogMetricsInitilizationAndContextWithTimeout(t *testing.T) { t.Fail() } - err = otellogger.StartMetricsCollection(5, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) + err = otellogger.StartMetricsCollection(context.Background(), 5, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) defer otellogger.StopMetricCollection() if err != nil { @@ -137,7 +137,7 @@ func TestSendingStateLogMetrics(t *testing.T) { time.Sleep(2 * time.Second) - err := otellogger.StartMetricsCollection(5, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) + err := otellogger.StartMetricsCollection(context.Background(), 5, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) if err != nil { logger.GetLogger().Log(logger.Alert, "Failed to initialize Metric Collection service") @@ -167,8 +167,8 @@ func TestSendingStateLogMetrics(t *testing.T) { logger.GetLogger().Log(logger.Info, "Data Sent successfully for instrumentation") time.Sleep(5 * time.Second) metricsData := mc.GetMetrics() - if len(metricsData) < 20 { - t.Fatalf("got %d, wanted %d", len(metricsData), 20) + if len(metricsData) < 11 { + t.Fatalf("got %d, wanted %d", len(metricsData), 11) } } @@ -178,7 +178,7 @@ func TestSendingStateLogMetricsConsoleExporter(t *testing.T) { t.Fail() } - err2 := otellogger.StartMetricsCollection(100, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp2")) + err2 := otellogger.StartMetricsCollection(context.Background(), 100, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp2")) if err2 != nil { logger.GetLogger().Log(logger.Alert, "Failed to initialize Metric Collection service") @@ -268,7 +268,7 @@ func TestOCCStatelogGenerator(t *testing.T) { } defer cont.Shutdown(context.Background()) - err2 := otellogger.StartMetricsCollection(1000, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) + err2 := otellogger.StartMetricsCollection(context.Background(), 1000, otellogger.WithMetricProvider(otel.GetMeterProvider()), otellogger.WithAppName("occ-testapp")) defer otellogger.StopMetricCollection() go dataGenerator() @@ -285,7 +285,6 @@ func dataGenerator() { waitTime := time.Second * 1 metricNames := [11]string{"init", "acpt", "wait", "busy", "schd", "fnsh", "quce", "asgn", "idle", "bklg", "strd"} - workerStates := [2]string{"req", "resp"} timer := time.NewTimer(waitTime) @@ -316,8 +315,6 @@ mainloop: //Random index randIndex := rand.Intn(len(metricNames)) workerStatesData.StateData[metricNames[randIndex]] += int64(totalSum - tempSum) - workerStatesData.StateData[workerStates[0]] = int64(rand.Intn(100)) - workerStatesData.StateData[workerStates[1]] = int64(rand.Intn(100)) otellogger.AddDataPointToOTELStateDataChan(&workerStatesData) timer.Reset(waitTime) case <-ctx.Done():