Skip to content

Commit

Permalink
Add support for Cassandra 5.0 table options
Browse files Browse the repository at this point in the history
  • Loading branch information
OleksiienkoMykyta committed Aug 7, 2024
1 parent 34fdeeb commit 7b90e9f
Showing 1 changed file with 185 additions and 51 deletions.
236 changes: 185 additions & 51 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type ViewMetadata struct {
type MaterializedViewMetadata struct {
Keyspace string
Name string
AdditionalWritePolicy string
BaseTableId UUID
BaseTable *TableMetadata
BloomFilterFpChance float64
Expand All @@ -115,6 +116,7 @@ type MaterializedViewMetadata struct {
MaxIndexInterval int
MemtableFlushPeriodInMs int
MinIndexInterval int
ReadRepair string
ReadRepairChance float64
SpeculativeRetry string

Expand Down Expand Up @@ -975,69 +977,201 @@ func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, er
return views, nil
}

func bytesMapToStringsMap(byteData map[string][]byte) map[string]string {
extensions := make(map[string]string, len(byteData))
for key, rowByte := range byteData {
extensions[key] = string(rowByte)
}

return extensions
}

func materializedViewMetadataFromMap(currentObject map[string]interface{}, materializedView *MaterializedViewMetadata) error {
const errorMessage = "gocql.materializedViewMetadataFromMap failed to read column %s"
var ok bool
for key, value := range currentObject {
switch key {
case "keyspace_name":
materializedView.Keyspace, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "view_name":
materializedView.Name, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "additional_write_policy":
materializedView.AdditionalWritePolicy, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "base_table_id":
materializedView.BaseTableId, ok = value.(UUID)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "base_table_name":
materializedView.baseTableName, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "bloom_filter_fp_chance":
materializedView.BloomFilterFpChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "caching":
materializedView.Caching, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "comment":
materializedView.Comment, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "compaction":
materializedView.Compaction, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "compression":
materializedView.Compression, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "crc_check_chance":
materializedView.CrcCheckChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "dclocal_read_repair_chance":
materializedView.DcLocalReadRepairChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "default_time_to_live":
materializedView.DefaultTimeToLive, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "extensions":
byteData, ok := value.(map[string][]byte)
if !ok {
return fmt.Errorf(errorMessage, key)
}

materializedView.Extensions = bytesMapToStringsMap(byteData)

case "gc_grace_seconds":
materializedView.GcGraceSeconds, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "id":
materializedView.Id, ok = value.(UUID)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "include_all_columns":
materializedView.IncludeAllColumns, ok = value.(bool)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "max_index_interval":
materializedView.MaxIndexInterval, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "memtable_flush_period_in_ms":
materializedView.MemtableFlushPeriodInMs, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "min_index_interval":
materializedView.MinIndexInterval, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "read_repair":
materializedView.ReadRepair, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "read_repair_chance":
materializedView.ReadRepairChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "speculative_retry":
materializedView.SpeculativeRetry, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

}
}
return nil
}

func parseSystemSchemaViews(iter *Iter) ([]MaterializedViewMetadata, error) {
var materializedViews []MaterializedViewMetadata
s, err := iter.SliceMap()
if err != nil {
return nil, err
}

for _, row := range s {
var materializedView MaterializedViewMetadata
err = materializedViewMetadataFromMap(row, &materializedView)
if err != nil {
return nil, err
}

materializedViews = append(materializedViews, materializedView)
}

return materializedViews, nil
}

func getMaterializedViewsMetadata(session *Session, keyspaceName string) ([]MaterializedViewMetadata, error) {
if !session.useSystemSchema {
return nil, nil
}
var tableName = "system_schema.views"
stmt := fmt.Sprintf(`
SELECT
view_name,
base_table_id,
base_table_name,
bloom_filter_fp_chance,
caching,
comment,
compaction,
compression,
crc_check_chance,
dclocal_read_repair_chance,
default_time_to_live,
extensions,
gc_grace_seconds,
id,
include_all_columns,
max_index_interval,
memtable_flush_period_in_ms,
min_index_interval,
read_repair_chance,
speculative_retry
SELECT *
FROM %s
WHERE keyspace_name = ?`, tableName)

var materializedViews []MaterializedViewMetadata

rows := session.control.query(stmt, keyspaceName).Scanner()
for rows.Next() {
materializedView := MaterializedViewMetadata{Keyspace: keyspaceName}
err := rows.Scan(&materializedView.Name,
&materializedView.BaseTableId,
&materializedView.baseTableName,
&materializedView.BloomFilterFpChance,
&materializedView.Caching,
&materializedView.Comment,
&materializedView.Compaction,
&materializedView.Compression,
&materializedView.CrcCheckChance,
&materializedView.DcLocalReadRepairChance,
&materializedView.DefaultTimeToLive,
&materializedView.Extensions,
&materializedView.GcGraceSeconds,
&materializedView.Id,
&materializedView.IncludeAllColumns,
&materializedView.MaxIndexInterval,
&materializedView.MemtableFlushPeriodInMs,
&materializedView.MinIndexInterval,
&materializedView.ReadRepairChance,
&materializedView.SpeculativeRetry,
)
if err != nil {
return nil, err
}
materializedViews = append(materializedViews, materializedView)
}
iter := session.control.query(stmt, keyspaceName)

if err := rows.Err(); err != nil {
materializedViews, err := parseSystemSchemaViews(iter)
if err != nil {
return nil, err
}

Expand Down

0 comments on commit 7b90e9f

Please sign in to comment.