Skip to content

Commit

Permalink
init tombstone
Browse files Browse the repository at this point in the history
  • Loading branch information
jialeicui committed Jan 4, 2024
1 parent ff4446e commit 3cc5218
Show file tree
Hide file tree
Showing 10 changed files with 559 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import ai.starwhale.mlops.api.protocol.Code;
import ai.starwhale.mlops.api.protocol.ResponseMessage;
import ai.starwhale.mlops.api.protocol.datastore.BatchDeleteRequest;
import ai.starwhale.mlops.api.protocol.datastore.CheckpointVo;
import ai.starwhale.mlops.api.protocol.datastore.ColumnDesc;
import ai.starwhale.mlops.api.protocol.datastore.CreateCheckpointRequest;
Expand Down Expand Up @@ -141,6 +142,10 @@ ResponseEntity<ResponseMessage<String>> updateTable(
}
}

/**
 * Handles a batch-delete request against a datastore table.
 *
 * <p>NOTE(review): as written, this handler only validates the request body (via {@code @Valid})
 * and reports success; it does not forward the request to the datastore, and no request-mapping
 * annotation is visible on it here. Confirm whether the wiring is still to be added.
 *
 * @param request the batch delete request body
 * @return an OK response carrying the message "Success"
 */
ResponseEntity<ResponseMessage<String>> batchDelete(@Valid @RequestBody BatchDeleteRequest request) {
    var body = Code.success.asResponse("Success");
    return ResponseEntity.ok(body);
}

