Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add coordinator MEMORY_BYTES sensor for reads #1452

Draft
wants to merge 56 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
54b3b28
CNDB-8501 Propagate request sensors via native CQL custom payload
aymkhalil Sep 21, 2024
7613307
Propagate READ_BYTES only
aymkhalil Oct 2, 2024
c755d86
Merge branch 'main' into cndb-8501
aymkhalil Oct 8, 2024
ee9566e
Add WRITE_BYTES and customize header prefix
aymkhalil Oct 15, 2024
8bf72a9
Fix typo
aymkhalil Oct 15, 2024
1e3092a
Fix CounterMutationCallbackTest
aymkhalil Oct 15, 2024
55454d9
Track replica sensors in ResponseVerbHandler + refactor custom sensor…
aymkhalil Oct 18, 2024
f8a1fc6
Add ReplicaSensorsTrackedTest unit tests, more tests in SensorsTest d…
aymkhalil Oct 18, 2024
7c19ca5
Fix CounterMutationCallbackTest
aymkhalil Oct 18, 2024
2373ede
Cleanup
aymkhalil Oct 21, 2024
1311836
Remove unnecessary instanceof check
aymkhalil Oct 21, 2024
78ef0b5
SensorEncoder java docs
aymkhalil Oct 21, 2024
42a41f2
Address generic feedback (javadocs, code style, leftovers, class pack…
aymkhalil Oct 23, 2024
31dfa8f
Add SensorEncoder interface
aymkhalil Oct 23, 2024
fe0628b
table -> global everywhere
aymkhalil Oct 23, 2024
7a9c72f
Register sensors before the mutate call
aymkhalil Oct 23, 2024
42b8ebf
Add to instead of replacing custom payload
aymkhalil Oct 24, 2024
0a9b1c9
Avpid copying customPayload map for now
aymkhalil Oct 24, 2024
32ee16e
Refactor RequestSensorsFactory to SensorsFactory
aymkhalil Nov 6, 2024
034183f
SensorsCustomParams java docs
aymkhalil Nov 6, 2024
d39b40a
Propagate CAS sensors
aymkhalil Nov 7, 2024
bd952b4
Propagate batch insert sensors
aymkhalil Nov 7, 2024
dfafba6
Propagate range query sensors
aymkhalil Nov 7, 2024
b9b4f53
Refactor SensorsTest dtest
aymkhalil Nov 7, 2024
e01c266
Minor types + refactoring of raw type usages
aymkhalil Nov 8, 2024
26b8500
Merge branch 'main' into cndb-8501
aymkhalil Nov 8, 2024
f137fa4
Fix race condition when RequestCallback signals completion before sen…
aymkhalil Nov 13, 2024
f9c7af5
Update SensorsFactory#createRequestSensors to accept array of keyspac…
aymkhalil Nov 13, 2024
833fd89
Decouple WriteCallbackInfo#shouldHint from mutation nullability & int…
aymkhalil Nov 13, 2024
0b74852
erge branch 'main' into cndb-8501
aymkhalil Nov 14, 2024
6287335
Add Counter mutation dtest
aymkhalil Nov 14, 2024
d4a3d6e
Fix Invalid test class 'org.apache.cassandra.distributed.test.sensors…
aymkhalil Nov 14, 2024
35ba7ce
Squash AbstractSensorsTest & SensorsTest into one class
aymkhalil Nov 14, 2024
bc11b24
Rename REQUEST_SENSORS_VIA_NATIVE_PROTOCOL -> REQUEST_VIA_NATIVE_PROT…
aymkhalil Dec 2, 2024
0bc81cd
Fix typos acutal -> actual
aymkhalil Dec 2, 2024
b752ae8
Collect unique contexts from batch statements for adding sensors
aymkhalil Dec 2, 2024
e082a6f
Don't inline if for readability
aymkhalil Dec 2, 2024
118d27c
Reuse addSensorsToInternodeResponse in CounterMutationCallback
aymkhalil Dec 2, 2024
d9d70c0
Refactor RequestCallbacks#getIMutaiton to iMutation()
aymkhalil Dec 2, 2024
e4e18fc
Fix ActiveSensorsFactory javadocs
aymkhalil Dec 2, 2024
a77b5e6
Switch from ByteBuffer.allocate to ByteBuffer.wrap to spare some memo…
aymkhalil Dec 2, 2024
425d27b
Fix SensorsFactory javadocs
aymkhalil Dec 2, 2024
be77507
Fix SensorsCustomParams javadocs
aymkhalil Dec 2, 2024
b4b0076
Remove leading li tag spaces in SensorsFactory
aymkhalil Dec 2, 2024
6682ba8
Remove extra colon in StorageProxy
aymkhalil Dec 2, 2024
670f755
Better methods names in ReplicaSensorsTrackingTest
aymkhalil Dec 2, 2024
795d089
Merge branch 'main' into cndb-8501
aymkhalil Dec 2, 2024
98c0731
Refactor SensorEncoder methods to return optional & have better names
aymkhalil Dec 3, 2024
ff97dec
CNDB-11544 Add coordinator MEMORY_BYTES sensor for reads
aymkhalil Dec 4, 2024
e65faa9
Add coordinator INTERNODE_BYTES sensor for reads
aymkhalil Dec 5, 2024
4123198
Sync sensors on finally blocks
aymkhalil Dec 6, 2024
050ea94
Revert "Add coordinator INTERNODE_BYTES sensor for reads"
aymkhalil Dec 10, 2024
9f345e8
Add dtest for read
aymkhalil Dec 10, 2024
bc05df7
MEMORY_BYTES -> IN_MEMORY_BYTES
aymkhalil Dec 11, 2024
c56d960
Fix testSensorsTrackedForReadCallback by creating concrete DataResponse
aymkhalil Dec 11, 2024
cfc8278
Fix NPE when calling message#serializedSize
aymkhalil Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.sensors.SensorsFactory;
import org.apache.cassandra.service.reads.range.EndpointGroupingRangeCommandIterator;

/** A class that extracts system properties for the cassandra node it runs within. */
Expand Down Expand Up @@ -514,10 +515,10 @@ public enum CassandraRelevantProperties
SAI_INDEX_READS_DISABLED("cassandra.sai.disabled_reads", "false"),

/**
* Allows custom implementation of {@link org.apache.cassandra.sensors.RequestSensorsFactory} to optionally create
* Allows custom implementation of {@link SensorsFactory} to optionally create
* and configure {@link org.apache.cassandra.sensors.RequestSensors} instances.
*/
REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class"),
SENSORS_FACTORY("cassandra.sensors_factory_class"),

/**
* This property allows configuring the maximum time that CachingRebufferer.rebuffer will wait when waiting for a
Expand All @@ -538,7 +539,12 @@ public enum CassandraRelevantProperties
* If provided, this custom factory class will be used to create stage executor for a couple of stages.
* @see Stage for details
*/
CUSTOM_STAGE_EXECUTOR_FACTORY_PROPERTY("cassandra.custom_stage_executor_factory_class");
CUSTOM_STAGE_EXECUTOR_FACTORY_PROPERTY("cassandra.custom_stage_executor_factory_class"),

/**
* If true, the coordinator will propagate sensors via the native protocol custom payload bytes map.
*/
SENSORS_VIA_NATIVE_PROTOCOL("cassandra.sensors_via_native_protocol", "false");

CassandraRelevantProperties(String key, String defaultVal)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
Expand Down Expand Up @@ -67,6 +68,10 @@
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
Expand Down Expand Up @@ -441,7 +446,16 @@ public ResultMessage execute(QueryState queryState, BatchQueryOptions options, l
executeWithoutConditions(getMutations(queryState, options, false, timestamp, nowInSeconds, queryStartNanoTime),
queryState, cl, queryStartNanoTime);

return new ResultMessage.Void();
ResultMessage<ResultMessage.Void> result = new ResultMessage.Void();
RequestSensors sensors = RequestTracker.instance.get();
Set<Context> uniqueContexts = statements.stream()
.map(ModificationStatement::metadata)
.map(Context::from)
.collect(Collectors.toSet());
for (Context context : uniqueContexts)
SensorsCustomParams.addSensorToCQLResponse(result, options.wrapped.getProtocolVersion(), sensors, context, org.apache.cassandra.sensors.Type.WRITE_BYTES);

return result;
}

private void executeWithoutConditions(List<? extends IMutation> mutations,
Expand Down
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.cassandra.cql3.selection.SortedRowsBuilder;
import org.apache.cassandra.guardrails.Guardrails;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
Expand All @@ -67,6 +68,10 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
Expand Down Expand Up @@ -567,6 +572,11 @@ else if (restrictions.keyIsInRelation())
msg = processResults(partitions, options, selectors, nowInSec, userLimit, userOffset);
}

RequestSensors sensors = RequestTracker.instance.get();
Context context = Context.from(this.table);
Type sensorType = Type.READ_BYTES;
SensorsCustomParams.addSensorToCQLResponse(msg, options.getProtocolVersion(), sensors, context, sensorType);

// Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this
// shouldn't be moved inside the 'try' above.
if (!pager.isExhausted() && !pager.pager.isTopK())
Expand Down Expand Up @@ -1008,7 +1018,13 @@ private ResultSet process(PartitionIterator partitions,

ColumnFamilyStore store = cfs();
if (store != null)
{
store.metric.coordinatorReadSize.update(result.readRowsSize());
RequestSensors sensors = RequestTracker.instance.get();
// sensors are not initialized for queries executed internally
if (sensors != null)
sensors.incrementSensor(Context.from(table), Type.IN_MEMORY_BYTES, result.readRowsSize());
}

return result.build();
}
Expand Down
24 changes: 24 additions & 0 deletions src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -110,6 +117,23 @@ public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateP
throw new UnsupportedOperationException();
}

