Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add coordinator MEMORY_BYTES sensor for reads #1452

Draft
wants to merge 56 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
54b3b28
CNDB-8501 Propagate request sensors via native CQL custom payload
aymkhalil Sep 21, 2024
7613307
Propagate READ_BYTES only
aymkhalil Oct 2, 2024
c755d86
Merge branch 'main' into cndb-8501
aymkhalil Oct 8, 2024
ee9566e
Add WRITE_BYTES and customize header prefix
aymkhalil Oct 15, 2024
8bf72a9
Fix typo
aymkhalil Oct 15, 2024
1e3092a
Fix CounterMutationCallbackTest
aymkhalil Oct 15, 2024
55454d9
Track replica sensors in ResponseVerbHandler + refactor custom sensor…
aymkhalil Oct 18, 2024
f8a1fc6
Add ReplicaSensorsTrackedTest unit tests, more tests in SensorsTest d…
aymkhalil Oct 18, 2024
7c19ca5
Fix CounterMutationCallbackTest
aymkhalil Oct 18, 2024
2373ede
Cleanup
aymkhalil Oct 21, 2024
1311836
Remove unnecessary instanceof check
aymkhalil Oct 21, 2024
78ef0b5
SensorEncoder java docs
aymkhalil Oct 21, 2024
42a41f2
Address generic feedback (javadocs, code style, leftovers, class pack…
aymkhalil Oct 23, 2024
31dfa8f
Add SensorEncoder interface
aymkhalil Oct 23, 2024
fe0628b
table -> global everywhere
aymkhalil Oct 23, 2024
7a9c72f
Register sensors before the mutate call
aymkhalil Oct 23, 2024
42b8ebf
Add to instead of replacing custom payload
aymkhalil Oct 24, 2024
0a9b1c9
Avpid copying customPayload map for now
aymkhalil Oct 24, 2024
32ee16e
Refactor RequestSensorsFactory to SensorsFactory
aymkhalil Nov 6, 2024
034183f
SensorsCustomParams java docs
aymkhalil Nov 6, 2024
d39b40a
Propagate CAS sensors
aymkhalil Nov 7, 2024
bd952b4
Propagate batch insert sensors
aymkhalil Nov 7, 2024
dfafba6
Propagate range query sensors
aymkhalil Nov 7, 2024
b9b4f53
Refactor SensorsTest dtest
aymkhalil Nov 7, 2024
e01c266
Minor types + refactoring of raw type usages
aymkhalil Nov 8, 2024
26b8500
Merge branch 'main' into cndb-8501
aymkhalil Nov 8, 2024
f137fa4
Fix race condition when RequestCallback signals completion before sen…
aymkhalil Nov 13, 2024
f9c7af5
Update SensorsFactory#createRequestSensors to accept array of keyspac…
aymkhalil Nov 13, 2024
833fd89
Decouple WriteCallbackInfo#shouldHint from mutation nullability & int…
aymkhalil Nov 13, 2024
0b74852
erge branch 'main' into cndb-8501
aymkhalil Nov 14, 2024
6287335
Add Counter mutation dtest
aymkhalil Nov 14, 2024
d4a3d6e
Fix Invalid test class 'org.apache.cassandra.distributed.test.sensors…
aymkhalil Nov 14, 2024
35ba7ce
Squash AbstractSensorsTest & SensorsTest into one class
aymkhalil Nov 14, 2024
bc11b24
Rename REQUEST_SENSORS_VIA_NATIVE_PROTOCOL -> REQUEST_VIA_NATIVE_PROT…
aymkhalil Dec 2, 2024
0bc81cd
Fix typos acutal -> actual
aymkhalil Dec 2, 2024
b752ae8
Collect unique contexts from batch statements for adding sensors
aymkhalil Dec 2, 2024
e082a6f
Don't inline if for readability
aymkhalil Dec 2, 2024
118d27c
Reuse addSensorsToInternodeResponse in CounterMutationCallback
aymkhalil Dec 2, 2024
d9d70c0
Refactor RequestCallbacks#getIMutaiton to iMutation()
aymkhalil Dec 2, 2024
e4e18fc
Fix ActiveSensorsFactory javadocs
aymkhalil Dec 2, 2024
a77b5e6
Switch from ByteBuffer.allocate to ByteBuffer.wrap to spare some memo…
aymkhalil Dec 2, 2024
425d27b
Fix SensorsFactory javadocs
aymkhalil Dec 2, 2024
be77507
Fix SensorsCustomParams javadocs
aymkhalil Dec 2, 2024
b4b0076
Remove leading li tag spaces in SensorsFactory
aymkhalil Dec 2, 2024
6682ba8
Remove extra colon in StorageProxy
aymkhalil Dec 2, 2024
670f755
Better methods names in ReplicaSensorsTrackingTest
aymkhalil Dec 2, 2024
795d089
Merge branch 'main' into cndb-8501
aymkhalil Dec 2, 2024
98c0731
Refactor SensorEncoder methods to return optional & have better names
aymkhalil Dec 3, 2024
ff97dec
CNDB-11544 Add coordinator MEMORY_BYTES sensor for reads
aymkhalil Dec 4, 2024
e65faa9
Add coordinator INTERNODE_BYTES sensor for reads
aymkhalil Dec 5, 2024
4123198
Sync sensors on finally blocks
aymkhalil Dec 6, 2024
050ea94
Revert "Add coordinator INTERNODE_BYTES sensor for reads"
aymkhalil Dec 10, 2024
9f345e8
Add dtest for read
aymkhalil Dec 10, 2024
bc05df7
MEMORY_BYTES -> IN_MEMORY_BYTES
aymkhalil Dec 11, 2024
c56d960
Fix testSensorsTrackedForReadCallback by creating concrete DataResponse
aymkhalil Dec 11, 2024
cfc8278
Fix NPE when calling message#serializedSize
aymkhalil Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Propagate READ_BYTES only
  • Loading branch information
aymkhalil committed Oct 2, 2024
commit 7613307a497406d80159bef3f24a9ae0df69b45d
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,12 @@ public enum CassandraRelevantProperties
* Allows custom implementation of {@link org.apache.cassandra.sensors.RequestSensorsFactory} to optionally create
* and configure {@link org.apache.cassandra.sensors.RequestSensors} instances.
*/
REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class");
REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class"),

/**
* If true, the coordinator will propagate request sensors via the native protocol custom payload flag.
*/
PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL("cassandra.propagate_request_sensors_via_native_protocal", "false");;

CassandraRelevantProperties(String key, String defaultVal)
{
Expand Down
49 changes: 0 additions & 49 deletions src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@
*/
package org.apache.cassandra.cql3.statements;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
Expand All @@ -35,17 +30,8 @@
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -124,17 +110,6 @@ public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateP
throw new UnsupportedOperationException();
}

@Override
public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
{
ResultMessage result = super.execute(state, options, queryStartNanoTime);

if (result == null) result = new ResultMessage.Void();
addWriteSensorData(result, options);

return result;
}

public static class ParsedInsert extends ModificationStatement.Parsed
{
private final List<ColumnIdentifier> columnNames;
Expand Down Expand Up @@ -368,28 +343,4 @@ public AuditLogContext getAuditLogContext()
{
return new AuditLogContext(AuditLogEntryType.UPDATE, keyspace(), columnFamily());
}

private void addWriteSensorData(ResultMessage msg, QueryOptions options)
{
// Custom payload is not supported for protocol versions < 4
if (options.getProtocolVersion().isSmallerThan(ProtocolVersion.V4))
{
return;
}

RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
{
return;
}

Context contex = Context.from(this.metadata());
Optional<Sensor> writeRequestSensor = RequestTracker.instance.get().getSensor(contex, Type.WRITE_BYTES);
writeRequestSensor.ifPresent(sensor -> {
ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue());
String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(this.metadata().name);
Map<String, ByteBuffer> sensorHeader = ImmutableMap.of(headerName, bytes);
msg.setCustomPayload(sensorHeader);
});
}
}
13 changes: 6 additions & 7 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
Expand Down Expand Up @@ -121,7 +122,9 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.ActiveRequestSensors;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.NoOpRequestSensors;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
import org.apache.cassandra.sensors.Type;
Expand Down Expand Up @@ -1066,11 +1069,6 @@ public static void mutate(List<? extends IMutation> mutations,

QueryInfoTracker.WriteTracker writeTracker = queryTracker().onWrite(state, false, mutations, consistencyLevel);

// Request sensors are utilized to track usages from all replicas serving a write request
RequestSensors requestSensors = RequestSensorsFactory.instance.create(state.getKeyspace());
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

long startTime = System.nanoTime();

List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size());
Expand Down Expand Up @@ -1945,8 +1943,9 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group,
group.metadata(),
group.queries,
consistencyLevel);
// Request sensors are utilized to track usages from all replicas serving a read request
RequestSensors requestSensors = RequestSensorsFactory.instance.create(group.metadata().keyspace);
// Request sensors are utilized to track usages from replicas serving a read request
RequestSensors requestSensors = CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL.getBoolean() ?
new ActiveRequestSensors() : NoOpRequestSensors.instance;
Context context = Context.from(group.metadata());
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
Expand Down
41 changes: 0 additions & 41 deletions src/java/org/apache/cassandra/service/WriteResponseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,14 @@
*/
package org.apache.cassandra.service;

