diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index b3051437..fd680b25 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -13,6 +13,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Server +> **Note** All caches for stores using the updatePolicy `set_sum` (added in substreams v1.7.0) and modules that depend on them will need to be deleted, since they may contain bad data. + +* Fix bad data in stores using `set_sum` policy: squashing of store segments incorrectly "summed" some values that should have been "set" if the last event for a key on this segment was a "sum" * Fix panic in initialization (`metrics sender not set`) ## v1.10.7 diff --git a/storage/store/merge_test.go b/storage/store/merge_test.go index 273eb1eb..5d5ec933 100644 --- a/storage/store/merge_test.go +++ b/storage/store/merge_test.go @@ -132,69 +132,81 @@ func TestStore_Merge(t *testing.T) { { name: "set_sum_int", latest: newPartialStore(map[string][]byte{ - "one": []byte("set:1"), - "two": []byte("sum:2"), + "one": []byte("set:1"), + "two": []byte("sum:2"), + "four": []byte("sum:2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeInt64, nil), prev: newStore(map[string][]byte{ "one": []byte("sum:1"), "three": []byte("sum:3"), + "four": []byte("sum:2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeInt64), expectedError: false, expectedKV: map[string][]byte{ "one": []byte("sum:1"), "two": []byte("sum:2"), "three": []byte("sum:3"), + "four": []byte("sum:4"), }, }, { name: "set_sum_float", latest: newPartialStore(map[string][]byte{ - "one": []byte("set:1.1"), - "two": []byte("sum:2.2"), + "one": []byte("set:1.1"), + "two": []byte("sum:2.2"), + "four": []byte("sum:2.2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeFloat64, nil), prev: newStore(map[string][]byte{ "one": []byte("sum:1.1"), "three": []byte("sum:3.3"), + "four": []byte("sum:2.2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeFloat64), expectedError: false, expectedKV: map[string][]byte{ "one": []byte("sum:1.1"), "two": []byte("sum:2.2"), "three": []byte("sum:3.3"), + "four": []byte("sum:4.4"), }, }, { name: "set_sum_big_int", latest: newPartialStore(map[string][]byte{ - "one": []byte("set:1"), - "two": []byte("sum:2"), + "one": []byte("set:1"), + "two": []byte("sum:2"), + "four": []byte("sum:2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigInt, nil), prev: newStore(map[string][]byte{ "one": []byte("sum:1"), "three": []byte("sum:3"), + "four": []byte("sum:2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigInt), expectedError: false, expectedKV: map[string][]byte{ "one": []byte("sum:1"), "two": []byte("sum:2"), "three": []byte("sum:3"), + "four": []byte("sum:4"), }, }, { name: "set_sum_big_decimal", latest: newPartialStore(map[string][]byte{ - "one": []byte("set:1.1"), - "two": []byte("sum:2.2"), + "one": []byte("set:1.1"), + "two": []byte("sum:2.2"), + "four": []byte("sum:2.2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigDecimal, nil), prev: newStore(map[string][]byte{ "one": []byte("sum:1.1"), "three": []byte("sum:3.3"), + "four": []byte("sum:2.2"), }, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigDecimal), expectedError: false, expectedKV: map[string][]byte{ "one": []byte("sum:1.1"), "two": []byte("sum:2.2"), "three": []byte("sum:3.3"), + "four": []byte("sum:4.4"), }, }, { diff --git a/storage/store/store_setsum.go b/storage/store/store_setsum.go index 242ff241..24dfa823 100644 --- a/storage/store/store_setsum.go +++ b/storage/store/store_setsum.go @@ -26,9 +26,10 @@ func (b *baseStore) setSumInt64(ord uint64, key string, value []byte) { } else { switch string(value[:4]) { case "sum:": + prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it. prev, _ := strconv.ParseInt(string(val[4:]), 10, 64) next, _ := strconv.ParseInt(string(value[4:]), 10, 64) - data = []byte("sum:" + strconv.FormatInt(prev+next, 10)) + data = []byte(prevPrefix + strconv.FormatInt(prev+next, 10)) case "set:": data = value default: @@ -55,9 +56,10 @@ func (b *baseStore) setSumFloat64(ord uint64, key string, value []byte) { } else { switch string(value[:4]) { case "sum:": + prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it. prev, _ := strconv.ParseFloat(string(val[4:]), 64) next, _ := strconv.ParseFloat(string(value[4:]), 64) - data = []byte("sum:" + strconv.FormatFloat(prev+next, 'g', 100, 64)) + data = []byte(prevPrefix + strconv.FormatFloat(prev+next, 'g', 100, 64)) case "set:": data = value default: @@ -84,9 +86,10 @@ func (b *baseStore) setSumBigInt(ord uint64, key string, value []byte) { } else { switch string(value[:4]) { case "sum:": + prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it. prev := valueToBigInt(val[4:]) next := valueToBigInt(value[4:]) - data = []byte("sum:" + big.NewInt(0).Add(prev, next).String()) + data = []byte(prevPrefix + big.NewInt(0).Add(prev, next).String()) case "set:": data = value default: @@ -113,9 +116,10 @@ func (b *baseStore) setSumBigDecimal(ord uint64, key string, value []byte) { } else { switch string(value[:4]) { case "sum:": + prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it. prev := mustDecimalFromBytes(val[4:]) next := mustDecimalFromBytes(value[4:]) - data = []byte("sum:" + prev.Add(next).String()) + data = []byte(prevPrefix + prev.Add(next).String()) case "set:": data = value default: diff --git a/storage/store/store_setsum_test.go b/storage/store/store_setsum_test.go index c8dc5092..c953b69f 100644 --- a/storage/store/store_setsum_test.go +++ b/storage/store/store_setsum_test.go @@ -63,7 +63,7 @@ func TestStoreSetSumInt64(t *testing.T) { existingValue: []byte("set:7"), value: []byte("sum:3"), expectedGetValue: []byte("10"), - expectedActualValue: []byte("sum:10"), + expectedActualValue: []byte("set:10"), // always keep a 'set:' prefix }, } @@ -150,7 +150,7 @@ func TestStoreSetSumFloat64(t *testing.T) { existingValue: []byte("set:7.0"), value: []byte("sum:3.0"), expectedGetValue: []byte("10"), - expectedActualValue: []byte("sum:10"), + expectedActualValue: []byte("set:10"), }, } @@ -236,7 +236,7 @@ func TestStoreSetSumBigInt(t *testing.T) { existingValue: []byte("set:7"), value: []byte("sum:3"), expectedGetValue: []byte("10"), - expectedActualValue: []byte("sum:10"), + expectedActualValue: []byte("set:10"), }, } @@ -322,7 +322,7 @@ func TestStoreSetSumBigDecimal(t *testing.T) { existingValue: []byte("set:7.0"), value: []byte("sum:3.0"), expectedGetValue: []byte("10"), - expectedActualValue: []byte("sum:10"), + expectedActualValue: []byte("set:10"), // always keep a 'set:' prefix }, }