Skip to content

Commit

Permalink
Merge pull request #159 from fauna/stats-collector-impl
Browse files Browse the repository at this point in the history
Add stats collector interface and impl
  • Loading branch information
pnwpedro authored Oct 29, 2024
2 parents 8c3f4d4 + 9e8c0f9 commit 7ff2777
Show file tree
Hide file tree
Showing 4 changed files with 380 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/main/java/com/fauna/client/QueryStatsSummary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.fauna.client;

/**
* A class for representing aggregate query stats. This should be used when collecting query stats
* across multiple requests.
* <p>
* For a single request, use @link com.fauna.response.QueryStats instead.
*/
public final class QueryStatsSummary {
private final long readOps;
private final long computeOps;
private final long writeOps;
private final long queryTimeMs;
private final int contentionRetries;
private final long storageBytesRead;
private final long storageBytesWrite;
private final long processingTimeMs;
private final int queryCount;

private final int rateLimitedReadQueryCount;
private final int rateLimitedComputeQueryCount;
private final int rateLimitedWriteQueryCount;

public QueryStatsSummary(
long readOps,
long computeOps,
long writeOps,
long queryTimeMs,
int contentionRetries,
long storageBytesRead,
long storageBytesWrite,
long processingTimeMs,
int queryCount,
int rateLimitedReadQueryCount,
int rateLimitedComputeQueryCount,
int rateLimitedWriteQueryCount
) {
this.readOps = readOps;
this.computeOps = computeOps;
this.writeOps = writeOps;
this.queryTimeMs = queryTimeMs;
this.contentionRetries = contentionRetries;
this.storageBytesRead = storageBytesRead;
this.storageBytesWrite = storageBytesWrite;
this.processingTimeMs = processingTimeMs;
this.queryCount = queryCount;
this.rateLimitedReadQueryCount = rateLimitedReadQueryCount;
this.rateLimitedComputeQueryCount = rateLimitedComputeQueryCount;
this.rateLimitedWriteQueryCount = rateLimitedWriteQueryCount;
}

/**
* Gets the aggregate read ops.
* @return A long representing the aggregate read ops
*/
public long getReadOps() { return readOps; }

/**
* Gets the aggregate compute ops.
* @return A long representing the aggregate compute ops
*/
public long getComputeOps() { return computeOps; }

/**
* Gets the aggregate write ops.
* @return A long representing the aggregate write ops
*/
public long getWriteOps() { return writeOps; }

/**
* Gets the aggregate query time in milliseconds.
* @return A long representing the aggregate query time in milliseconds.
*/
public long getQueryTimeMs() { return queryTimeMs; }

/**
* Gets the count of retries due to contention.
* @return An int representing the count of retries due to contention.
*/
public int getContentionRetries() { return contentionRetries; }

/**
* Gets the aggregate storage bytes read.
* @return A long representing the aggregate number of storage bytes read.
*/
public long getStorageBytesRead() { return storageBytesRead; }

/**
* Gets the aggregate storage bytes written.
* @return A long representing the aggregate number of storage bytes written.
*/
public long getStorageBytesWrite() { return storageBytesWrite; }

/**
* Gets the aggregate processing time in milliseconds.
* @return A long representing the aggregate processing time in milliseconds.
*/
public long getProcessingTimeMs() { return processingTimeMs; }

/**
* Gets the count of queries summarized on this instance.
* @return An int representing the count of queries summarized.
*/
public int getQueryCount() { return queryCount; }

/**
* Gets the count of rate limited queries due to read limits.
* @return An int representing the count of rate limited queries.
*/
public int getRateLimitedReadQueryCount() { return rateLimitedReadQueryCount; }

/**
* Gets the count of rate limited queries due to compute limits.
* @return An int representing the count of rate limited queries.
*/
public int getRateLimitedComputeQueryCount() { return rateLimitedComputeQueryCount; }

/**
* Gets the count of rate limited queries due to write limits.
* @return An int representing the count of rate limited queries.
*/
public int getRateLimitedWriteQueryCount() { return rateLimitedWriteQueryCount; }
}
24 changes: 24 additions & 0 deletions src/main/java/com/fauna/client/StatsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.fauna.client;

import com.fauna.response.QueryStats;

