Skip to content

Commit

Permalink
[receiver/postgresqlreceiver] [fix] Support postgres 17 table schema (#…
Browse files Browse the repository at this point in the history
…36813)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Support collecting bgwriter metrics from postgres17. 

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes
#36784

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Updated integration tests, tested locally.

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
dehaansa authored Dec 13, 2024
1 parent 7e3d003 commit b6238cf
Show file tree
Hide file tree
Showing 6 changed files with 870 additions and 55 deletions.
27 changes: 27 additions & 0 deletions .chloggen/postgresql-17-bgwriter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: postgresqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Update the postgresqlreceiver to handle new table schema for the bgwriter metrics in pg17+"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36784]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
151 changes: 110 additions & 41 deletions receiver/postgresqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"net"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -56,6 +57,7 @@ type client interface {
getMaxConnections(ctx context.Context) (int64, error)
getIndexStats(ctx context.Context, database string) (map[indexIdentifer]indexStat, error)
listDatabases(ctx context.Context) ([]string, error)
getVersion(ctx context.Context) (string, error)
}

type postgreSQLClient struct {
Expand Down Expand Up @@ -442,7 +444,6 @@ type bgStat struct {
checkpointWriteTime float64
checkpointSyncTime float64
bgWrites int64
backendWrites int64
bufferBackendWrites int64
bufferFsyncWrites int64
bufferCheckpoints int64
Expand All @@ -451,54 +452,105 @@ type bgStat struct {
}

func (c *postgreSQLClient) getBGWriterStats(ctx context.Context) (*bgStat, error) {
query := `SELECT
checkpoints_req AS checkpoint_req,
checkpoints_timed AS checkpoint_scheduled,
checkpoint_write_time AS checkpoint_duration_write,
checkpoint_sync_time AS checkpoint_duration_sync,
buffers_clean AS bg_writes,
buffers_backend AS backend_writes,
buffers_backend_fsync AS buffers_written_fsync,
buffers_checkpoint AS buffers_checkpoints,
buffers_alloc AS buffers_allocated,
maxwritten_clean AS maxwritten_count
FROM pg_stat_bgwriter;`
version, err := c.getVersion(ctx)
if err != nil {
return nil, err
}

major, err := parseMajorVersion(version)
if err != nil {
return nil, err
}

row := c.client.QueryRowContext(ctx, query)
var (
checkpointsReq, checkpointsScheduled int64
checkpointSyncTime, checkpointWriteTime float64
bgWrites, bufferCheckpoints, bufferAllocated int64
bufferBackendWrites, bufferFsyncWrites, maxWritten int64
)
err := row.Scan(
&checkpointsReq,
&checkpointsScheduled,
&checkpointWriteTime,
&checkpointSyncTime,
&bgWrites,
&bufferBackendWrites,
&bufferFsyncWrites,
&bufferCheckpoints,
&bufferAllocated,
&maxWritten,
)
if err != nil {
return nil, err

if major < 17 {
query := `SELECT
checkpoints_req AS checkpoint_req,
checkpoints_timed AS checkpoint_scheduled,
checkpoint_write_time AS checkpoint_duration_write,
checkpoint_sync_time AS checkpoint_duration_sync,
buffers_clean AS bg_writes,
buffers_backend AS backend_writes,
buffers_backend_fsync AS buffers_written_fsync,
buffers_checkpoint AS buffers_checkpoints,
buffers_alloc AS buffers_allocated,
maxwritten_clean AS maxwritten_count
FROM pg_stat_bgwriter;`

row := c.client.QueryRowContext(ctx, query)

if err = row.Scan(
&checkpointsReq,
&checkpointsScheduled,
&checkpointWriteTime,
&checkpointSyncTime,
&bgWrites,
&bufferBackendWrites,
&bufferFsyncWrites,
&bufferCheckpoints,
&bufferAllocated,
&maxWritten,
); err != nil {
return nil, err
}
return &bgStat{
checkpointsReq: checkpointsReq,
checkpointsScheduled: checkpointsScheduled,
checkpointWriteTime: checkpointWriteTime,
checkpointSyncTime: checkpointSyncTime,
bgWrites: bgWrites,
bufferBackendWrites: bufferBackendWrites,
bufferFsyncWrites: bufferFsyncWrites,
bufferCheckpoints: bufferCheckpoints,
buffersAllocated: bufferAllocated,
maxWritten: maxWritten,
}, nil
} else {
query := `SELECT
cp.num_requested AS checkpoint_req,
cp.num_timed AS checkpoint_scheduled,
cp.write_time AS checkpoint_duration_write,
cp.sync_time AS checkpoint_duration_sync,
cp.buffers_written AS buffers_checkpoints,
bg.buffers_clean AS bg_writes,
bg.buffers_alloc AS buffers_allocated,
bg.maxwritten_clean AS maxwritten_count
FROM pg_stat_bgwriter bg, pg_stat_checkpointer cp;`

row := c.client.QueryRowContext(ctx, query)

if err = row.Scan(
&checkpointsReq,
&checkpointsScheduled,
&checkpointWriteTime,
&checkpointSyncTime,
&bufferCheckpoints,
&bgWrites,
&bufferAllocated,
&maxWritten,
); err != nil {
return nil, err
}

return &bgStat{
checkpointsReq: checkpointsReq,
checkpointsScheduled: checkpointsScheduled,
checkpointWriteTime: checkpointWriteTime,
checkpointSyncTime: checkpointSyncTime,
bgWrites: bgWrites,
bufferBackendWrites: -1, // Not found in pg17+ tables
bufferFsyncWrites: -1, // Not found in pg17+ tables
bufferCheckpoints: bufferCheckpoints,
buffersAllocated: bufferAllocated,
maxWritten: maxWritten,
}, nil
}
return &bgStat{
checkpointsReq: checkpointsReq,
checkpointsScheduled: checkpointsScheduled,
checkpointWriteTime: checkpointWriteTime,
checkpointSyncTime: checkpointSyncTime,
bgWrites: bgWrites,
backendWrites: bufferBackendWrites,
bufferBackendWrites: bufferBackendWrites,
bufferFsyncWrites: bufferFsyncWrites,
bufferCheckpoints: bufferCheckpoints,
buffersAllocated: bufferAllocated,
maxWritten: maxWritten,
}, nil
}

func (c *postgreSQLClient) getMaxConnections(ctx context.Context) (int64, error) {
Expand Down Expand Up @@ -641,6 +693,23 @@ func (c *postgreSQLClient) listDatabases(ctx context.Context) ([]string, error)
return databases, nil
}

func (c *postgreSQLClient) getVersion(ctx context.Context) (string, error) {
query := "SHOW server_version;"
row := c.client.QueryRowContext(ctx, query)
var version string
err := row.Scan(&version)
return version, err
}

func parseMajorVersion(ver string) (int, error) {
parts := strings.Split(ver, ".")
if len(parts) < 2 {
return 0, fmt.Errorf("unexpected version string: %s", ver)
}

return strconv.Atoi(parts[0])
}

func filterQueryByDatabases(baseQuery string, databases []string, groupBy bool) string {
if len(databases) > 0 {
var queryDatabases []string
Expand Down
30 changes: 19 additions & 11 deletions receiver/postgresqlreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package postgresqlreceiver

import (
"fmt"
"net"
"path/filepath"
"testing"
Expand All @@ -22,37 +23,44 @@ import (

const postgresqlPort = "5432"

const (
pre17TestVersion = "13.18"
post17TestVersion = "17.2"
)

func TestIntegration(t *testing.T) {
defer testutil.SetFeatureGateForTest(t, separateSchemaAttrGate, false)()
defer testutil.SetFeatureGateForTest(t, connectionPoolGate, false)()
t.Run("single_db", integrationTest("single_db", []string{"otel"}))
t.Run("multi_db", integrationTest("multi_db", []string{"otel", "otel2"}))
t.Run("all_db", integrationTest("all_db", []string{}))
t.Run("single_db", integrationTest("single_db", []string{"otel"}, pre17TestVersion))
t.Run("multi_db", integrationTest("multi_db", []string{"otel", "otel2"}, pre17TestVersion))
t.Run("all_db", integrationTest("all_db", []string{}, pre17TestVersion))

t.Run("single_db_post17", integrationTest("single_db_post17", []string{"otel"}, post17TestVersion))
}

func TestIntegrationWithSeparateSchemaAttr(t *testing.T) {
defer testutil.SetFeatureGateForTest(t, separateSchemaAttrGate, true)()
defer testutil.SetFeatureGateForTest(t, connectionPoolGate, false)()
t.Run("single_db_schemaattr", integrationTest("single_db_schemaattr", []string{"otel"}))
t.Run("multi_db_schemaattr", integrationTest("multi_db_schemaattr", []string{"otel", "otel2"}))
t.Run("all_db_schemaattr", integrationTest("all_db_schemaattr", []string{}))
t.Run("single_db_schemaattr", integrationTest("single_db_schemaattr", []string{"otel"}, pre17TestVersion))
t.Run("multi_db_schemaattr", integrationTest("multi_db_schemaattr", []string{"otel", "otel2"}, pre17TestVersion))
t.Run("all_db_schemaattr", integrationTest("all_db_schemaattr", []string{}, pre17TestVersion))
}

func TestIntegrationWithConnectionPool(t *testing.T) {
defer testutil.SetFeatureGateForTest(t, separateSchemaAttrGate, false)()
defer testutil.SetFeatureGateForTest(t, connectionPoolGate, true)()
t.Run("single_db_connpool", integrationTest("single_db_connpool", []string{"otel"}))
t.Run("multi_db_connpool", integrationTest("multi_db_connpool", []string{"otel", "otel2"}))
t.Run("all_db_connpool", integrationTest("all_db_connpool", []string{}))
t.Run("single_db_connpool", integrationTest("single_db_connpool", []string{"otel"}, pre17TestVersion))
t.Run("multi_db_connpool", integrationTest("multi_db_connpool", []string{"otel", "otel2"}, pre17TestVersion))
t.Run("all_db_connpool", integrationTest("all_db_connpool", []string{}, pre17TestVersion))
}

func integrationTest(name string, databases []string) func(*testing.T) {
func integrationTest(name string, databases []string, pgVersion string) func(*testing.T) {
expectedFile := filepath.Join("testdata", "integration", "expected_"+name+".yaml")
return scraperinttest.NewIntegrationTest(
NewFactory(),
scraperinttest.WithContainerRequest(
testcontainers.ContainerRequest{
Image: "postgres:13.18",
Image: fmt.Sprintf("postgres:%s", pgVersion),
Env: map[string]string{
"POSTGRES_USER": "root",
"POSTGRES_PASSWORD": "otel",
Expand Down
8 changes: 6 additions & 2 deletions receiver/postgresqlreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,13 @@ func (p *postgreSQLScraper) collectBGWriterStats(
p.mb.RecordPostgresqlBgwriterBuffersAllocatedDataPoint(now, bgStats.buffersAllocated)

p.mb.RecordPostgresqlBgwriterBuffersWritesDataPoint(now, bgStats.bgWrites, metadata.AttributeBgBufferSourceBgwriter)
p.mb.RecordPostgresqlBgwriterBuffersWritesDataPoint(now, bgStats.bufferBackendWrites, metadata.AttributeBgBufferSourceBackend)
if bgStats.bufferBackendWrites >= 0 {
p.mb.RecordPostgresqlBgwriterBuffersWritesDataPoint(now, bgStats.bufferBackendWrites, metadata.AttributeBgBufferSourceBackend)
}
p.mb.RecordPostgresqlBgwriterBuffersWritesDataPoint(now, bgStats.bufferCheckpoints, metadata.AttributeBgBufferSourceCheckpoints)
p.mb.RecordPostgresqlBgwriterBuffersWritesDataPoint(now, bgStats.bufferFsyncWrites, metadata.AttributeBgBufferSourceBackendFsync)
if bgStats.bufferFsyncWrites >= 0 {
p.mb.RecordPostgresqlBgwriterBuffersWritesDataPoint(now, bgStats.bufferFsyncWrites, metadata.AttributeBgBufferSourceBackendFsync)
}

p.mb.RecordPostgresqlBgwriterCheckpointCountDataPoint(now, bgStats.checkpointsReq, metadata.AttributeBgCheckpointTypeRequested)
p.mb.RecordPostgresqlBgwriterCheckpointCountDataPoint(now, bgStats.checkpointsScheduled, metadata.AttributeBgCheckpointTypeScheduled)
Expand Down
6 changes: 5 additions & 1 deletion receiver/postgresqlreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,11 @@ func (m *mockClient) listDatabases(_ context.Context) ([]string, error) {
return args.Get(0).([]string), args.Error(1)
}

func (m *mockClient) getVersion(_ context.Context) (string, error) {
args := m.Called()
return args.String(0), args.Error(1)
}

func (m *mockClientFactory) getClient(database string) (client, error) {
args := m.Called(database)
return args.Get(0).(client), args.Error(1)
Expand Down Expand Up @@ -511,7 +516,6 @@ func (m *mockClient) initMocks(database string, schema string, databases []strin
checkpointWriteTime: 3.12,
checkpointSyncTime: 4.23,
bgWrites: 5,
backendWrites: 6,
bufferBackendWrites: 7,
bufferFsyncWrites: 8,
bufferCheckpoints: 9,
Expand Down
Loading

0 comments on commit b6238cf

Please sign in to comment.