Merge #86671 #87084

86671: metric: migrate all histograms to use prometheus-backed version  r=aadityasondhi a=aadityasondhi

In a previous change, a new prometheus-backed histogram library was
introduced to help standardize histogram buckets across the codebase.
This change migrates all existing histograms to use the new library.

In this change, `NewLatency()` is removed in favor of explicitly defining
which buckets to use between `NetworkLatencyBuckets` and
`IOLatencyBuckets` when calling `NewHistogram()`. For all histograms
that were previously created using `NewLatency()`, I tried to place
them in appropriate buckets with the new library. For cases where the
choice was unclear, I chose `IOLatencyBuckets` as it allows for a larger
range of values.
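
For reference, here is a minimal, standalone sketch of the bucket-preset
idea using the upstream Prometheus client directly. The bucket boundaries
below (and the metric name) are made-up placeholders for illustration, not
the actual values of `NetworkLatencyBuckets` or `IOLatencyBuckets`:

```
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Hypothetical presets: exponential buckets covering the range each
	// preset is meant for (values in seconds). The real slices differ.
	networkLatencyBuckets = prometheus.ExponentialBuckets(500e-6, 2, 16) // ~500µs .. ~16s
	ioLatencyBuckets      = prometheus.ExponentialBuckets(10e-6, 2, 24)  // ~10µs .. ~84s
)

func main() {
	// A histogram that would previously have been built with NewLatency()
	// now picks one of the shared presets explicitly.
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "dial_tenant_latency_seconds", // illustrative metric name
		Help:    "Latency of dialing a tenant (illustrative only).",
		Buckets: networkLatencyBuckets,
	})
	prometheus.MustRegister(h)

	// Record a 3ms observation, converted to seconds.
	h.Observe(float64(3*time.Millisecond) / float64(time.Second))
	fmt.Println("network buckets:", len(networkLatencyBuckets), "io buckets:", len(ioLatencyBuckets))
}
```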

related: #85990

Release justification: low risk, high benefit

Release note (ops change): This change introduces a new histogram
implementation that will reduce the total number of buckets and
standardize them across all usage. This should help increase the
usability of histograms when exported to a UI (e.g. Grafana) and reduce
the storage overhead.

After applying this patch, you should see fewer buckets in
Prometheus/Grafana, but histogram percentiles should remain similar
because Prometheus interpolates values within buckets.
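
The percentile claim relies on the linear interpolation that Prometheus's
`histogram_quantile` performs inside the bucket containing the target rank.
Below is a simplified sketch of that estimation with made-up bucket counts;
it ignores the `+Inf` bucket and other edge cases the real implementation
handles:

```
package main

import "fmt"

type bucket struct {
	upperBound float64 // the "le" label
	count      float64 // cumulative count of observations <= upperBound
}

// quantile estimates the q-th quantile from cumulative bucket counts by
// interpolating linearly within the bucket that contains the target rank.
func quantile(q float64, buckets []bucket) float64 {
	total := buckets[len(buckets)-1].count
	rank := q * total
	lowerBound, lowerCount := 0.0, 0.0
	for _, b := range buckets {
		if b.count >= rank {
			// Linear interpolation inside the bucket holding the target rank.
			return lowerBound + (b.upperBound-lowerBound)*(rank-lowerCount)/(b.count-lowerCount)
		}
		lowerBound, lowerCount = b.upperBound, b.count
	}
	return buckets[len(buckets)-1].upperBound
}

func main() {
	// Cumulative counts for buckets with upper bounds 1ms, 4ms, 16ms, 64ms.
	hist := []bucket{{0.001, 10}, {0.004, 55}, {0.016, 90}, {0.064, 100}}
	fmt.Printf("estimated p99: %.4fs\n", quantile(0.99, hist)) // ~0.0592s
}
```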

87084: roachtest: introduce multi tenant TPCH test r=yuzefovich a=yuzefovich

**roachtest: sort tests lexicographically**

This commit unexports a couple of roachtest registration methods and
then orders all such methods lexicographically in the registry.

Release justification: test-only change.

Release note: None

**roachtest: introduce multi tenant TPCH test**

This commit introduces a new roachtest that runs the TPCH benchmark on
a single node, first in a single-tenant deployment and then in a
multi-tenant deployment with a single SQL instance. It refactors the
tpchvec roachtest a bit to extract a couple of helper methods for
performing the latency analysis. In particular, it removes the
`queriesToRun` parameter since all variants now run all 22 queries.

I ran into some difficulties around managing the certificates in
different deployment models, so the test is structured so that the
single-node cluster is first used in a single-tenant deployment model,
and then - after running all 22 queries - a tenant with a single SQL
instance is created and all queries are run within the tenant.

Here is the comparison of averages over 10 runs (single-tenant average,
multi-tenant average, percent increase):
```
Q1: 6.34s	13.35s	110.39%
Q2: 0.22s	0.49s	122.07%
Q3: 4.88s	8.04s	64.67%
Q4: 1.51s	4.75s	213.67%
Q5: 2.33s	11.69s	400.90%
Q6: 5.51s	35.49s	543.89%
Q7: 5.75s	24.08s	318.42%
Q8: 0.85s	3.82s	348.47%
Q9: 7.34s	28.37s	286.25%
Q10: 1.99s	5.00s	150.93%
Q11: 0.55s	1.92s	249.00%
Q12: 6.02s	34.76s	477.05%
Q13: 1.88s	3.76s	100.00%
Q14: 0.64s	1.10s	73.11%
Q15: 3.33s	16.80s	404.23%
Q16: 0.88s	1.29s	47.66%
Q17: 0.24s	0.60s	145.49%
Q18: 7.75s	30.13s	288.48%
Q19: 5.20s	13.08s	151.69%
Q20: 12.66s	55.30s	336.92%
Q21: 6.98s	24.50s	250.77%
Q22: 0.62s	1.17s	90.26%
```

Fixes: #71528.

Release justification: test-only change.

Release note: None

Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Sep 1, 2022
3 parents 9911916 + a82aa82 + 26881f4 commit 5aebc22
Showing 44 changed files with 762 additions and 710 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
@@ -817,7 +817,7 @@ func (j *jobState) checkpointCompleted(ctx context.Context, checkpointDuration t

j.metrics.CheckpointHistNanos.RecordValue(checkpointDuration.Nanoseconds())
j.lastProgressUpdate = j.ts.Now()
j.checkpointDuration = time.Duration(j.metrics.CheckpointHistNanos.Snapshot().Mean())
j.checkpointDuration = time.Duration(j.metrics.CheckpointHistNanos.Mean())
j.progressUpdatesSkipped = false
}

52 changes: 19 additions & 33 deletions pkg/ccl/changefeedccl/metrics.go
@@ -284,14 +284,6 @@ func (w *wrappingCostController) getBackfillRangeCallback() func(int64) (func(),
return w.inner.getBackfillRangeCallback()
}

const (
changefeedCheckpointHistMaxLatency = 30 * time.Second
changefeedBatchHistMaxLatency = 30 * time.Second
changefeedFlushHistMaxLatency = 1 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
)

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
@@ -450,20 +442,15 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
a := &AggMetrics{
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
MessageSize: b.Histogram(metaMessageSize,
histogramWindow, 10<<20 /* 10MB max message size */, 1),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),

BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos,
histogramWindow, changefeedBatchHistMaxLatency.Nanoseconds(), 1),
FlushHistNanos: b.Histogram(metaChangefeedFlushHistNanos,
histogramWindow, changefeedFlushHistMaxLatency.Nanoseconds(), 2),
CommitLatency: b.Histogram(metaCommitLatency,
histogramWindow, commitLatencyMaxValue.Nanoseconds(), 1),
AdmitLatency: b.Histogram(metaAdmitLatency, histogramWindow,
admitLatencyMaxValue.Nanoseconds(), 1),
MessageSize: b.Histogram(metaMessageSize, histogramWindow, metric.DataSize16MBBuckets),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),

BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
FlushHistNanos: b.Histogram(metaChangefeedFlushHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
CommitLatency: b.Histogram(metaCommitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
AdmitLatency: b.Histogram(metaAdmitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
BackfillCount: b.Gauge(metaChangefeedBackfillCount),
BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges),
RunningCount: b.Gauge(metaChangefeedRunning),
@@ -566,17 +553,16 @@ func (m *Metrics) getSLIMetrics(scope string) (*sliMetrics, error) {
// MakeMetrics makes the metrics for changefeed monitoring.
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
m := &Metrics{
AggMetrics: newAggregateMetrics(histogramWindow),
KVFeedMetrics: kvevent.MakeMetrics(histogramWindow),
SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow,
changefeedCheckpointHistMaxLatency.Nanoseconds(), 2),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
AggMetrics: newAggregateMetrics(histogramWindow),
KVFeedMetrics: kvevent.MakeMetrics(histogramWindow),
SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow, metric.IOLatencyBuckets),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
10 changes: 7 additions & 3 deletions pkg/ccl/sqlproxyccl/connector_test.go
@@ -375,7 +375,9 @@ func TestConnector_dialTenantCluster(t *testing.T) {
defer cancel()

c := &connector{
DialTenantLatency: metric.NewLatency(metaDialTenantLatency, time.Millisecond),
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
c.testingKnobs.lookupAddr = func(ctx context.Context) (string, error) {
@@ -403,8 +405,10 @@

var reportFailureFnCount int
c := &connector{
TenantID: roachpb.MakeTenantID(42),
DialTenantLatency: metric.NewLatency(metaDialTenantLatency, time.Millisecond),
TenantID: roachpb.MakeTenantID(42),
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
c.DirectoryCache = &testTenantDirectoryCache{
22 changes: 7 additions & 15 deletions pkg/ccl/sqlproxyccl/metrics.go
@@ -50,16 +50,6 @@ func (metrics) MetricStruct() {}

var _ metric.Struct = metrics{}

const (
// maxExpectedTransferResponseMessageSize corresponds to maximum expected
// response message size for the SHOW TRANSFER STATE query. We choose 16MB
// here to match the defaultMaxReadBufferSize used for ingesting SQL
// statements in the SQL server (see pkg/sql/pgwire/pgwirebase/encoding.go).
//
// This will be used to tune sql.session_transfer.max_session_size.
maxExpectedTransferResponseMessageSize = 1 << 24 // 16MB
)

var (
metaCurConnCount = metric.Metadata{
Name: "proxy.sql.conns",
@@ -224,32 +214,34 @@ func makeProxyMetrics() metrics {
RoutingErrCount: metric.NewCounter(metaRoutingErrCount),
RefusedConnCount: metric.NewCounter(metaRefusedConnCount),
SuccessfulConnCount: metric.NewCounter(metaSuccessfulConnCount),
ConnectionLatency: metric.NewLatency(
ConnectionLatency: metric.NewHistogram(
metaConnMigrationAttemptedCount,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
),
AuthFailedCount: metric.NewCounter(metaAuthFailedCount),
ExpiredClientConnCount: metric.NewCounter(metaExpiredClientConnCount),
// Connector metrics.
DialTenantLatency: metric.NewLatency(
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
// Connection migration metrics.
ConnMigrationSuccessCount: metric.NewCounter(metaConnMigrationSuccessCount),
ConnMigrationErrorFatalCount: metric.NewCounter(metaConnMigrationErrorFatalCount),
ConnMigrationErrorRecoverableCount: metric.NewCounter(metaConnMigrationErrorRecoverableCount),
ConnMigrationAttemptedCount: metric.NewCounter(metaConnMigrationAttemptedCount),
ConnMigrationAttemptedLatency: metric.NewLatency(
ConnMigrationAttemptedLatency: metric.NewHistogram(
metaConnMigrationAttemptedLatency,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
),
ConnMigrationTransferResponseMessageSize: metric.NewHistogram(
metaConnMigrationTransferResponseMessageSize,
base.DefaultHistogramWindowInterval(),
maxExpectedTransferResponseMessageSize,
1,
metric.DataSize16MBBuckets,
),
QueryCancelReceivedPGWire: metric.NewCounter(metaQueryCancelReceivedPGWire),
QueryCancelReceivedHTTP: metric.NewCounter(metaQueryCancelReceivedHTTP),
12 changes: 3 additions & 9 deletions pkg/ccl/streamingccl/streamingest/metrics.go
@@ -15,12 +15,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
)

const (
streamingFlushHistMaxLatency = 1 * time.Minute
streamingAdmitLatencyMaxValue = 3 * time.Minute
streamingCommitLatencyMaxValue = 10 * time.Minute
)

var (
metaStreamingEventsIngested = metric.Metadata{
Name: "streaming.events_ingested",
@@ -135,11 +129,11 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
ResolvedEvents: metric.NewCounter(metaStreamingResolvedEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
FlushHistNanos: metric.NewHistogram(metaStreamingFlushHistNanos,
histogramWindow, streamingFlushHistMaxLatency.Nanoseconds(), 1),
histogramWindow, metric.BatchProcessLatencyBuckets),
CommitLatency: metric.NewHistogram(metaStreamingCommitLatency,
histogramWindow, streamingCommitLatencyMaxValue.Nanoseconds(), 1),
histogramWindow, metric.BatchProcessLatencyBuckets),
AdmitLatency: metric.NewHistogram(metaStreamingAdmitLatency,
histogramWindow, streamingAdmitLatencyMaxValue.Nanoseconds(), 1),
histogramWindow, metric.BatchProcessLatencyBuckets),
RunningCount: metric.NewGauge(metaStreamsRunning),
EarliestDataCheckpointSpan: metric.NewGauge(metaEarliestDataCheckpointSpan),
LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan),
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -79,6 +79,7 @@ go_library(
"mixed_version_schemachange.go",
"multitenant.go",
"multitenant_fairness.go",
"multitenant_tpch.go",
"multitenant_upgrade.go",
"multitenant_utils.go",
"network.go",
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/tests/cancel.go
@@ -48,16 +48,16 @@ func registerCancel(r registry.Registry) {

m := c.NewMonitor(ctx, c.All())
m.Go(func(ctx context.Context) error {
conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

t.Status("restoring TPCH dataset for Scale Factor 1")
if err := loadTPCHDataset(
ctx, t, c, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
); err != nil {
t.Fatal(err)
}

conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

queryPrefix := "USE tpch; "
if !useDistsql {
queryPrefix += "SET distsql = off; "
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/disk_stall.go
@@ -25,8 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// RegisterDiskStalledDetection registers the disk stall test.
func RegisterDiskStalledDetection(r registry.Registry) {
// registerDiskStalledDetection registers the disk stall test.
func registerDiskStalledDetection(r registry.Registry) {
for _, affectsLogDir := range []bool{false, true} {
for _, affectsDataDir := range []bool{false, true} {
// Grab copies of the args because we'll pass them into a closure.
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/jepsen.go
@@ -327,9 +327,9 @@ cd /mnt/data1/jepsen/cockroachdb && set -eo pipefail && \
}
}

// RegisterJepsen registers the Jepsen test suite, which primarily checks for
// registerJepsen registers the Jepsen test suite, which primarily checks for
// transaction anomalies.
func RegisterJepsen(r registry.Registry) {
func registerJepsen(r registry.Registry) {
// NB: the "comments" test is not included because it requires
// linearizability.
tests := []string{
102 changes: 102 additions & 0 deletions pkg/cmd/roachtest/tests/multitenant_tpch.go
@@ -0,0 +1,102 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
gosql "database/sql"
"fmt"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/workload/tpch"
)

// runMultiTenantTPCH runs TPCH queries on a cluster that is first used as a
// single-tenant deployment followed by a run of all queries in a multi-tenant
// deployment with a single SQL instance.
func runMultiTenantTPCH(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.All())

setupNames := []string{"single-tenant", "multi-tenant"}
const numRunsPerQuery = 3
perfHelper := newTpchVecPerfHelper(setupNames)

// runTPCH runs all TPCH queries on a single setup. It first restores the
// TPCH dataset using the provided connection and then runs each TPCH query
// one at a time (using the given url as a parameter to the 'workload run'
// command). The runtimes are accumulated in the perf helper.
runTPCH := func(conn *gosql.DB, url string, setupIdx int) {
t.Status("restoring TPCH dataset for Scale Factor 1 in %s", setupNames[setupIdx])
if err := loadTPCHDataset(
ctx, t, c, conn, 1 /* sf */, c.NewMonitor(ctx), c.All(), false, /* disableMergeQueue */
); err != nil {
t.Fatal(err)
}
if _, err := conn.Exec("USE tpch;"); err != nil {
t.Fatal(err)
}
createStatsFromTables(t, conn, tpchTables)
for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
cmd := fmt.Sprintf("./workload run tpch %s --secure "+
"--concurrency=1 --db=tpch --max-ops=%d --queries=%d",
url, numRunsPerQuery, queryNum)
result, err := c.RunWithDetailsSingleNode(ctx, t.L(), c.Node(1), cmd)
workloadOutput := result.Stdout + result.Stderr
t.L().Printf(workloadOutput)
if err != nil {
t.Fatal(err)
}
perfHelper.parseQueryOutput(t, []byte(workloadOutput), setupIdx)
}
}

// First, use the cluster as a single tenant deployment. It is important to
// not create the tenant yet so that the certs directory is not overwritten.
singleTenantConn := c.Conn(ctx, t.L(), 1)
runTPCH(singleTenantConn, "" /* url */, 0 /* setupIdx */)

// Now we create a tenant and run all TPCH queries within it.
const (
tenantID = 123
tenantHTTPPort = 8081
tenantSQLPort = 30258
tenantNode = 1
)
_, err := singleTenantConn.Exec(`SELECT crdb_internal.create_tenant($1)`, tenantID)
if err != nil {
t.Fatal(err)
}
tenant := createTenantNode(ctx, t, c, c.All(), tenantID, tenantNode, tenantHTTPPort, tenantSQLPort)
tenant.start(ctx, t, c, "./cockroach")
multiTenantConn, err := gosql.Open("postgres", tenant.pgURL)
if err != nil {
t.Fatal(err)
}
runTPCH(multiTenantConn, "'"+tenant.secureURL()+"'", 1 /* setupIdx */)

// Analyze the runtimes of both setups.
perfHelper.compareSetups(t, numRunsPerQuery, nil /* timesCallback */)
}

func registerMultiTenantTPCH(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "multitenant/tpch",
Owner: registry.OwnerSQLQueries,
Cluster: r.MakeClusterSpec(1 /* nodeCount */),
Run: runMultiTenantTPCH,
})
}