public interface StatsCollector {

/**
* Add the QueryStats to the current counts.
* @param stats QueryStats object
*/
void add(QueryStats stats);

/**
* Return the collected Stats.
* @return Stats object
*/
QueryStatsSummary read();

/**
* Return the collected Stats and reset counts.
* @return Stats object
*/
QueryStatsSummary readAndReset();
}
93 changes: 93 additions & 0 deletions src/main/java/com/fauna/client/StatsCollectorImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.fauna.client;

import com.fauna.response.QueryStats;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class StatsCollectorImpl implements StatsCollector {

private static final String RATE_LIMIT_READ_OPS = "read";
private static final String RATE_LIMIT_COMPUTE_OPS = "compute";
private static final String RATE_LIMIT_WRITE_OPS = "write";

private final AtomicLong readOps = new AtomicLong();
private final AtomicLong computeOps = new AtomicLong();
private final AtomicLong writeOps = new AtomicLong();
private final AtomicLong queryTimeMs = new AtomicLong();
private final AtomicInteger contentionRetries = new AtomicInteger();
private final AtomicLong storageBytesRead = new AtomicLong();
private final AtomicLong storageBytesWrite = new AtomicLong();
private final AtomicLong processingTimeMs = new AtomicLong();
private final AtomicInteger queryCount = new AtomicInteger();
private final AtomicInteger rateLimitedReadQueryCount = new AtomicInteger();
private final AtomicInteger rateLimitedComputeQueryCount = new AtomicInteger();
private final AtomicInteger rateLimitedWriteQueryCount = new AtomicInteger();

@Override
public void add(QueryStats stats) {
readOps.addAndGet(stats.readOps);
computeOps.addAndGet(stats.computeOps);
writeOps.addAndGet(stats.writeOps);
queryTimeMs.addAndGet(stats.queryTimeMs);
contentionRetries.addAndGet(stats.contentionRetries);
storageBytesRead.addAndGet(stats.storageBytesRead);
storageBytesWrite.addAndGet(stats.storageBytesWrite);
processingTimeMs.addAndGet(stats.processingTimeMs);

List<String> rateLimitsHit = stats.rateLimitsHit;
rateLimitsHit.forEach(limitHit -> {
switch (limitHit) {
case RATE_LIMIT_READ_OPS:
rateLimitedReadQueryCount.incrementAndGet();
break;
case RATE_LIMIT_COMPUTE_OPS:
rateLimitedComputeQueryCount.incrementAndGet();
break;
case RATE_LIMIT_WRITE_OPS:
rateLimitedWriteQueryCount.incrementAndGet();
break;
}
});

queryCount.incrementAndGet();
}

@Override
public QueryStatsSummary read() {
return new QueryStatsSummary(
readOps.get(),
computeOps.get(),
writeOps.get(),
queryTimeMs.get(),
contentionRetries.get(),
storageBytesRead.get(),
storageBytesWrite.get(),
processingTimeMs.get(),
queryCount.get(),
rateLimitedReadQueryCount.get(),
rateLimitedComputeQueryCount.get(),
rateLimitedWriteQueryCount.get()
);
}

@Override
public QueryStatsSummary readAndReset() {
return new QueryStatsSummary(
readOps.getAndSet(0),
computeOps.getAndSet(0),
writeOps.getAndSet(0),
queryTimeMs.getAndSet(0),
contentionRetries.getAndSet(0),
storageBytesRead.getAndSet(0),
storageBytesWrite.getAndSet(0),
processingTimeMs.getAndSet(0),
queryCount.getAndSet(0),
rateLimitedReadQueryCount.getAndSet(0),
rateLimitedComputeQueryCount.getAndSet(0),
rateLimitedWriteQueryCount.getAndSet(0)
);
}
}

140 changes: 140 additions & 0 deletions src/test/java/com/fauna/client/TestStatsCollectorImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package com.fauna.client;

import com.fauna.response.QueryStats;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

import java.util.Arrays;
import java.util.Collections;

