diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java b/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java index 6ec9d20c61..076448fcbe 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/DataStoreController.java @@ -18,6 +18,7 @@ import ai.starwhale.mlops.api.protocol.Code; import ai.starwhale.mlops.api.protocol.ResponseMessage; +import ai.starwhale.mlops.api.protocol.datastore.BatchDeleteRequest; import ai.starwhale.mlops.api.protocol.datastore.CheckpointVo; import ai.starwhale.mlops.api.protocol.datastore.ColumnDesc; import ai.starwhale.mlops.api.protocol.datastore.CreateCheckpointRequest; @@ -141,6 +142,10 @@ ResponseEntity> updateTable( } } + ResponseEntity> batchDelete(@Valid @RequestBody BatchDeleteRequest request) { + return ResponseEntity.ok(Code.success.asResponse("Success")); + } + @PostMapping(value = "/datastore/flush") @PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')") ResponseEntity> flush(FlushRequest request) { diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/BatchDeleteRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/BatchDeleteRequest.java new file mode 100644 index 0000000000..9d8b987fc7 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/datastore/BatchDeleteRequest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.api.protocol.datastore; + +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BatchDeleteRequest { + @NotNull + private String tableName; + + // encoded start key, null means start from the beginning + private String startKey; + // encoded end key, null means end at the end + private String endKey; + // keyType can not be null if startKey or endKey is not null + private String keyType; + + @Builder.Default + private Boolean startKeyInclusive = true; + @Builder.Default + private Boolean endKeyInclusive = false; + + // optional key prefix, only works if the keys in the table are strings and the keyPrefix is not null + // keyPrefix and (start, end) can not be used together + private String keyPrefix; +} + diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java index 531165e8a2..505ccb488a 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/DataStore.java @@ -16,9 +16,11 @@ package ai.starwhale.mlops.datastore; +import ai.starwhale.mlops.api.protocol.datastore.BatchDeleteRequest; import ai.starwhale.mlops.api.protocol.datastore.CreateCheckpointRequest; import ai.starwhale.mlops.datastore.ParquetConfig.CompressionCodec; import ai.starwhale.mlops.datastore.impl.MemoryTableImpl; +import ai.starwhale.mlops.datastore.impl.RecordDecoder; import ai.starwhale.mlops.datastore.impl.RecordEncoder; import ai.starwhale.mlops.datastore.type.BaseValue; import ai.starwhale.mlops.datastore.wal.WalManager; @@ -939,4 +941,28 @@ public void deleteCheckpoint(String tableName, long revision) { table.unlock(false); } } + + public void batchDeleteRows(BatchDeleteRequest req) { + var table = this.getTable(req.getTableName(), false, false); + //noinspection ConstantConditions + table.lock(false); + try { + boolean useKeyPrefix = StringUtils.hasText(req.getKeyPrefix()); + // keyPrefix and (start, end) can not be used together + if (useKeyPrefix && (req.getStartKey() != null || req.getEndKey() != null)) { + throw new SwValidationException(SwValidationException.ValidSubject.DATASTORE, + "keyPrefix and (start, end) can not be used together"); + } + + if (useKeyPrefix) { + table.deleteRowsWithKeyPrefix(req.getKeyPrefix()); + } else { + var start = RecordDecoder.decodeScalar(req.getStartKey(), req.getKeyType()); + var end = RecordDecoder.decodeScalar(req.getEndKey(), req.getKeyType()); + table.deleteRowsWithRange(start, req.getStartKeyInclusive(), end, req.getEndKeyInclusive()); + } + } finally { + table.unlock(false); + } + } } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java index ca0edba623..287605c596 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/MemoryTable.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.validation.constraints.NotNull; public interface MemoryTable { @@ -98,4 +99,26 @@ Iterator scan( * @param revision revision in checkpoint */ void deleteCheckpoint(long revision); + + /** + * Batch delete rows by start end + * + * @param start start key, nullable + * if start is null, it means start from the beginning + * if start is not null, it means start from the start key + * (inclusive or not depends on startInclusive) + * @param startInclusive start key inclusive or not + * @param end end key, nullable + * if end is null, it means end at the end + * if end is not null, it means end at the end key (inclusive or not depends on endInclusive) + * @param endInclusive end key inclusive or not + */ + void deleteRowsWithRange(BaseValue start, Boolean startInclusive, BaseValue end, Boolean endInclusive); + + /** + * Delete rows with key prefix + * + * @param keyPrefix key prefix + */ + void deleteRowsWithKeyPrefix(@NotNull String keyPrefix); } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/Tombstone.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/Tombstone.java new file mode 100644 index 0000000000..2db65b21b6 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/Tombstone.java @@ -0,0 +1,111 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.datastore; + +import ai.starwhale.mlops.datastore.impl.WalRecordDecoder; +import ai.starwhale.mlops.datastore.type.BaseValue; +import ai.starwhale.mlops.datastore.type.StringValue; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Tombstone { + // notice that the start and end must be the same type + private BaseValue start; + private BaseValue end; + private boolean startInclusive; + private boolean endInclusive; + private String keyPrefix; + + public static Tombstone from(Wal.Tombstone tombstone) { + var ret = Tombstone.builder(); + if (tombstone.hasPrefix()) { + ret.keyPrefix(tombstone.getPrefix().getKeyPrefix()); + return ret.build(); + } + + if (tombstone.hasRange()) { + var range = tombstone.getRange(); + ret.start(WalRecordDecoder.decodeValue(range.getStartKey())); + ret.end(WalRecordDecoder.decodeValue(range.getEndKey())); + ret.startInclusive(range.getStartInclusive()); + ret.endInclusive(range.getEndInclusive()); + return ret.build(); + } + throw new IllegalArgumentException("invalid tombstone"); + } + + /** + * check if the key is deleted by this tombstone + *