import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.net.Message;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;

/**
* Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
Expand Down Expand Up @@ -62,10 +54,7 @@ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType wri
public void onResponse(Message<T> m)
{
if (responsesUpdater.decrementAndGet(this) == 0)
{
estimateAndIncrementWriteSensor(m);
signal();
}
//Must be last after all subclass processing
//The two current subclasses both assume logResponseToIdealCLDelegate is called
//here.
Expand All @@ -76,34 +65,4 @@ public int ackCount()
{
return blockFor() - responses;
}

private void estimateAndIncrementWriteSensor(Message<T> msg)
{
RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
return;

if (msg == null)
return;

Map<String, byte[]> customParams = msg.header.customParams();
if (customParams == null)
return;

if (!(msg instanceof IMutation))
return;

IMutation mutation = (IMutation) msg;
for (PartitionUpdate pu: mutation.getPartitionUpdates())
{
String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(pu.metadata().name);
byte[] writeBytes = msg.header.customParams().get(headerName);
if (writeBytes == null)
continue;
double adjustedSensorValue = SensorsCustomParams.sensorValueFromBytes(writeBytes) * candidateReplicaCount();
Context context = Context.from(pu.metadata());
requestSensors.registerSensor(context, Type.WRITE_BYTES);
requestSensors.incrementSensor(context, Type.WRITE_BYTES, adjustedSensorValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.PageSize;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.SelectStatement;
Expand Down Expand Up @@ -59,51 +58,66 @@ public static void setup()
CassandraRelevantProperties.REQUEST_SENSORS_FACTORY.setString(ActiveRequestSensorsFactory.class.getName());
}
@Test
public void testSensorsInResultMessage() throws Throwable
public void testSensorsInResultMessageEnabled() throws Throwable
{
CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL.setBoolean(true);
try (Cluster cluster = builder().withNodes(1).start())
{
// resister a noop sensor listener before init(cluster) which creates the test keyspace to ensure that the registry singleton instance is subscribed to schema notifications
cluster.get(1).runsOnInstance(initSensorsRegistry()).run();
init(cluster);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, v1 text)"));
String write = withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (1, 'read me')");
cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (1, 'read me')"));

String query = withKeyspace("SELECT * FROM %s.tbl WHERE pk=1");

// Any methods used inside the runOnInstance() block should be static, otherwise java.io.NotSerializableException will be thrown
cluster.get(1).runOnInstance(() -> {
ResultMessage writeResult = executeWithResult(write);
Map<String, ByteBuffer> customPayload = writeResult.getCustomPayload();

double writeBytesRequest = getWriteBytesRequest(customPayload, "tbl");
Assertions.assertThat(writeBytesRequest).isGreaterThan(0D);

ResultMessage.Rows readResult = executeWithPagingWithResultMessage(query);
customPayload = readResult.getCustomPayload();
assertReadBytesHeadersExist(customPayload);
Map<String, ByteBuffer> customPayload = readResult.getCustomPayload();
assertReadBytesHeadersExist(customPayload, true);

double readBytesRequest = getReadBytesRequest(customPayload);
Assertions.assertThat(readBytesRequest).isGreaterThan(0D);
});
}
}

private static void assertReadBytesHeadersExist(Map<String, ByteBuffer> customPayload)
@Test
public void testSensorsInResultMessageDisabled() throws Throwable
{
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL.setBoolean(false);
try (Cluster cluster = builder().withNodes(1).start())
{
// resister a noop sensor listener before init(cluster) which creates the test keyspace to ensure that the registry singleton instance is subscribed to schema notifications
cluster.get(1).runsOnInstance(initSensorsRegistry()).run();
init(cluster);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, v1 text)"));
cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (1, 'read me')"));

String query = withKeyspace("SELECT * FROM %s.tbl WHERE pk=1");

// Any methods used inside the runOnInstance() block should be static, otherwise java.io.NotSerializableException will be thrown
cluster.get(1).runOnInstance(() -> {
ResultMessage.Rows readResult = executeWithPagingWithResultMessage(query);
Map<String, ByteBuffer> customPayload = readResult.getCustomPayload();
assertReadBytesHeadersExist(customPayload, false);
});
}
}

private static double getReadBytesRequest(Map<String, ByteBuffer> customPayload)
private static void assertReadBytesHeadersExist(Map<String, ByteBuffer> customPayload, boolean exists)
{
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
return ByteBufferUtil.toDouble(customPayload.get(SensorsCustomParams.READ_BYTES_REQUEST));
if (exists)
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
else if (customPayload != null)
Assertions.assertThat(customPayload).doesNotContainKey(SensorsCustomParams.READ_BYTES_REQUEST);
}

private static double getWriteBytesRequest(Map<String, ByteBuffer> customPayload, String table)
private static double getReadBytesRequest(Map<String, ByteBuffer> customPayload)
{
String headerName = String.format(SensorsCustomParams.WRITE_BYTES_REQUEST_TEMPLATE, table);
Assertions.assertThat(customPayload).containsKey(headerName);
return ByteBufferUtil.toDouble(customPayload.get(headerName));
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
return ByteBufferUtil.toDouble(customPayload.get(SensorsCustomParams.READ_BYTES_REQUEST));
}

/**
Expand Down Expand Up @@ -154,24 +168,4 @@ private static ResultMessage.Rows executeWithPagingWithResultMessage(String quer

return selectStatement.execute(state, initialOptions, nanoTime);
}

/**
* TODO: update SimpleQueryResult in the dtest-api project to expose custom payload and use Coordinator##executeWithResult instead
*/
private static ResultMessage executeWithResult(String query, Object... args)
{
long nanoTime = System.nanoTime();
QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(ConsistencyLevel.ALL.name());
org.apache.cassandra.db.ConsistencyLevel cl = org.apache.cassandra.db.ConsistencyLevel.fromCode(consistencyLevel.ordinal());
QueryOptions initialOptions = QueryOptions.create(cl,
null,
false,
PageSize.inRows(512),
null,
null,
ProtocolVersion.CURRENT,
prepared.keyspace);
return prepared.statement.execute(QueryProcessor.internalQueryState(), initialOptions, nanoTime);
}
}