@Override
public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
{
ResultMessage result = super.execute(state, options, queryStartNanoTime);

if (result == null)
result = new ResultMessage.Void();

RequestSensors sensors = RequestTracker.instance.get();
Context context = Context.from(this.metadata());
SensorsCustomParams.addSensorToCQLResponse(result, options.getProtocolVersion(), sensors, context, Type.WRITE_BYTES);
// CAS updates incorporate read sensors
SensorsCustomParams.addSensorToCQLResponse(result, options.getProtocolVersion(), sensors, context, Type.READ_BYTES);

return result;
}

public static class ParsedInsert extends ModificationStatement.Parsed
{
private final List<ColumnIdentifier> columnNames;
Expand Down
55 changes: 5 additions & 50 deletions src/java/org/apache/cassandra/db/CounterMutationCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,12 @@

package org.apache.cassandra.db;

import java.util.Collection;
import java.util.Optional;
import java.util.function.Function;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsRegistry;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.sensors.SensorsCustomParams;

/**
* A counter mutation callback that encapsulates {@link RequestSensors} and replica count
Expand All @@ -39,14 +32,14 @@ public class CounterMutationCallback implements Runnable
{
private final Message<CounterMutation> requestMessage;
private final InetAddressAndPort respondToAddress;
private final RequestSensors requestSensors;
private final RequestSensors sensors;
private int replicaCount = 0;

public CounterMutationCallback(Message<CounterMutation> requestMessage, InetAddressAndPort respondToAddress, RequestSensors requestSensors)
public CounterMutationCallback(Message<CounterMutation> requestMessage, InetAddressAndPort respondToAddress, RequestSensors sensors)
{
this.requestMessage = requestMessage;
this.respondToAddress = respondToAddress;
this.requestSensors = requestSensors;
this.sensors = sensors;
}

/**
Expand All @@ -64,45 +57,7 @@ public void run()
int replicaMultiplier = replicaCount == 0 ?
1 : // replica count was not explicitly set (default). At the bare minimum, we should send the response accomodating for the local replica (aka. mutation leader) sensor values
replicaCount;
addSensorsToResponse(responseBuilder, replicaMultiplier);
SensorsCustomParams.addSensorsToInternodeResponse(sensors, s -> s.getValue() * replicaMultiplier, responseBuilder);
MessagingService.instance().send(responseBuilder.build(), respondToAddress);
}

private void addSensorsToResponse(Message.Builder<NoPayload> response, int replicaMultiplier)
{
// There is no need to increment INTERNODE_BYTES to accommodate for outbound bytes because the response payload
// is of type NoPayload which has zero size
Collection<Sensor> sensors = this.requestSensors.getSensors(s -> s.getType() == Type.INTERNODE_BYTES);
Function<String, String> requestParam = SensorsCustomParams::encodeTableInInternodeBytesRequestParam;
Function<String, String> tableParam = SensorsCustomParams::encodeTableInInternodeBytesTableParam;
addSensorsToResponse(sensors, requestParam, tableParam, response, replicaMultiplier);

// Add write bytes sensors to the response
requestParam = SensorsCustomParams::encodeTableInWriteBytesRequestParam;
tableParam = SensorsCustomParams::encodeTableInWriteBytesTableParam;
sensors = this.requestSensors.getSensors(s -> s.getType() == Type.WRITE_BYTES);
addSensorsToResponse(sensors, requestParam, tableParam, response, replicaMultiplier);
}

private static void addSensorsToResponse(Collection<Sensor> sensors,
Function<String, String> requestParamSupplier,
Function<String, String> tableParamSupplier,
Message.Builder<NoPayload> response,
int replicaMultiplier)
{
for (Sensor sensor : sensors)
{
String requestBytesParam = requestParamSupplier.apply(sensor.getContext().getTable());
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue() * replicaMultiplier);
response.withCustomParam(requestBytesParam, requestBytes);

// for each table in the mutation, send the global per table counter write bytes as recorded by the registry
Optional<Sensor> registrySensor = SensorsRegistry.instance.getSensor(sensor.getContext(), sensor.getType());
registrySensor.ifPresent(s -> {
String tableBytesParam = tableParamSupplier.apply(s.getContext().getTable());
byte[] tableBytes = SensorsCustomParams.sensorValueAsBytes(s.getValue() * replicaMultiplier);
response.withCustomParam(tableBytesParam, tableBytes);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
import org.apache.cassandra.sensors.SensorsFactory;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.StorageProxy;

Expand All @@ -49,7 +49,7 @@ public void doVerb(final Message<CounterMutation> message)
logger.trace("Applying forwarded {}", cm);

// Initialize the sensor and set ExecutorLocals
RequestSensors requestSensors = RequestSensorsFactory.instance.create(message.payload.getKeyspaceName());
RequestSensors requestSensors = SensorsFactory.instance.createRequestSensors(message.payload.getKeyspaceName());
Collection<TableMetadata> tables = message.payload.getPartitionUpdates().stream().map(PartitionUpdate::metadata).collect(Collectors.toSet());
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
import org.apache.cassandra.sensors.SensorsFactory;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.tracing.Tracing;

Expand All @@ -48,7 +48,7 @@ private void respond(RequestSensors requestSensors, Message<Mutation> respondToM

Message.Builder<NoPayload> response = respondToMessage.emptyResponseBuilder();
// no need to calculate outbound internode bytes because the response is NoPayload
SensorsCustomParams.addSensorsToResponse(requestSensors, response);
SensorsCustomParams.addSensorsToInternodeResponse(requestSensors, response);
MessagingService.instance().send(response.build(), respondToAddress);
}

Expand Down Expand Up @@ -76,7 +76,7 @@ public void doVerb(Message<Mutation> message)
try
{
// Initialize the sensor and set ExecutorLocals
RequestSensors requestSensors = RequestSensorsFactory.instance.create(message.payload.getKeyspaceName());
RequestSensors requestSensors = SensorsFactory.instance.createRequestSensors(message.payload.getKeyspaceName());
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
import org.apache.cassandra.sensors.SensorsFactory;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.tracing.Tracing;

Expand All @@ -56,7 +56,7 @@ public void doVerb(Message<ReadCommand> message)
validateTransientStatus(message);

// Initialize the sensor and set ExecutorLocals
RequestSensors requestSensors = RequestSensorsFactory.instance.create(command.metadata().keyspace);
RequestSensors requestSensors = SensorsFactory.instance.createRequestSensors(command.metadata().keyspace);
Context context = Context.from(command);
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
Expand Down Expand Up @@ -87,7 +87,7 @@ public void doVerb(Message<ReadCommand> message)
int size = reply.currentPayloadSize(MessagingService.current_version);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, size);
requestSensors.syncAllSensors();
SensorsCustomParams.addSensorsToResponse(requestSensors, reply);
SensorsCustomParams.addSensorsToInternodeResponse(requestSensors, reply);

Tracing.trace("Enqueuing response to {}", message.from());
MessagingService.instance().send(reply.build(), message.from());
Expand Down
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/net/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -1410,9 +1410,12 @@ private int extractParamsSizePre40(ByteBuffer buf, int readerIndex, int readerLi

private <T> int payloadSize(Message<T> message, int version)
{
long payloadSize = message.payload != null && message.payload != NoPayload.noPayload
long payloadSize = message.payload != null && message.payload != NoPayload.noPayload && message.getPayloadSerializer() != null
? message.getPayloadSerializer().serializedSize(message.payload, version)
: 0;
// TODO: remove and properly fix NPE when calling message.serializedSize(MessagingService.current_version)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed this is legacy verb REQUEST_RSP:

No payload serializer found for verb REQUEST_RSP, responseVerb null, payload org.apache.cassandra.db.ReadResponse$RemoteDataResponse@c9f4b69

if (message.payload != null && message.payload != NoPayload.noPayload && message.getPayloadSerializer() == null)
logger.warn("No payload serializer found for verb {}, responseVerb {}, payload {}", message.verb(), message.verb().responseVerb, message.payload);
return Ints.checkedCast(payloadSize);
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/net/RequestCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
*/
package org.apache.cassandra.net;

import javax.annotation.Nullable;

import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.sensors.NoOpRequestSensors;
import org.apache.cassandra.sensors.RequestSensors;

/**
* implementors of {@link RequestCallback} need to make sure that any public methods
Expand Down Expand Up @@ -56,4 +60,14 @@ default boolean trackLatencyForSnitch()
{
return false;
}

/**
* @return the {@link RequestSensors} associated with the request to track sensors as reported by response replicas.
* If null, sensor tracking will be disabled for this request.
*/
@Nullable
default RequestSensors getRequestSensors()
{
return null;
}
}
Loading