Skip to content

Commit

Permalink
Merge pull request #364 from sylwiaszunejko/invalidate-tablets-data
Browse files Browse the repository at this point in the history
Invalidate tablets of updated keyspace/table on control connection reconnect
  • Loading branch information
dkropachev authored Dec 2, 2024
2 parents 09e5326 + a8083ac commit 615c6d9
Showing 1 changed file with 170 additions and 2 deletions.
172 changes: 170 additions & 2 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package gocql

import (
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -63,6 +64,38 @@ type TableMetadataOptions struct {
Version string
}

func (t *TableMetadataOptions) Equals(other *TableMetadataOptions) bool {
if t == nil || other == nil {
return t == other // Both must be nil to be equal
}

if t.BloomFilterFpChance != other.BloomFilterFpChance ||
t.Comment != other.Comment ||
t.CrcCheckChance != other.CrcCheckChance ||
t.DcLocalReadRepairChance != other.DcLocalReadRepairChance ||
t.DefaultTimeToLive != other.DefaultTimeToLive ||
t.GcGraceSeconds != other.GcGraceSeconds ||
t.MaxIndexInterval != other.MaxIndexInterval ||
t.MemtableFlushPeriodInMs != other.MemtableFlushPeriodInMs ||
t.MinIndexInterval != other.MinIndexInterval ||
t.ReadRepairChance != other.ReadRepairChance ||
t.SpeculativeRetry != other.SpeculativeRetry ||
t.InMemory != other.InMemory ||
t.Partitioner != other.Partitioner ||
t.Version != other.Version {
return false
}

if !compareStringMaps(t.Caching, other.Caching) ||
!compareStringMaps(t.Compaction, other.Compaction) ||
!compareStringMaps(t.Compression, other.Compression) ||
!compareStringMaps(t.CDC, other.CDC) {
return false
}

return true
}

type ViewMetadata struct {
KeyspaceName string
ViewName string
Expand Down Expand Up @@ -92,6 +125,22 @@ type ColumnMetadata struct {
Index ColumnIndexMetadata
}

func (c *ColumnMetadata) Equals(other *ColumnMetadata) bool {
if c == nil || other == nil {
return c == other
}

return c.Keyspace == other.Keyspace &&
c.Table == other.Table &&
c.Name == other.Name &&
c.ComponentIndex == other.ComponentIndex &&
c.Kind == other.Kind &&
c.Type == other.Type &&
c.ClusteringOrder == other.ClusteringOrder &&
c.Order == other.Order &&
c.Index.Equals(&other.Index)
}

// FunctionMetadata holds metadata for function constructs
type FunctionMetadata struct {
Keyspace string
Expand Down Expand Up @@ -135,6 +184,96 @@ type IndexMetadata struct {
Options map[string]string
}

func (t *TableMetadata) Equals(other *TableMetadata) bool {
if t == nil || other == nil {
return t == other
}

if t.Keyspace != other.Keyspace || t.Name != other.Name {
return false
}

if len(t.PartitionKey) != len(other.PartitionKey) || !compareColumnSlices(t.PartitionKey, other.PartitionKey) {
return false
}

if len(t.ClusteringColumns) != len(other.ClusteringColumns) || !compareColumnSlices(t.ClusteringColumns, other.ClusteringColumns) {
return false
}

if len(t.Columns) != len(other.Columns) || !compareColumnsMap(t.Columns, other.Columns) {
return false
}

if len(t.OrderedColumns) != len(other.OrderedColumns) || !compareStringSlices(t.OrderedColumns, other.OrderedColumns) {
return false
}

if !t.Options.Equals(&other.Options) {
return false
}

if len(t.Flags) != len(other.Flags) || !compareStringSlices(t.Flags, other.Flags) {
return false
}

if len(t.Extensions) != len(other.Extensions) || !compareInterfaceMaps(t.Extensions, other.Extensions) {
return false
}

return true
}

func compareColumnSlices(a, b []*ColumnMetadata) bool {
for i := range a {
if !a[i].Equals(b[i]) {
return false
}
}
return true
}

func compareColumnsMap(a, b map[string]*ColumnMetadata) bool {
for k, v := range a {
otherValue, exists := b[k]
if !exists || !v.Equals(otherValue) {
return false
}
}
return true
}

func compareStringSlices(a, b []string) bool {
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

func compareStringMaps(a, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for k, v := range a {
if otherValue, exists := b[k]; !exists || v != otherValue {
return false
}
}
return true
}

func compareInterfaceMaps(a, b map[string]interface{}) bool {
for k, v := range a {
otherValue, exists := b[k]
if !exists || !reflect.DeepEqual(v, otherValue) {
return false
}
}
return true
}

// cowTabletList implements a copy on write keyspace metadata map, its equivalent type is map[string]*KeyspaceMetadata
type cowKeyspaceMetadataMap struct {
keyspaceMap atomic.Value
Expand Down Expand Up @@ -212,6 +351,29 @@ type ColumnIndexMetadata struct {
Options map[string]interface{}
}

func (c *ColumnIndexMetadata) Equals(other *ColumnIndexMetadata) bool {
if c == nil || other == nil {
return c == other
}

if c.Name != other.Name || c.Type != other.Type {
return false
}

// Compare the Options map
if len(c.Options) != len(other.Options) {
return false
}
for k, v := range c.Options {
otherValue, exists := other.Options[k]
if !exists || !reflect.DeepEqual(v, otherValue) {
return false
}
}

return true
}

type ColumnKind int

const (
Expand Down Expand Up @@ -406,6 +568,7 @@ func (s *metadataDescriber) refreshAllSchema() error {
if err == ErrKeyspaceDoesNotExist {
s.clearSchema(keyspaceName)
s.removeTabletsWithKeyspace(keyspaceName)
continue
} else if err != nil {
return err
}
Expand All @@ -415,8 +578,13 @@ func (s *metadataDescriber) refreshAllSchema() error {
return err
}

for tableName := range metadata.Tables {
if _, ok := updatedMetadata.Tables[tableName]; !ok {
if !compareInterfaceMaps(metadata.StrategyOptions, updatedMetadata.StrategyOptions) {
s.removeTabletsWithKeyspace(keyspaceName)
continue
}

for tableName, tableMetadata := range metadata.Tables {
if updatedTableMetadata, ok := updatedMetadata.Tables[tableName]; !ok || tableMetadata.Equals(updatedTableMetadata) {
s.removeTabletsWithTable(keyspaceName, tableName)
}
}
Expand Down

0 comments on commit 615c6d9

Please sign in to comment.