Skip to content

Commit

Permalink
database_observability: add indexes to schema table collector
Browse files Browse the repository at this point in the history
Extemd the tableSpec with indexes info.
  • Loading branch information
cristiangreco committed Feb 14, 2025
1 parent 90c1721 commit 036fe0c
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
Namespace: "database_observability",
Name: "connection_info",
Help: "Information about the connection",
}, []string{"provider_name", "region", "db_instance_identifier"})
}, []string{"provider_name", "provider_region", "db_instance_identifier"})

args.Registry.MustRegister(infoMetric)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestConnectionInfo(t *testing.T) {
const baseExpectedMetrics = `
# HELP database_observability_connection_info Information about the connection
# TYPE database_observability_connection_info gauge
database_observability_connection_info{db_instance_identifier="%s",provider_name="%s",region="%s"} 1
database_observability_connection_info{db_instance_identifier="%s",provider_name="%s",provider_region="%s"} 1
`

testCases := []struct {
Expand All @@ -28,12 +28,12 @@ func TestConnectionInfo(t *testing.T) {
}{
{
name: "generic dsn",
dsn: "user:pass@tcp(localhost:3306)/db",
dsn: "user:pass@tcp(localhost:3306)/dbname",
expectedMetrics: fmt.Sprintf(baseExpectedMetrics, "unknown", "unknown", "unknown"),
},
{
name: "AWS/RDS dsn",
dsn: "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/db",
dsn: "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/dbname",
expectedMetrics: fmt.Sprintf(baseExpectedMetrics, "products-db", "aws", "us-east-1"),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) {
instanceKey: args.InstanceKey,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: log.With(args.Logger, "collector", "QuerySample"),
logger: log.With(args.Logger, "collector", QuerySampleName),
running: &atomic.Bool{},
}, nil
}
Expand All @@ -72,7 +72,7 @@ func (c *QuerySample) Name() string {
}

func (c *QuerySample) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "QuerySample collector started")
level.Debug(c.logger).Log("msg", QuerySampleName+" collector started")

c.running.Store(true)
ctx, cancel := context.WithCancel(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ const (
WHERE
TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION ASC`

selectIndexNames = `
SELECT
index_name,
seq_in_index,
column_name,
nullable,
non_unique,
index_type
FROM
information_schema.statistics
WHERE
table_schema = ? and table_name = ?
ORDER BY table_name, index_name, seq_in_index`
)

type SchemaTableArguments struct {
Expand Down Expand Up @@ -109,6 +123,7 @@ type tableInfo struct {

type tableSpec struct {
Columns []columnSpec `json:"columns"`
Indexes []indexSpec `json:"indexes,omitempty"`
}
type columnSpec struct {
Name string `json:"name"`
Expand All @@ -119,13 +134,21 @@ type columnSpec struct {
DefaultValue string `json:"default_value,omitempty"`
}

type indexSpec struct {
Name string `json:"name"`
Type string `json:"type"`
Columns []string `json:"columns"`
Unique bool `json:"unique"`
Nullable bool `json:"nullable"`
}

func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
c := &SchemaTable{
dbConnection: args.DB,
instanceKey: args.InstanceKey,
collectInterval: args.CollectInterval,
entryHandler: args.EntryHandler,
logger: log.With(args.Logger, "collector", "SchemaTable"),
logger: log.With(args.Logger, "collector", SchemaTableName),
running: &atomic.Bool{},
}

Expand All @@ -141,7 +164,7 @@ func (c *SchemaTable) Name() string {
}

func (c *SchemaTable) Start(ctx context.Context) error {
level.Debug(c.logger).Log("msg", "SchemaTable collector started")
level.Debug(c.logger).Log("msg", SchemaTableName+" collector started")

c.running.Store(true)
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -415,5 +438,43 @@ func (c *SchemaTable) fetchColumnsDefinitions(ctx context.Context, schemaName st
return nil, err
}

rs, err = c.dbConnection.QueryContext(ctx, selectIndexNames, schemaName, tableName)
if err != nil {
level.Error(c.logger).Log("msg", "failed to query table indexes", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}
defer rs.Close()

for rs.Next() {
var indexName, columnName, indexType string
var seqInIndex, nonUnique int
var nullable sql.NullString
if err := rs.Scan(&indexName, &seqInIndex, &columnName, &nullable, &nonUnique, &indexType); err != nil {
level.Error(c.logger).Log("msg", "failed to scan table indexes", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

if len(tblSpec.Indexes) > 0 && tblSpec.Indexes[len(tblSpec.Indexes)-1].Name == indexName {
lastIndex := &tblSpec.Indexes[len(tblSpec.Indexes)-1]
if len(lastIndex.Columns) != seqInIndex-1 {
panic(seqInIndex)
}
lastIndex.Columns = append(lastIndex.Columns, columnName)
} else {
tblSpec.Indexes = append(tblSpec.Indexes, indexSpec{
Name: indexName,
Type: indexType,
Columns: []string{columnName},
Unique: nonUnique == 0,
Nullable: nullable.Valid && nullable.String == "YES",
})
}
}

if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "error during iterating over table indexes result set", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

return tblSpec, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"index_name",
"seq_in_index",
"column_name",
"nullable",
"non_unique",
"index_type",
}).AddRow(
"PRIMARY",
1,
"id",
"",
0,
"BTREE",
),
)

err = collector.Start(context.Background())
require.NoError(t, err)

Expand All @@ -113,13 +132,16 @@ func TestSchemaTable(t *testing.T) {
err = mock.ExpectationsWereMet()
require.NoError(t, err)

expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)"))
expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`))

lokiEntries := lokiClient.Received()
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels)
require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels)
require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line)
})
t.Run("detect table schema, cache enabled (write)", func(t *testing.T) {
t.Parallel()
Expand All @@ -131,7 +153,7 @@ func TestSchemaTable(t *testing.T) {
lokiClient := loki_fake.NewClient(func() {})

// Enable caching. This will exercise the code path
// that writes to cache (but we explicitly assert it in this test)
// that writes to cache (but we don't explicitly assert it in this test)
collector, err := NewSchemaTable(SchemaTableArguments{
DB: db,
InstanceKey: "mysql-db",
Expand Down Expand Up @@ -198,6 +220,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"index_name",
"seq_in_index",
"column_name",
"nullable",
"non_unique",
"index_type",
}).AddRow(
"PRIMARY",
1,
"id",
"",
0,
"BTREE",
),
)

err = collector.Start(context.Background())
require.NoError(t, err)

Expand All @@ -217,13 +258,16 @@ func TestSchemaTable(t *testing.T) {

require.Equal(t, 1, collector.cache.Len())

expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)"))
expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`))

lokiEntries := lokiClient.Received()
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels)
require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels)
require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line)
})
t.Run("detect table schema, cache enabled (write and read)", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -301,6 +345,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"index_name",
"seq_in_index",
"column_name",
"nullable",
"non_unique",
"index_type",
}).AddRow(
"PRIMARY",
1,
"id",
"",
0,
"BTREE",
),
)

// second loop, table info will be read from cache
// and no further queries will be executed
mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed().
Expand Down Expand Up @@ -346,19 +409,22 @@ func TestSchemaTable(t *testing.T) {
err = mock.ExpectationsWereMet()
require.NoError(t, err)

expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)"))
expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`))

