Skip to content

Commit

Permalink
[core] Cache statistics in AbstractFileStoreTable (#4601)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Nov 27, 2024
1 parent 7e4148a commit fe69313
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ private void putTableCache(Identifier identifier, Table table) {
.maximumSize(snapshotMaxNumPerTable)
.executor(Runnable::run)
.build());
storeTable.setStatsCache(
Caffeine.newBuilder()
.softValues()
.expireAfterAccess(expirationInterval)
.maximumSize(5)
.executor(Runnable::run)
.build());
if (manifestCache != null) {
storeTable.setManifestCache(manifestCache);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ public Optional<Statistics> readStats(long snapshotId) {
}

public Optional<Statistics> readStats(Snapshot snapshot) {
if (snapshot.statistics() == null) {
return Optional.empty();
} else {
Statistics stats = statsFile.read(snapshot.statistics());
stats.deserializeFieldsFromString(schemaManager.schema(stats.schemaId()));
return Optional.of(stats);
}
String file = snapshot.statistics();
return file == null ? Optional.empty() : Optional.of(readStats(file));
}

public Statistics readStats(String file) {
Statistics stats = statsFile.read(file);
stats.deserializeFieldsFromString(schemaManager.schema(stats.schemaId()));
return stats;
}

/** Delete stats of the specified snapshot. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,18 @@
abstract class AbstractFileStoreTable implements FileStoreTable {

private static final long serialVersionUID = 1L;

private static final String WATERMARK_PREFIX = "watermark-";

protected final FileIO fileIO;
protected final Path path;
protected final TableSchema tableSchema;
protected final CatalogEnvironment catalogEnvironment;

@Nullable protected transient SegmentsCache<Path> manifestCache;
@Nullable protected transient Cache<Path, Snapshot> snapshotCache;
@Nullable protected transient Cache<String, Statistics> statsCache;

protected AbstractFileStoreTable(
FileIO fileIO,
Path path,
Expand All @@ -122,14 +127,21 @@ public String currentBranch() {

@Override
public void setManifestCache(SegmentsCache<Path> manifestCache) {
this.manifestCache = manifestCache;
store().setManifestCache(manifestCache);
}

@Override
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
this.snapshotCache = cache;
store().setSnapshotCache(cache);
}

@Override
public void setStatsCache(Cache<String, Statistics> cache) {
this.statsCache = cache;
}

@Override
public OptionalLong latestSnapshotId() {
Long snapshot = store().snapshotManager().latestSnapshotId();
Expand Down Expand Up @@ -187,7 +199,21 @@ public String uuid() {
public Optional<Statistics> statistics() {
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(this);
if (snapshot != null) {
return store().newStatsFileHandler().readStats(snapshot);
String file = snapshot.statistics();
if (file == null) {
return Optional.empty();
}
if (statsCache != null) {
Statistics stats = statsCache.getIfPresent(file);
if (stats != null) {
return Optional.of(stats);
}
}
Statistics stats = store().newStatsFileHandler().readStats(file);
if (statsCache != null) {
statsCache.put(file, stats);
}
return Optional.of(stats);
}
return Optional.empty();
}
Expand Down Expand Up @@ -342,9 +368,22 @@ public FileStoreTable copyWithLatestSchema() {

@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return newTableSchema.primaryKeys().isEmpty()
? new AppendOnlyFileStoreTable(fileIO, path, newTableSchema, catalogEnvironment)
: new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, catalogEnvironment);
AbstractFileStoreTable copied =
newTableSchema.primaryKeys().isEmpty()
? new AppendOnlyFileStoreTable(
fileIO, path, newTableSchema, catalogEnvironment)
: new PrimaryKeyFileStoreTable(
fileIO, path, newTableSchema, catalogEnvironment);
if (snapshotCache != null) {
copied.setSnapshotCache(snapshotCache);
}
if (manifestCache != null) {
copied.setManifestCache(manifestCache);
}
if (statsCache != null) {
copied.setStatsCache(statsCache);
}
return copied;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public void setSnapshotCache(Cache<Path, Snapshot> cache) {
wrapped.setSnapshotCache(cache);
}

@Override
public void setStatsCache(Cache<String, Statistics> cache) {
wrapped.setStatsCache(cache);
}

@Override
public TableSchema schema() {
return wrapped.schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
Expand All @@ -47,6 +48,8 @@ public interface FileStoreTable extends DataTable {

void setSnapshotCache(Cache<Path, Snapshot> cache);

void setStatsCache(Cache<String, Statistics> cache);

@Override
default RowType rowType() {
return schema().logicalRowType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ public void testSnapshotCache() throws Exception {

Snapshot snapshot = table.snapshot(1);
assertThat(snapshot).isSameAs(table.snapshot(1));

// copy
Snapshot copied = table.copy(Collections.singletonMap("a", "b")).snapshot(1);
assertThat(copied).isSameAs(snapshot);
}

@Test
Expand Down Expand Up @@ -386,7 +390,8 @@ private void innerTestManifestCache(long manifestCacheThreshold) throws Exceptio

// repeat read
for (int i = 0; i < 5; i++) {
table = catalog.getTable(tableIdent);
// test copy too
table = catalog.getTable(tableIdent).copy(Collections.singletonMap("a", "b"));
ReadBuilder readBuilder = table.newReadBuilder();
TableScan scan = readBuilder.newScan();
TableRead read = readBuilder.newRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.paimon.data.Decimal;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.DateTimeUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

Expand All @@ -51,7 +53,8 @@ public void testAnalyzeTable() throws Catalog.TableNotExistException {
sql("INSERT INTO T VALUES ('2', 'aaa', 1, 2)");
sql("ANALYZE TABLE T COMPUTE STATISTICS");

Optional<Statistics> statisticsOpt = paimonTable("T").statistics();
FileStoreTable table = paimonTable("T");
Optional<Statistics> statisticsOpt = table.statistics();
assertThat(statisticsOpt.isPresent()).isTrue();
Statistics stats = statisticsOpt.get();

Expand All @@ -60,6 +63,16 @@ public void testAnalyzeTable() throws Catalog.TableNotExistException {

Assertions.assertTrue(stats.mergedRecordSize().isPresent());
Assertions.assertTrue(stats.colStats().isEmpty());

// by default, caching catalog should cache it
Optional<Statistics> newStats = table.statistics();
assertThat(newStats.isPresent()).isTrue();
assertThat(newStats.get()).isSameAs(stats);

// copy the table
newStats = table.copy(Collections.singletonMap("a", "b")).statistics();
assertThat(newStats.isPresent()).isTrue();
assertThat(newStats.get()).isSameAs(stats);
}

@Test
Expand Down

0 comments on commit fe69313

Please sign in to comment.