+ * if the keyPrefix is set, then the key is deleted if it starts with the prefix + * if the start and end are set, then the key is deleted if it is in the range + * the key type must be the same as the start and end + * if the start or end is null, then the range is unbounded + *

+ * + * @param key the key to check + * @return true if the key is deleted by this tombstone + */ + public boolean keyDeleted(@NonNull BaseValue key) { + if (keyPrefix != null) { + // check if key is StringValue + if (!(key instanceof StringValue)) { + return false; + } + var keyStr = ((StringValue) key).getValue(); + if (keyStr == null) { + return false; + } + return keyStr.startsWith(keyPrefix); + } + if (start == null && end == null) { + return true; + } + + var rangeType = start != null ? start.getClass() : end.getClass(); + if (!rangeType.isInstance(key)) { + return false; + } + + if (start != null && end != null) { + var cmpStart = start.compareTo(key); + var cmpEnd = end.compareTo(key); + if (cmpStart < 0 || (cmpStart == 0 && startInclusive)) { + return cmpEnd > 0 || (cmpEnd == 0 && endInclusive); + } + } else { + if (start != null) { + var cmpStart = start.compareTo(key); + if (cmpStart < 0 || (cmpStart == 0 && startInclusive)) { + return true; + } + } + if (end != null) { + var cmpEnd = end.compareTo(key); + return cmpEnd > 0 || (cmpEnd == 0 && endInclusive); + } + } + return false; + } +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java index 50c60143fc..171900ad61 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java @@ -28,8 +28,12 @@ import ai.starwhale.mlops.datastore.TableQueryFilter; import ai.starwhale.mlops.datastore.TableSchema; import ai.starwhale.mlops.datastore.TableSchemaDesc; +import ai.starwhale.mlops.datastore.Tombstone; import ai.starwhale.mlops.datastore.Wal; import ai.starwhale.mlops.datastore.Wal.Checkpoint.OP; +import ai.starwhale.mlops.datastore.Wal.Tombstone.Builder; +import ai.starwhale.mlops.datastore.Wal.Tombstone.Prefix; +import ai.starwhale.mlops.datastore.Wal.WalEntry.Type; import ai.starwhale.mlops.datastore.parquet.SwParquetReaderBuilder; import ai.starwhale.mlops.datastore.parquet.SwParquetWriterBuilder; import ai.starwhale.mlops.datastore.parquet.SwReadSupport; @@ -127,6 +131,7 @@ public class MemoryTableImpl implements MemoryTable { private final Lock writeLock = lock.writeLock(); private final ConcurrentSkipListMap checkpoints = new ConcurrentSkipListMap<>(); + private final ConcurrentSkipListMap tombstones = new ConcurrentSkipListMap<>(); public MemoryTableImpl( String tableName, @@ -510,7 +515,7 @@ private Map getRecordMap(BaseValue key, List ve if (record.getRevision() <= revision) { // record may be empty, use hasVersion to mark if there is a record hasVersion = true; - if (record.isDeleted()) { + if (record.isDeleted() || checkIfDeletedByTombstone(key, record.getRevision(), revision)) { deleted = true; ret.clear(); } else { @@ -529,6 +534,13 @@ private Map getRecordMap(BaseValue key, List ve return ret; } + private boolean checkIfDeletedByTombstone(BaseValue key, long curRevision, long maxRevision) { + return this.tombstones.tailMap(curRevision, false).entrySet().stream() + .filter(tombstone -> tombstone.getKey() <= maxRevision) + .anyMatch(tombstone -> tombstone.getValue().keyDeleted(key) + ); + } + @Override public Iterator query( long revision, @@ -920,6 +932,41 @@ public void deleteCheckpoint(long revision) { deleteCheckpointAndGc(revision); } + @Override + public void deleteRowsWithRange(BaseValue start, Boolean startInclusive, BaseValue end, Boolean endInclusive) { + var range = Wal.Tombstone.Range.newBuilder() + .setStartKey(BaseValue.encodeWal(start)) + .setStartInclusive(startInclusive) + .setEndKey(BaseValue.encodeWal(end)) + .setEndInclusive(endInclusive); + var tombstone = Wal.Tombstone.newBuilder().setRange(range); + addTombstone(tombstone); + } + + @Override + public void deleteRowsWithKeyPrefix(String keyPrefix) { + // write wal entry + var tombstone = Wal.Tombstone.newBuilder() + .setPrefix(Prefix.newBuilder().setKeyPrefix(keyPrefix).build()); + addTombstone(tombstone); + } + + private void addTombstone(Builder tombstone) { + //noinspection NonAtomicOperationOnVolatileField + this.lastRevision++; + tombstone.setRevision(this.lastRevision); + var entry = Wal.WalEntry.newBuilder() + .setEntryType(Type.TOMBSTONE) + .setTableName(this.tableName) + .setTombstone(tombstone); + + this.walManager.append(entry); + if (this.firstWalLogId < 0) { + this.firstWalLogId = this.lastWalLogId; + } + this.tombstones.put(tombstone.getRevision(), Tombstone.from(tombstone.build())); + } + /** * Check if the revision is valid, and valid means one of the following: *
    @@ -982,7 +1029,7 @@ private void garbageCollect(@NotNull Checkpoint from, @NotNull Checkpoint to) { return; } - this.recordMap.values().forEach(versions -> { + this.recordMap.forEach((key, versions) -> { if (versions.isEmpty()) { return; } @@ -993,10 +1040,11 @@ private void garbageCollect(@NotNull Checkpoint from, @NotNull Checkpoint to) { long lastRevision = 0; while (it.hasNext()) { var record = it.next(); - if (record.getRevision() <= fromRevision) { + var recordRevision = record.getRevision(); + if (recordRevision <= fromRevision) { continue; } - if (record.getRevision() > toRevision) { + if (recordRevision > toRevision) { break; } @@ -1004,6 +1052,7 @@ var record = it.next(); current = record; continue; } + if (record.isDeleted()) { lastDeletion = record; } @@ -1016,7 +1065,7 @@ var record = it.next(); current.getValues().put(entry.getKey(), entry.getValue()); } } - lastRevision = record.getRevision(); + lastRevision = recordRevision; it.remove(); } // replace the current item with the patched one @@ -1031,6 +1080,7 @@ var record = it.next(); .values(current.getValues()) .build()); } + versions.sort(Comparator.comparingLong(MemoryRecord::getRevision)); }); } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/RecordDecoder.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/RecordDecoder.java index 3575050246..4d707f78f2 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/RecordDecoder.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/RecordDecoder.java @@ -126,6 +126,14 @@ public static ScalarValue decodeScalar(@NonNull ColumnType type, Object value) { } } + public static ScalarValue decodeScalar(@NonNull String type, String value) { + if (value == null) { + return null; + } + var columnType = ColumnType.valueOf(type); + return RecordDecoder.decodeScalar(columnType, value); + } + private static ListValue decodeList(@NonNull ColumnSchemaDesc columnSchema, Object value) { var ret = new ListValue(); var elements = (List) value; diff --git a/server/controller/src/main/protobuf/wal.proto b/server/controller/src/main/protobuf/wal.proto index 15235389eb..5339bafc8e 100644 --- a/server/controller/src/main/protobuf/wal.proto +++ b/server/controller/src/main/protobuf/wal.proto @@ -64,7 +64,20 @@ message Checkpoint { } message Tombstone { - // TODO add more biz fields + message Range { + Column start_key = 1; + Column end_key = 2; + bool start_inclusive = 3; + bool end_inclusive = 4; + } + message Prefix { + string key_prefix = 1; + } + int64 revision = 1; + oneof optional_range_or_prefix { + Range range = 2; + Prefix prefix = 3; + } } message WalEntry { diff --git a/server/controller/src/test/java/ai/starwhale/mlops/datastore/TombstoneTest.java b/server/controller/src/test/java/ai/starwhale/mlops/datastore/TombstoneTest.java new file mode 100644 index 0000000000..dfb303cb3d --- /dev/null +++ b/server/controller/src/test/java/ai/starwhale/mlops/datastore/TombstoneTest.java @@ -0,0 +1,188 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.datastore; + + +import ai.starwhale.mlops.datastore.type.BaseValue; +import ai.starwhale.mlops.datastore.type.StringValue; +import org.junit.jupiter.api.Test; + +class TombstoneTest { + @Test + public void testKeyDeletedWithPrefix() { + var tombstone = Tombstone.builder().keyPrefix("foo").build(); + for (var item : new String[]{"foo", "foobar", "foobaz"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + for (var item : new String[]{"bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + } + + @Test + public void testKeyDeletedWithRange() { + var tombstone = Tombstone.builder() + .start(new StringValue("foo1")) + .end(new StringValue("foo9")) + .startInclusive(true) + .endInclusive(true) + .build(); + for (var item : new String[]{"foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8", "foo9"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new String[]{"foo", "foo0", "fooa", "bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + + tombstone = Tombstone.builder() + .start(new StringValue("foo1")) + .end(new StringValue("foo9")) + .startInclusive(false) + .endInclusive(false) + .build(); + for (var item : new String[]{"foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new String[]{"foo1", "foo9", "foo", "foo0", "fooa", "bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + + tombstone = Tombstone.builder() + .start(new StringValue("foo1")) + .end(new StringValue("foo9")) + .startInclusive(false) + .endInclusive(true) + .build(); + for (var item : new String[]{"foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8", "foo9"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new String[]{"foo1", "foo", "foo0", "fooa", "bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + + tombstone = Tombstone.builder() + .start(new StringValue("foo1")) + .end(new StringValue("foo9")) + .startInclusive(true) + .endInclusive(false) + .build(); + + for (var item : new String[]{"foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new String[]{"foo9", "foo", "foo0", "fooa", "bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + + tombstone = Tombstone.builder() + .start(new StringValue("foo1")) + .end(new StringValue("foo1")) + .startInclusive(true) + .endInclusive(true) + .build(); + + for (var item : new String[]{"foo1"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new String[]{"foo", "foo0", "foo2", "fooa", "bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + + tombstone = Tombstone.builder() + .start(new StringValue("foo1")) + .end(new StringValue("foo1")) + .startInclusive(false) + .endInclusive(false) + .build(); + + for (var item : new String[]{"foo1", "foo", "foo0", "foo2", "fooa", "bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + + // none, none + tombstone = Tombstone.builder().build(); + for (var item : new String[]{"foo1", "foo", "foo0", "foo2", "fooa", "bar", "baz", "barfoo"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert tombstone.keyDeleted(item); + } + + // start, none + tombstone = Tombstone.builder().start(new StringValue("foo1")).startInclusive(true).build(); + for (var item : new String[]{"foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8", "foo9", "fooa", + "x"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new String[]{"foo", "foo0", "bar", "baz", "barfoo"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + + // none, end + tombstone = Tombstone.builder().end(new StringValue("foo9")).endInclusive(true).build(); + for (var item : new String[]{"a", "foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8", "foo9"}) { + assert tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new String[]{"fooa", "x"}) { + assert !tombstone.keyDeleted(new StringValue(item)); + } + + for (var item : new BaseValue[]{BaseValue.valueOf(1), BaseValue.valueOf(1.0), BaseValue.valueOf(true)}) { + assert !tombstone.keyDeleted(item); + } + } +} diff --git a/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java b/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java index 1becff14d0..a218776367 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/datastore/impl/MemoryTableImplTest.java @@ -3383,4 +3383,84 @@ public void testCheckpointIntegrate() { } } } + + @Test + public void testTombstone() { + var memoryTable = createInstance("tombstone"); + var tableSchemaDesc = new TableSchemaDesc("key", + List.of( + ColumnSchemaDesc.builder().name("key").type("INT32").build(), + ColumnSchemaDesc.builder().name("a").type("INT32").build() + )); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "1", "a", "1"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "2", "a", "2"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "3", "a", "3"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "4", "a", "4"))); + + // test by range + memoryTable.deleteRowsWithRange(BaseValue.valueOf(1), true, BaseValue.valueOf(3), false); + var resp = memoryTable.query(Long.MAX_VALUE, Map.of("a", "a"), null, null, false); + var result = ImmutableList.copyOf(resp); + assertThat(result.size(), is(4)); + assertThat(result.get(0), is(new RecordResult(BaseValue.valueOf(1), true, null))); + assertThat(result.get(1), is(new RecordResult(BaseValue.valueOf(2), true, null))); + assertThat(result.get(2).getValues(), is(Map.of("a", BaseValue.valueOf(3)))); + assertThat(result.get(3).getValues(), is(Map.of("a", BaseValue.valueOf(4)))); + + // test by key prefix + memoryTable = createInstance("tombstone-key-prefix"); + tableSchemaDesc = new TableSchemaDesc("key", + List.of( + ColumnSchemaDesc.builder().name("key").type("STRING").build(), + ColumnSchemaDesc.builder().name("a").type("INT32").build() + )); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "aac", "a", "1"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "abc", "a", "2"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "abd", "a", "3"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "afe", "a", "4"))); + + memoryTable.deleteRowsWithKeyPrefix("ab"); + resp = memoryTable.query(Long.MAX_VALUE, Map.of("a", "a"), null, null, false); + result = ImmutableList.copyOf(resp); + assertThat(result.size(), is(4)); + assertThat(result.get(0).getValues(), is(Map.of("a", BaseValue.valueOf(1)))); + assertThat(result.get(1), is(new RecordResult(BaseValue.valueOf("abc"), true, null))); + assertThat(result.get(2), is(new RecordResult(BaseValue.valueOf("abd"), true, null))); + assertThat(result.get(3).getValues(), is(Map.of("a", BaseValue.valueOf(4)))); + } + + @Test + public void testTombStoneWithRevision() { + var memoryTable = createInstance("tombstone"); + var tableSchemaDesc = new TableSchemaDesc("key", + List.of( + ColumnSchemaDesc.builder().name("key").type("STRING").build(), + ColumnSchemaDesc.builder().name("a").type("INT32").build() + )); + + memoryTable.createCheckpoint(null); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "aac", "a", "1"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "abc", "a", "2"))); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "abd", "a", "3"))); + + memoryTable.deleteRowsWithKeyPrefix("ab"); + memoryTable.update(tableSchemaDesc, List.of(Map.of("key", "abc", "a", "4"))); + + var resp = memoryTable.query(Long.MAX_VALUE, Map.of("a", "a"), null, null, false); + var result = ImmutableList.copyOf(resp); + assertThat(result.size(), is(3)); + assertThat(result.get(0).getValues(), is(Map.of("a", BaseValue.valueOf(1)))); + assertThat(result.get(1).getValues(), is(Map.of("a", BaseValue.valueOf(4)))); + assertThat(result.get(2), is(new RecordResult(BaseValue.valueOf("abd"), true, null))); + + // create a checkpoint to trigger garbage collection + memoryTable.createCheckpoint(null); + + resp = memoryTable.query(Long.MAX_VALUE, Map.of("a", "a"), null, null, false); + result = ImmutableList.copyOf(resp); + assertThat(result.size(), is(3)); + assertThat(result.get(0).getValues(), is(Map.of("a", BaseValue.valueOf(1)))); + assertThat(result.get(1).getValues(), is(Map.of("a", BaseValue.valueOf(4)))); + assertThat(result.get(2), is(new RecordResult(BaseValue.valueOf("abd"), true, null))); + } }