diff --git a/internal/component/database_observability/mysql/collector/schema_table.go b/internal/component/database_observability/mysql/collector/schema_table.go index 220f83c847..be4cf15e6e 100644 --- a/internal/component/database_observability/mysql/collector/schema_table.go +++ b/internal/component/database_observability/mysql/collector/schema_table.go @@ -64,6 +64,35 @@ const ( WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION ASC` + + selectIndexNames = ` + SELECT + index_name, + seq_in_index, + column_name, + nullable, + non_unique, + index_type + FROM + information_schema.statistics + WHERE + table_schema = ? and table_name = ? + ORDER BY table_name, index_name, seq_in_index` + + // Ignore 'PRIMARY' constraints, as they're already covered by the query above + selectForeignKeys = ` + SELECT + constraint_name, + column_name, + referenced_table_name, + referenced_column_name + FROM + information_schema.key_column_usage + WHERE + constraint_name <> 'PRIMARY' + AND referenced_table_schema is not null + AND table_schema = ? and table_name = ? + ORDER BY table_name, constraint_name, ordinal_position` ) type SchemaTableArguments struct { @@ -108,7 +137,9 @@ type tableInfo struct { } type tableSpec struct { - Columns []columnSpec `json:"columns"` + Columns []columnSpec `json:"columns"` + Indexes []indexSpec `json:"indexes,omitempty"` + ForeignKeys []foreignKey `json:"foreign_keys,omitempty"` } type columnSpec struct { Name string `json:"name"` @@ -119,6 +150,21 @@ type columnSpec struct { DefaultValue string `json:"default_value,omitempty"` } +type indexSpec struct { + Name string `json:"name"` + Type string `json:"type"` + Columns []string `json:"columns"` + Unique bool `json:"unique"` + Nullable bool `json:"nullable"` +} + +type foreignKey struct { + Name string `json:"name"` + ColumnName string `json:"column_name"` + ReferencedTableName string `json:"referenced_table_name"` + ReferencedColumnName string `json:"referenced_column_name"` +} + func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) { c := &SchemaTable{ dbConnection: args.DB, @@ -357,40 +403,24 @@ func (c *SchemaTable) fetchTableDefinitions(ctx context.Context, fullyQualifiedT } func (c *SchemaTable) fetchColumnsDefinitions(ctx context.Context, schemaName string, tableName string) (*tableSpec, error) { - rs, err := c.dbConnection.QueryContext(ctx, selectColumnNames, schemaName, tableName) + colRS, err := c.dbConnection.QueryContext(ctx, selectColumnNames, schemaName, tableName) if err != nil { level.Error(c.logger).Log("msg", "failed to query table columns", "schema", schemaName, "table", tableName, "err", err) return nil, err } - defer rs.Close() + defer colRS.Close() tblSpec := &tableSpec{Columns: []columnSpec{}} - for rs.Next() { + for colRS.Next() { var columnName, isNullable, columnType, columnKey, extra string var columnDefault sql.NullString - if err := rs.Scan(&columnName, &columnDefault, &isNullable, &columnType, &columnKey, &extra); err != nil { + if err := colRS.Scan(&columnName, &columnDefault, &isNullable, &columnType, &columnKey, &extra); err != nil { level.Error(c.logger).Log("msg", "failed to scan table columns", "schema", schemaName, "table", tableName, "err", err) return nil, err } - extra = strings.ToUpper(extra) - - notNull := false - if isNullable == "NO" { - notNull = true - } - - autoIncrement := false - if strings.Contains(extra, "AUTO_INCREMENT") { - autoIncrement = true - } - - primaryKey := false - if columnKey == "PRI" { - primaryKey = true - } - + extra = strings.ToUpper(extra) // "extra" might contain a variety of textual information defaultValue := "" if columnDefault.Valid { defaultValue = columnDefault.String @@ -402,18 +432,85 @@ func (c *SchemaTable) fetchColumnsDefinitions(ctx context.Context, schemaName st colSpec := columnSpec{ Name: columnName, Type: columnType, - NotNull: notNull, - AutoIncrement: autoIncrement, - PrimaryKey: primaryKey, + NotNull: isNullable == "NO", // "YES" if NULL values can be stored in the column, "NO" if not. + AutoIncrement: strings.Contains(extra, "AUTO_INCREMENT"), + PrimaryKey: columnKey == "PRI", // "column_key" is "PRI" if this column a (or part of) PRIMARY KEY DefaultValue: defaultValue, } tblSpec.Columns = append(tblSpec.Columns, colSpec) } - if err := rs.Err(); err != nil { + if err := colRS.Err(); err != nil { level.Error(c.logger).Log("msg", "error during iterating over table columns result set", "schema", schemaName, "table", tableName, "err", err) return nil, err } + idxRS, err := c.dbConnection.QueryContext(ctx, selectIndexNames, schemaName, tableName) + if err != nil { + level.Error(c.logger).Log("msg", "failed to query table indexes", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + defer idxRS.Close() + + for idxRS.Next() { + var indexName, columnName, indexType string + var seqInIndex, nonUnique int + var nullable sql.NullString + if err := idxRS.Scan(&indexName, &seqInIndex, &columnName, &nullable, &nonUnique, &indexType); err != nil { + level.Error(c.logger).Log("msg", "failed to scan table indexes", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + + // Append column to the last index if it's the same as the previous one (i.e. multi-column index) + if nIndexes := len(tblSpec.Indexes); nIndexes > 0 && tblSpec.Indexes[nIndexes-1].Name == indexName { + lastIndex := &tblSpec.Indexes[nIndexes-1] + if len(lastIndex.Columns) != seqInIndex-1 { + level.Error(c.logger).Log("msg", "unexpected index column sequence", "schema", schemaName, "table", tableName, "index", indexName, "column", columnName) + continue + } + lastIndex.Columns = append(lastIndex.Columns, columnName) + } else { + tblSpec.Indexes = append(tblSpec.Indexes, indexSpec{ + Name: indexName, + Type: indexType, + Columns: []string{columnName}, + Unique: nonUnique == 0, // 0 if the index cannot contain duplicates, 1 if it can + Nullable: nullable.Valid && nullable.String == "YES", // "YES" if the column may contain NULL values + }) + } + } + + if err := idxRS.Err(); err != nil { + level.Error(c.logger).Log("msg", "error during iterating over table indexes result set", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + + fkRS, err := c.dbConnection.QueryContext(ctx, selectForeignKeys, schemaName, tableName) + if err != nil { + level.Error(c.logger).Log("msg", "failed to query table foreign keys", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + defer fkRS.Close() + + for fkRS.Next() { + var constraintName, columnName, referencedTableName, referencedColumnName string + if err := fkRS.Scan(&constraintName, &columnName, &referencedTableName, &referencedColumnName); err != nil { + level.Error(c.logger).Log("msg", "failed to scan foreign keys", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + + tblSpec.ForeignKeys = append(tblSpec.ForeignKeys, foreignKey{ + Name: constraintName, + ColumnName: columnName, + ReferencedTableName: referencedTableName, + ReferencedColumnName: referencedColumnName, + }) + } + + if err := fkRS.Err(); err != nil { + level.Error(c.logger).Log("msg", "error during iterating over foreign keys result set", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + return tblSpec, nil } diff --git a/internal/component/database_observability/mysql/collector/schema_table_test.go b/internal/component/database_observability/mysql/collector/schema_table_test.go index cc952328bc..c7cdc17205 100644 --- a/internal/component/database_observability/mysql/collector/schema_table_test.go +++ b/internal/component/database_observability/mysql/collector/schema_table_test.go @@ -34,7 +34,7 @@ func TestSchemaTable(t *testing.T) { collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", - CollectInterval: time.Second, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, CacheEnabled: false, Logger: log.NewLogfmtLogger(os.Stderr), @@ -73,7 +73,7 @@ func TestSchemaTable(t *testing.T) { "create_statement", }).AddRow( "some_schema.some_table", - "CREATE TABLE some_table (id INT)", + "CREATE TABLE some_table (id INT, category INT)", ), ) @@ -93,6 +93,47 @@ func TestSchemaTable(t *testing.T) { "int", "PRI", "auto_increment", + ).AddRow( + "category", + "null", + "NO", + "int", + "", + "", + ), + ) + + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + + mock.ExpectQuery(selectForeignKeys).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "constraint_name", + "column_name", + "referenced_table_name", + "referenced_column_name", + }).AddRow( + "fk_name", + "category", + "categories", + "id", ), ) @@ -113,13 +154,16 @@ func TestSchemaTable(t *testing.T) { err = mock.ExpectationsWereMet() require.NoError(t, err) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT, category INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"},{"name":"category","type":"int","not_null":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}],"foreign_keys":[{"name":"fk_name","column_name":"category","referenced_table_name":"categories","referenced_column_name":"id"}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) t.Run("detect table schema, cache enabled (write)", func(t *testing.T) { t.Parallel() @@ -131,11 +175,11 @@ func TestSchemaTable(t *testing.T) { lokiClient := loki_fake.NewClient(func() {}) // Enable caching. This will exercise the code path - // that writes to cache (but we explicitly assert it in this test) + // that writes to cache (but we don't explicitly assert it in this test) collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", - CollectInterval: time.Second, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, CacheEnabled: true, Logger: log.NewLogfmtLogger(os.Stderr), @@ -175,7 +219,7 @@ func TestSchemaTable(t *testing.T) { "create_statement", }).AddRow( "some_schema.some_table", - "CREATE TABLE some_table (id INT)", + "CREATE TABLE some_table (id INT, category INT)", ), ) @@ -195,9 +239,49 @@ func TestSchemaTable(t *testing.T) { "int", "PRI", "auto_increment", + ).AddRow( + "category", + "null", + "NO", + "int", + "", + "", + ), + ) + + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", ), ) + mock.ExpectQuery(selectForeignKeys).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "constraint_name", + "column_name", + "referenced_table_name", + "referenced_column_name", + }).AddRow( + "fk_name", + "category", + "categories", + "id", + ), + ) err = collector.Start(context.Background()) require.NoError(t, err) @@ -217,13 +301,16 @@ func TestSchemaTable(t *testing.T) { require.Equal(t, 1, collector.cache.Len()) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT, category INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"},{"name":"category","type":"int","not_null":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}],"foreign_keys":[{"name":"fk_name","column_name":"category","referenced_table_name":"categories","referenced_column_name":"id"}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) t.Run("detect table schema, cache enabled (write and read)", func(t *testing.T) { t.Parallel() @@ -238,7 +325,7 @@ func TestSchemaTable(t *testing.T) { collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", - CollectInterval: time.Second, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, CacheEnabled: true, Logger: log.NewLogfmtLogger(os.Stderr), @@ -301,6 +388,35 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + + mock.ExpectQuery(selectForeignKeys).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "constraint_name", + "column_name", + "referenced_table_name", + "referenced_column_name", + }), + ) + // second loop, table info will be read from cache // and no further queries will be executed mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). @@ -346,19 +462,22 @@ func TestSchemaTable(t *testing.T) { err = mock.ExpectationsWereMet() require.NoError(t, err) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[3].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[3].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[4].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[4].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[5].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[5].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[5].Line) }) t.Run("detect view schema", func(t *testing.T) { t.Parallel() @@ -372,7 +491,7 @@ func TestSchemaTable(t *testing.T) { collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", - CollectInterval: time.Second, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, CacheEnabled: false, Logger: log.NewLogfmtLogger(os.Stderr), @@ -439,6 +558,34 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + + mock.ExpectQuery(selectForeignKeys).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "constraint_name", + "column_name", + "referenced_table_name", + "referenced_column_name", + }), + ) err = collector.Start(context.Background()) require.NoError(t, err) @@ -456,13 +603,16 @@ func TestSchemaTable(t *testing.T) { err = mock.ExpectationsWereMet() require.NoError(t, err) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) t.Run("schemas result set iteration error", func(t *testing.T) { t.Parallel() @@ -476,7 +626,7 @@ func TestSchemaTable(t *testing.T) { collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", - CollectInterval: time.Second, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, CacheEnabled: false, Logger: log.NewLogfmtLogger(os.Stderr), @@ -526,7 +676,7 @@ func TestSchemaTable(t *testing.T) { collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", - CollectInterval: time.Second, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, CacheEnabled: false, Logger: log.NewLogfmtLogger(os.Stderr), @@ -595,7 +745,7 @@ func TestSchemaTable(t *testing.T) { collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", - CollectInterval: time.Second, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, CacheEnabled: false, Logger: log.NewLogfmtLogger(os.Stderr), @@ -654,6 +804,35 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + + mock.ExpectQuery(selectForeignKeys).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "constraint_name", + "column_name", + "referenced_table_name", + "referenced_column_name", + }), + ) + err = collector.Start(context.Background()) require.NoError(t, err) @@ -668,12 +847,15 @@ func TestSchemaTable(t *testing.T) { return collector.Stopped() }, 5*time.Second, 100*time.Millisecond) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) }