public class TestStatsCollectorImpl {

private StatsCollectorImpl statsCollector;

@BeforeEach
public void setUp() {
statsCollector = new StatsCollectorImpl();
}

@Test
public void testAdd_singleQueryStats_updatesCorrectly() {
// Arrange
QueryStats stats = new QueryStats(
10,
20,
5,
100,
1,
500,
300,
50,
Arrays.asList("read", "compute")
);

// Act
statsCollector.add(stats);

// Assert
QueryStatsSummary result = statsCollector.read();
assertEquals(10, result.getComputeOps());
assertEquals(20, result.getReadOps());
assertEquals(5, result.getWriteOps());
assertEquals(100, result.getQueryTimeMs());
assertEquals(1, result.getContentionRetries());
assertEquals(500, result.getStorageBytesRead());
assertEquals(300, result.getStorageBytesWrite());
assertEquals(50, result.getProcessingTimeMs());
assertEquals(1, result.getQueryCount());
assertEquals(1, result.getRateLimitedReadQueryCount());
assertEquals(1, result.getRateLimitedComputeQueryCount());
assertEquals(0, result.getRateLimitedWriteQueryCount());
}

@Test
public void testAdd_multipleQueryStats_accumulatesValuesCorrectly() {
// Arrange
QueryStats stats1 = new QueryStats(10, 20, 5, 100, 1, 500, 300, 30, Collections.singletonList("read"));
QueryStats stats2 = new QueryStats(15, 25, 10, 200, 2, 600, 400, 40, Collections.singletonList("write"));

// Act
statsCollector.add(stats1);
statsCollector.add(stats2);

// Assert
QueryStatsSummary result = statsCollector.read();
assertEquals(25, result.getComputeOps());
assertEquals(45, result.getReadOps());
assertEquals(15, result.getWriteOps());
assertEquals(300, result.getQueryTimeMs());
assertEquals(3, result.getContentionRetries());
assertEquals(1100, result.getStorageBytesRead());
assertEquals(700, result.getStorageBytesWrite());
assertEquals(70, result.getProcessingTimeMs());
assertEquals(2, result.getQueryCount());
assertEquals(1, result.getRateLimitedReadQueryCount());
assertEquals(0, result.getRateLimitedComputeQueryCount());
assertEquals(1, result.getRateLimitedWriteQueryCount());
}

@Test
public void testRead_initialStats_returnsZeroStats() {
// Act
QueryStatsSummary result = statsCollector.read();

// Assert
assertEquals(0, result.getComputeOps());
assertEquals(0, result.getReadOps());
assertEquals(0, result.getWriteOps());
assertEquals(0, result.getQueryTimeMs());
assertEquals(0, result.getContentionRetries());
assertEquals(0, result.getStorageBytesRead());
assertEquals(0, result.getStorageBytesWrite());
assertEquals(0, result.getProcessingTimeMs());
assertEquals(0, result.getQueryCount());
assertEquals(0, result.getRateLimitedReadQueryCount());
assertEquals(0, result.getRateLimitedComputeQueryCount());
assertEquals(0, result.getRateLimitedWriteQueryCount());
}

@Test
public void testReadAndReset_returnsAndResetsStats() {
// Arrange
QueryStats stats = new QueryStats(
10, 20, 5, 100, 1, 500, 300, 75, Arrays.asList("read", "write")
);
statsCollector.add(stats);

// Act
QueryStatsSummary beforeReset = statsCollector.readAndReset();
QueryStatsSummary afterReset = statsCollector.read();

// Assert the stats before reset
assertEquals(10, beforeReset.getComputeOps());
assertEquals(20, beforeReset.getReadOps());
assertEquals(5, beforeReset.getWriteOps());
assertEquals(100, beforeReset.getQueryTimeMs());
assertEquals(1, beforeReset.getContentionRetries());
assertEquals(500, beforeReset.getStorageBytesRead());
assertEquals(300, beforeReset.getStorageBytesWrite());
assertEquals(75, beforeReset.getProcessingTimeMs());
assertEquals(1, beforeReset.getQueryCount());
assertEquals(1, beforeReset.getRateLimitedReadQueryCount());
assertEquals(0, beforeReset.getRateLimitedComputeQueryCount());
assertEquals(1, beforeReset.getRateLimitedWriteQueryCount());

// Assert the stats after reset
assertEquals(0, afterReset.getReadOps());
assertEquals(0, afterReset.getComputeOps());
assertEquals(0, afterReset.getWriteOps());
assertEquals(0, afterReset.getQueryTimeMs());
assertEquals(0, afterReset.getContentionRetries());
assertEquals(0, afterReset.getStorageBytesRead());
assertEquals(0, afterReset.getStorageBytesWrite());
assertEquals(0, afterReset.getProcessingTimeMs());
assertEquals(0, afterReset.getQueryCount());
assertEquals(0, afterReset.getRateLimitedReadQueryCount());
assertEquals(0, afterReset.getRateLimitedComputeQueryCount());
assertEquals(0, afterReset.getRateLimitedWriteQueryCount());
}
}

0 comments on commit 7ff2777

Please sign in to comment.