From 2b7d19675715606b348cdf6863a47d2d1db50809 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Mon, 28 Oct 2024 11:24:21 +0200 Subject: [PATCH] Integration test for Metadata_changed mechanism --- cassandra_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++++-- conn.go | 9 ++-- 2 files changed, 103 insertions(+), 8 deletions(-) diff --git a/cassandra_test.go b/cassandra_test.go index 567946e7f..6b0c1a559 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -3397,8 +3397,6 @@ func TestLargeSizeQuery(t *testing.T) { t.Fatal(err) } - defer session.Close() - longString := strings.Repeat("a", 500_000) err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec() @@ -3430,8 +3428,6 @@ func TestQueryCompressionNotWorthIt(t *testing.T) { t.Fatal(err) } - defer session.Close() - str := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()_+" err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec() if err != nil { @@ -3446,3 +3442,101 @@ func TestQueryCompressionNotWorthIt(t *testing.T) { require.Equal(t, str, result) } + +func TestPrepareExecuteMetadataChangedFlag(t *testing.T) { + // This test ensures that the whole Metadata_changed flow + // is handled properly. + // + // To trigger C* to return Metadata_changed we should do: + // 1. Create a table + // 2. Prepare stmt which uses the created table + // 3. Change the table schema in order to affect prepared stmt (e.g. add a column) + // 4. Execute prepared stmt. As a result C* should return RESULT/ROWS response with + // Metadata_changed flag, new metadata id and updated metadata resultset. + // + // The driver should handle this by updating its prepared statement inside the cache + // when it receives RESULT/ROWS with Metadata_changed flag + session := createSession(t) + defer session.Close() + + if session.cfg.ProtoVersion < protoVersion5 { + t.Skip("Metadata_changed mechanism is only available in proto > 4") + } + + if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.metadata_changed(id int, PRIMARY KEY (id))"); err != nil { + t.Fatal(err) + } + + err := session.Query("INSERT INTO gocql_test.metadata_changed (id) VALUES (?)", 1).Exec() + if err != nil { + t.Fatal(err) + } + + // We have to specify conn for all queries to ensure that + // all queries are running on the same node + conn := session.getConn() + + const selectStmt = "SELECT * FROM gocql_test.metadata_changed" + queryBeforeTableAltering := session.Query(selectStmt) + queryBeforeTableAltering.conn = conn + row := make(map[string]interface{}) + err = queryBeforeTableAltering.MapScan(row) + if err != nil { + t.Fatal(err) + } + + require.Len(t, row, 1, "Expected to retrieve a single column") + stmtCacheKey := session.stmtsLRU.keyFor(conn.host.HostID(), conn.currentKeyspace, queryBeforeTableAltering.stmt) + inflight, _ := session.stmtsLRU.get(stmtCacheKey) + preparedStatementBeforeTableAltering := inflight.preparedStatment + + // Changing table schema in order to cause C* to return RESULT/ROWS Metadata_changed + alteringTableQuery := session.Query("ALTER TABLE gocql_test.metadata_changed ADD new_col int") + alteringTableQuery.conn = conn + err = alteringTableQuery.Exec() + if err != nil { + t.Fatal(err) + } + + // Expecting C* will return RESULT/ROWS Metadata_changed + // and it will be properly handled + queryAfterTableAltering := session.Query(selectStmt) + queryAfterTableAltering.conn = conn + row = make(map[string]interface{}) + err = queryAfterTableAltering.MapScan(row) + if err != nil { + t.Fatal(err) + } + + // Ensuring if cache contains updated prepared statement + require.Len(t, row, 2, "Expected to retrieve both columns") + inflight, _ = session.stmtsLRU.get(stmtCacheKey) + preparedStatementAfterTableAltering := inflight.preparedStatment + require.NotEqual(t, preparedStatementBeforeTableAltering.resultMetadataID, preparedStatementAfterTableAltering.resultMetadataID) + require.NotEqual(t, preparedStatementBeforeTableAltering.response, preparedStatementAfterTableAltering.response) + + // Executing prepared stmt and expecting that C* won't return + // Metadata_changed because the table is not being changed. + // Running query with timeout to ensure there is no deadlocks. + // However, it doesn't 100% proves that there is a deadlock... + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30000) + defer cancel() + + queryAfterTableAltering2 := session.Query(selectStmt).WithContext(ctx) + queryAfterTableAltering2.conn = conn + row = make(map[string]interface{}) + err = queryAfterTableAltering2.MapScan(row) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + t.Fatal("It is likely failed due deadlock") + } + t.Fatal(err) + } + + // Ensuring metadata of prepared stmt is not changed + require.Len(t, row, 2, "Expected to retrieve both columns") + inflight, _ = session.stmtsLRU.get(stmtCacheKey) + preparedStatementAfterTableAltering2 := inflight.preparedStatment + require.Equal(t, preparedStatementAfterTableAltering.resultMetadataID, preparedStatementAfterTableAltering2.resultMetadataID) + require.Equal(t, preparedStatementAfterTableAltering.response, preparedStatementAfterTableAltering2.response) +} diff --git a/conn.go b/conn.go index 87f745a45..40bc6f65f 100644 --- a/conn.go +++ b/conn.go @@ -1613,8 +1613,6 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { return &Iter{framer: framer} case *resultRowsFrame: if x.meta.newMetadataID != nil { - // Updating the result metadata id in prepared stmt - // // If a RESULT/Rows message reports // changed resultset metadata with the Metadata_changed flag, the reported new // resultset metadata must be used in subsequent executions @@ -1631,8 +1629,11 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { }, } c.session.stmtsLRU.add(stmtCacheKey, newInflight) - // Closing done here because the stmtsLRU already contains a new inflight - // with updated metadata and result metadata id + // If the driver received newMetadataID field in the Result/Rows response + // it should prioritize metadata from the response + info = newInflight.preparedStatment + // It should close this done to avoid deadlocks of + // other subsequent requests close(newInflight.done) } }