From 238d968c6dc66a12b945ed2a2bba0df4cc157d85 Mon Sep 17 00:00:00 2001 From: Alexander Lavrukov Date: Mon, 8 Apr 2024 11:48:59 +0300 Subject: [PATCH] table-cache: common table methods --- .../test/inmemory/InMemoryTable.java | 6 ++ .../yoj/repository/ydb/table/YdbTable.java | 7 +- .../yoj/repository/ydb/table/YdbTable.java | 6 ++ .../ydb/yoj/repository/db/CommonTable.java | 68 +++++++++++++++++++ .../tech/ydb/yoj/repository/db/Table.java | 54 +-------------- 5 files changed, 88 insertions(+), 53 deletions(-) create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/CommonTable.java diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java index 9951fa14..807f1c38 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java @@ -7,6 +7,7 @@ import tech.ydb.yoj.databind.expression.OrderExpression; import tech.ydb.yoj.databind.schema.ObjectSchema; import tech.ydb.yoj.databind.schema.Schema; +import tech.ydb.yoj.repository.db.CommonTable; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.EntityExpressions; import tech.ydb.yoj.repository.db.EntityIdSchema; @@ -184,6 +185,11 @@ public V find(Class viewType, Entity.Id id) { return transaction.doInTransaction("find(" + id + ")", type, shard -> shard.find(id, viewType)); } + @Override + public > List find(Set ids) { + return CommonTable.find(transaction.getTransactionLocal(), this, ids); + } + @Override @SuppressWarnings("unchecked") public > List find(Range range) { diff --git a/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java b/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java index 0e143542..108dd6ee 100644 --- a/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java +++ b/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java @@ -7,6 +7,7 @@ import lombok.NonNull; import tech.ydb.yoj.databind.expression.FilterExpression; import tech.ydb.yoj.databind.expression.OrderExpression; +import tech.ydb.yoj.repository.db.CommonTable; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.Entity.Id; import tech.ydb.yoj.repository.db.EntityIdSchema; @@ -16,7 +17,6 @@ import tech.ydb.yoj.repository.db.Tx; import tech.ydb.yoj.repository.db.ViewSchema; import tech.ydb.yoj.repository.db.bulk.BulkParams; -import tech.ydb.yoj.repository.db.cache.FirstLevelCache; import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.db.statement.Changeset; @@ -235,6 +235,11 @@ public V find(Class viewType, Entity.Id id) { return res.isEmpty() ? null : res.get(0); } + @Override + public > List find(Set ids) { + return CommonTable.find(executor.getTransactionLocal(), this, ids); + } + @Override public > List find(Range range) { return postLoad(executor.execute(YqlStatement.findRange(type, range), range)); diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java index baf7445e..108dd6ee 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java @@ -7,6 +7,7 @@ import lombok.NonNull; import tech.ydb.yoj.databind.expression.FilterExpression; import tech.ydb.yoj.databind.expression.OrderExpression; +import tech.ydb.yoj.repository.db.CommonTable; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.Entity.Id; import tech.ydb.yoj.repository.db.EntityIdSchema; @@ -234,6 +235,11 @@ public V find(Class viewType, Entity.Id id) { return res.isEmpty() ? null : res.get(0); } + @Override + public > List find(Set ids) { + return CommonTable.find(executor.getTransactionLocal(), this, ids); + } + @Override public > List find(Range range) { return postLoad(executor.execute(YqlStatement.findRange(type, range), range)); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/CommonTable.java b/repository/src/main/java/tech/ydb/yoj/repository/db/CommonTable.java new file mode 100644 index 00000000..81ce3d66 --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/CommonTable.java @@ -0,0 +1,68 @@ +package tech.ydb.yoj.repository.db; + +import com.google.common.collect.Sets; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import tech.ydb.yoj.repository.db.cache.TransactionLocal; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toSet; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class CommonTable { + public static , ID extends Entity.Id> List find( + TransactionLocal transactionLocal, Table table, Set ids) { + if (ids.isEmpty()) { + return List.of(); + } + + var orderBy = EntityExpressions.defaultOrder(table.getType()); + var cache = transactionLocal.firstLevelCache(); + var isPartialIdMode = ids.iterator().next().isPartial(); + + var foundInCache = ids.stream() + .filter(cache::containsKey) + .map(cache::peek) + .flatMap(Optional::stream) + .collect(Collectors.toMap(Entity::getId, Function.identity())); + var remainingIds = Sets.difference(ids, foundInCache.keySet()); + var foundInDb = table.findUncached(remainingIds, null, orderBy, null); + + var merged = new HashMap, E>(); + + // some entries found in db with partial id query may already be in cache (after update/delete), + // so we must return actual entries from cache + for (var entry : foundInDb) { + var id = entry.getId(); + if (cache.containsKey(id)) { + var cached = cache.peek(id); + cached.ifPresent(t -> merged.put(id, t)); + // not present means marked as deleted in cache + } else { + merged.put(id, table.postLoad(entry)); + } + } + + // add entries found in cache and not fetched from db + for (var pair : foundInCache.entrySet()) { + var id = pair.getKey(); + var entry = pair.getValue(); + merged.put(id, entry); + } + + if (!isPartialIdMode) { + Set> foundInDbIds = foundInDb.stream().map(Entity::getId).collect(toSet()); + Set> foundInCacheIds = new HashSet<>(foundInCache.keySet()); + Sets.difference(Sets.difference(ids, foundInDbIds), foundInCacheIds).forEach(cache::putEmpty); + } + + return merged.values().stream().sorted(EntityIdSchema.SORT_ENTITY_BY_ID).collect(Collectors.toList()); + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java b/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java index 92da6abe..a2140eea 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java @@ -1,6 +1,5 @@ package tech.ydb.yoj.repository.db; -import com.google.common.collect.Sets; import lombok.NonNull; import tech.ydb.yoj.databind.expression.FilterExpression; import tech.ydb.yoj.databind.expression.OrderExpression; @@ -14,8 +13,6 @@ import javax.annotation.CheckForNull; import javax.annotation.Nullable; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -25,7 +22,6 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.concat; public interface Table> { @@ -42,6 +38,8 @@ public interface Table> { V find(Class viewType, Entity.Id id); + > List find(Set ids); + > List find(Range range); > List findIds(Range range); @@ -265,54 +263,6 @@ default ViewListResult list(Class viewType, List return ViewListResult.forPage(request, viewType, nextPage); } - default > List find(Set ids) { - if (ids.isEmpty()) { - return List.of(); - } - - var orderBy = EntityExpressions.defaultOrder(getType()); - var cache = Tx.Current.get().getRepositoryTransaction().getTransactionLocal().firstLevelCache(); - var isPartialIdMode = ids.iterator().next().isPartial(); - - var foundInCache = ids.stream() - .filter(cache::containsKey) - .map(cache::peek) - .flatMap(Optional::stream) - .collect(Collectors.toMap(Entity::getId, Function.identity())); - var remainingIds = Sets.difference(ids, foundInCache.keySet()); - var foundInDb = findUncached(remainingIds, null, orderBy, null); - - var merged = new HashMap, T>(); - - // some entries found in db with partial id query may already be in cache (after update/delete), - // so we must return actual entries from cache - for (var entry : foundInDb) { - var id = entry.getId(); - if (cache.containsKey(id)) { - var cached = cache.peek(id); - cached.ifPresent(t -> merged.put(id, t)); - // not present means marked as deleted in cache - } else { - merged.put(id, this.postLoad(entry)); - } - } - - // add entries found in cache and not fetched from db - for (var pair : foundInCache.entrySet()) { - var id = pair.getKey(); - var entry = pair.getValue(); - merged.put(id, entry); - } - - if (!isPartialIdMode) { - Set> foundInDbIds = foundInDb.stream().map(Entity::getId).collect(toSet()); - Set> foundInCacheIds = new HashSet<>(foundInCache.keySet()); - Sets.difference(Sets.difference(ids, foundInDbIds), foundInCacheIds).forEach(cache::putEmpty); - } - - return merged.values().stream().sorted(EntityIdSchema.SORT_ENTITY_BY_ID).collect(Collectors.toList()); - } - default void bulkUpsert(List input, BulkParams params) { throw new UnsupportedOperationException(); }