lokiEntries := lokiClient.Received()
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels)
require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels)
require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[3].Labels)
require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[3].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[4].Labels)
require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[4].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[5].Labels)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[5].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[5].Line)
})
t.Run("detect view schema", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -439,6 +505,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"index_name",
"seq_in_index",
"column_name",
"nullable",
"non_unique",
"index_type",
}).AddRow(
"PRIMARY",
1,
"id",
"",
0,
"BTREE",
),
)

err = collector.Start(context.Background())
require.NoError(t, err)

Expand All @@ -456,13 +541,16 @@ func TestSchemaTable(t *testing.T) {
err = mock.ExpectationsWereMet()
require.NoError(t, err)

expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)"))
expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`))

lokiEntries := lokiClient.Received()
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels)
require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels)
require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line)
})
t.Run("schemas result set iteration error", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -654,6 +742,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"index_name",
"seq_in_index",
"column_name",
"nullable",
"non_unique",
"index_type",
}).AddRow(
"PRIMARY",
1,
"id",
"",
0,
"BTREE",
),
)

err = collector.Start(context.Background())
require.NoError(t, err)

Expand All @@ -668,12 +775,15 @@ func TestSchemaTable(t *testing.T) {
return collector.Stopped()
}, 5*time.Second, 100*time.Millisecond)

expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)"))
expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`))

lokiEntries := lokiClient.Received()
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels)
require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels)
require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line)
})
}

0 comments on commit 036fe0c

Please sign in to comment.