@PostMapping(value = "/datastore/flush")
@PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')")
ResponseEntity<ResponseMessage<String>> flush(FlushRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.api.protocol.datastore;

import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Request body describing which rows of a datastore table should be deleted.
 *
 * <p>Rows are selected in one of two mutually exclusive ways: by a key range
 * ({@code startKey}/{@code endKey} together with {@code keyType}) or by a string
 * {@code keyPrefix}. When neither a range bound nor a prefix is given, the range is
 * unbounded on both sides.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BatchDeleteRequest {
// name of the table to delete rows from, required
@NotNull
private String tableName;

// encoded start key of the range, null means the range starts from the beginning of the table
private String startKey;
// encoded end key of the range, null means the range extends to the end of the table
private String endKey;
// type used to decode startKey and endKey; must not be null if startKey or endKey is not null
private String keyType;

// whether the row at startKey itself is deleted, defaults to true
@Builder.Default
private Boolean startKeyInclusive = true;
// whether the row at endKey itself is deleted, defaults to false
@Builder.Default
private Boolean endKeyInclusive = false;

// optional key prefix, only takes effect if the keys in the table are strings and keyPrefix is not null
// keyPrefix and the (startKey, endKey) range can not be used together
private String keyPrefix;
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.api.protocol.datastore.BatchDeleteRequest;
import ai.starwhale.mlops.api.protocol.datastore.CreateCheckpointRequest;
import ai.starwhale.mlops.datastore.ParquetConfig.CompressionCodec;
import ai.starwhale.mlops.datastore.impl.MemoryTableImpl;
import ai.starwhale.mlops.datastore.impl.RecordDecoder;
import ai.starwhale.mlops.datastore.impl.RecordEncoder;
import ai.starwhale.mlops.datastore.type.BaseValue;
import ai.starwhale.mlops.datastore.wal.WalManager;
Expand Down Expand Up @@ -939,4 +941,28 @@ public void deleteCheckpoint(String tableName, long revision) {
table.unlock(false);
}
}

/**
 * Deletes a batch of rows from a table, selected either by key prefix or by key range.
 *
 * <p>The request is validated and its keys are decoded before the table lock is taken, so an
 * invalid request never blocks other users of the table. If neither a prefix nor a range bound
 * is given, the range is unbounded on both sides.
 *
 * @param req the delete request; {@code keyPrefix} and ({@code startKey}, {@code endKey}) are
 *            mutually exclusive, and {@code keyType} is required when a range bound is present
 * @throws SwValidationException if a prefix and a range are both given, or if a range bound is
 *                               given without a key type
 */
public void batchDeleteRows(BatchDeleteRequest req) {
    var table = this.getTable(req.getTableName(), false, false);

    boolean useKeyPrefix = StringUtils.hasText(req.getKeyPrefix());
    boolean useRange = req.getStartKey() != null || req.getEndKey() != null;
    // keyPrefix and (start, end) can not be used together
    if (useKeyPrefix && useRange) {
        throw new SwValidationException(SwValidationException.ValidSubject.DATASTORE,
                "keyPrefix and (start, end) can not be used together");
    }
    // a range bound can not be decoded without knowing its type
    if (useRange && !StringUtils.hasText(req.getKeyType())) {
        throw new SwValidationException(SwValidationException.ValidSubject.DATASTORE,
                "keyType can not be null if startKey or endKey is not null");
    }

    // The flags are boxed and may be null when the client sends an explicit JSON null;
    // fall back to the documented defaults (start inclusive, end exclusive) instead of
    // failing later with a NullPointerException on unboxing.
    boolean startInclusive = req.getStartKeyInclusive() == null || req.getStartKeyInclusive();
    boolean endInclusive = Boolean.TRUE.equals(req.getEndKeyInclusive());

    //noinspection ConstantConditions
    table.lock(false);
    try {
        if (useKeyPrefix) {
            table.deleteRowsWithKeyPrefix(req.getKeyPrefix());
        } else {
            var start = RecordDecoder.decodeScalar(req.getStartKey(), req.getKeyType());
            var end = RecordDecoder.decodeScalar(req.getEndKey(), req.getKeyType());
            table.deleteRowsWithRange(start, startInclusive, end, endInclusive);
        }
    } finally {
        table.unlock(false);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotNull;

public interface MemoryTable {

Expand Down Expand Up @@ -98,4 +99,26 @@ Iterator<RecordResult> scan(
* @param revision revision in checkpoint
*/
void deleteCheckpoint(long revision);

/**
 * Deletes all rows whose key falls within the given range.
 *
 * <p>When both bounds are given they are expected to be of the same value type.
 * NOTE(review): the flags are boxed {@code Boolean}s; callers should pass non-null values,
 * since an implementation may unbox them directly — confirm against the implementation.
 *
 * @param start start key, nullable
 *              if start is null, the range starts from the beginning
 *              if start is not null, the range starts from the start key
 *              (inclusive or not depends on startInclusive)
 * @param startInclusive whether the start key itself belongs to the range
 * @param end end key, nullable
 *              if end is null, the range extends to the end
 *              if end is not null, the range ends at the end key (inclusive or not depends on endInclusive)
 * @param endInclusive whether the end key itself belongs to the range
 */
void deleteRowsWithRange(BaseValue start, Boolean startInclusive, BaseValue end, Boolean endInclusive);

/**
 * Deletes all rows whose key starts with the given prefix.
 *
 * @param keyPrefix the key prefix to match, must not be null
 */
void deleteRowsWithKeyPrefix(@NotNull String keyPrefix);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.datastore;

import ai.starwhale.mlops.datastore.impl.WalRecordDecoder;
import ai.starwhale.mlops.datastore.type.BaseValue;
import ai.starwhale.mlops.datastore.type.StringValue;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;

/**
 * In-memory form of a deletion marker: it covers either every string key that starts with
 * {@code keyPrefix}, or every key inside the range described by {@code start}/{@code end}.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Tombstone {
    // notice that the start and end must be the same type
    private BaseValue start;
    private BaseValue end;
    private boolean startInclusive;
    private boolean endInclusive;
    private String keyPrefix;

    /**
     * Converts a WAL tombstone message into its in-memory form.
     *
     * @param tombstone the WAL message, carrying either a prefix or a range
     * @return the equivalent tombstone
     * @throws IllegalArgumentException if the message carries neither a prefix nor a range
     */
    public static Tombstone from(Wal.Tombstone tombstone) {
        // a prefix takes precedence over a range
        if (tombstone.hasPrefix()) {
            return Tombstone.builder()
                    .keyPrefix(tombstone.getPrefix().getKeyPrefix())
                    .build();
        }
        if (!tombstone.hasRange()) {
            throw new IllegalArgumentException("invalid tombstone");
        }

        var walRange = tombstone.getRange();
        return Tombstone.builder()
                .start(WalRecordDecoder.decodeValue(walRange.getStartKey()))
                .end(WalRecordDecoder.decodeValue(walRange.getEndKey()))
                .startInclusive(walRange.getStartInclusive())
                .endInclusive(walRange.getEndInclusive())
                .build();
    }

    /**
     * Tells whether this tombstone covers the given key.
     * <p>
     * With a key prefix set, only string keys starting with that prefix are covered.
     * Otherwise the key is covered when it lies inside the range; a null bound leaves that
     * side of the range open, and two null bounds cover every key.
     * A key whose type differs from the type of the bounds is never covered.
     * </p>
     *
     * @param key the key to check
     * @return true if the key is deleted by this tombstone
     */
    public boolean keyDeleted(@NonNull BaseValue key) {
        if (keyPrefix != null) {
            // prefix matching only makes sense for string keys
            if (!(key instanceof StringValue)) {
                return false;
            }
            var text = ((StringValue) key).getValue();
            return text != null && text.startsWith(keyPrefix);
        }

        if (start == null && end == null) {
            // unbounded on both sides
            return true;
        }

        // the bounds share one type, so whichever bound is present determines it
        var boundType = (start != null ? start : end).getClass();
        if (!boundType.isInstance(key)) {
            return false;
        }

        // each present bound is compared exactly once; an absent bound always passes
        boolean passesStart = true;
        if (start != null) {
            var cmpStart = start.compareTo(key);
            passesStart = cmpStart < 0 || (cmpStart == 0 && startInclusive);
        }
        boolean passesEnd = true;
        if (end != null) {
            var cmpEnd = end.compareTo(key);
            passesEnd = cmpEnd > 0 || (cmpEnd == 0 && endInclusive);
        }
        return passesStart && passesEnd;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
import ai.starwhale.mlops.datastore.TableQueryFilter;
import ai.starwhale.mlops.datastore.TableSchema;
import ai.starwhale.mlops.datastore.TableSchemaDesc;
import ai.starwhale.mlops.datastore.Tombstone;
import ai.starwhale.mlops.datastore.Wal;
import ai.starwhale.mlops.datastore.Wal.Checkpoint.OP;
import ai.starwhale.mlops.datastore.Wal.Tombstone.Builder;
import ai.starwhale.mlops.datastore.Wal.Tombstone.Prefix;
import ai.starwhale.mlops.datastore.Wal.WalEntry.Type;
import ai.starwhale.mlops.datastore.parquet.SwParquetReaderBuilder;
import ai.starwhale.mlops.datastore.parquet.SwParquetWriterBuilder;
import ai.starwhale.mlops.datastore.parquet.SwReadSupport;
Expand Down Expand Up @@ -127,6 +131,7 @@ public class MemoryTableImpl implements MemoryTable {
private final Lock writeLock = lock.writeLock();

private final ConcurrentSkipListMap<Long, Checkpoint> checkpoints = new ConcurrentSkipListMap<>();
private final ConcurrentSkipListMap<Long, Tombstone> tombstones = new ConcurrentSkipListMap<>();

public MemoryTableImpl(
String tableName,
Expand Down Expand Up @@ -510,7 +515,7 @@ private Map<String, BaseValue> getRecordMap(BaseValue key, List<MemoryRecord> ve
if (record.getRevision() <= revision) {
// record may be empty, use hasVersion to mark if there is a record
hasVersion = true;
if (record.isDeleted()) {
if (record.isDeleted() || checkIfDeletedByTombstone(key, record.getRevision(), revision)) {
deleted = true;
ret.clear();
} else {
Expand All @@ -529,6 +534,13 @@ private Map<String, BaseValue> getRecordMap(BaseValue key, List<MemoryRecord> ve
return ret;
}

/**
 * Checks whether a tombstone with a revision in {@code (curRevision, maxRevision]} covers the key.
 *
 * @param key         the record key
 * @param curRevision revision of the record version being examined (exclusive lower bound)
 * @param maxRevision revision the read is performed at (inclusive upper bound)
 * @return true if at least one tombstone in that revision window deletes the key
 */
private boolean checkIfDeletedByTombstone(BaseValue key, long curRevision, long maxRevision) {
    // The map is ordered by revision, so the scan can stop at the first tombstone
    // that is newer than maxRevision instead of filtering every remaining entry.
    for (var entry : this.tombstones.tailMap(curRevision, false).entrySet()) {
        if (entry.getKey() > maxRevision) {
            break;
        }
        if (entry.getValue().keyDeleted(key)) {
            return true;
        }
    }
    return false;
}

@Override
public Iterator<RecordResult> query(
long revision,
Expand Down Expand Up @@ -920,6 +932,41 @@ public void deleteCheckpoint(long revision) {
deleteCheckpointAndGc(revision);
}

/**
 * Records a range tombstone built from the given bounds and inclusiveness flags.
 */
@Override
public void deleteRowsWithRange(BaseValue start, Boolean startInclusive, BaseValue end, Boolean endInclusive) {
    var rangeBuilder = Wal.Tombstone.Range.newBuilder();
    rangeBuilder.setStartKey(BaseValue.encodeWal(start));
    rangeBuilder.setStartInclusive(startInclusive);
    rangeBuilder.setEndKey(BaseValue.encodeWal(end));
    rangeBuilder.setEndInclusive(endInclusive);

    addTombstone(Wal.Tombstone.newBuilder().setRange(rangeBuilder));
}

/**
 * Records a prefix tombstone for the given key prefix.
 */
@Override
public void deleteRowsWithKeyPrefix(String keyPrefix) {
    var prefix = Prefix.newBuilder().setKeyPrefix(keyPrefix).build();
    // the wal entry itself is written by addTombstone
    addTombstone(Wal.Tombstone.newBuilder().setPrefix(prefix));
}

/**
 * Assigns the next table revision to the tombstone, appends it to the WAL and registers it
 * in the in-memory tombstone map.
 *
 * @param tombstone tombstone builder carrying either a prefix or a range; its revision is set here
 */
private void addTombstone(Builder tombstone) {
    //noinspection NonAtomicOperationOnVolatileField
    this.lastRevision++;
    tombstone.setRevision(this.lastRevision);
    var entry = Wal.WalEntry.newBuilder()
            .setEntryType(Type.TOMBSTONE)
            .setTableName(this.tableName)
            .setTombstone(tombstone);

    // Keep the id of the entry just appended. Without this assignment lastWalLogId is never
    // advanced for tombstones, and firstWalLogId below is initialised from a stale value.
    // NOTE(review): assumes WalManager.append returns the log id of the appended entry — confirm.
    this.lastWalLogId = this.walManager.append(entry);
    if (this.firstWalLogId < 0) {
        this.firstWalLogId = this.lastWalLogId;
    }
    this.tombstones.put(tombstone.getRevision(), Tombstone.from(tombstone.build()));
}

/**
* Check if the revision is valid, and valid means one of the following:
* <ul>
Expand Down Expand Up @@ -982,7 +1029,7 @@ private void garbageCollect(@NotNull Checkpoint from, @NotNull Checkpoint to) {
return;
}

this.recordMap.values().forEach(versions -> {
this.recordMap.forEach((key, versions) -> {
if (versions.isEmpty()) {
return;
}
Expand All @@ -993,17 +1040,19 @@ private void garbageCollect(@NotNull Checkpoint from, @NotNull Checkpoint to) {
long lastRevision = 0;
while (it.hasNext()) {
var record = it.next();
if (record.getRevision() <= fromRevision) {
var recordRevision = record.getRevision();
if (recordRevision <= fromRevision) {
continue;
}
if (record.getRevision() > toRevision) {
if (recordRevision > toRevision) {
break;
}

if (current == null) {
current = record;
continue;
}

if (record.isDeleted()) {
lastDeletion = record;
}
Expand All @@ -1016,7 +1065,7 @@ var record = it.next();
current.getValues().put(entry.getKey(), entry.getValue());
}
}
lastRevision = record.getRevision();
lastRevision = recordRevision;
it.remove();
}
// replace the current item with the patched one
Expand All @@ -1031,6 +1080,7 @@ var record = it.next();
.values(current.getValues())
.build());
}

versions.sort(Comparator.comparingLong(MemoryRecord::getRevision));
});
}
Expand Down
Loading

0 comments on commit 3cc5218

Please sign in to comment.