Skip to content

Commit

Permalink
Add coordinator MEMORY_BYTES sensor for reads
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil committed Dec 4, 2024
1 parent 98c0731 commit f588a38
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ else if (callbackInfo.callback instanceof ReadCallback)
{
ReadCallback<?, ?> readCallback = (ReadCallback<?, ?>) callbackInfo.callback;
Context context = Context.from(readCallback.command());
sensors.incrementSensor(context, Type.MEMORY_BYTES, message.serializedSize(MessagingService.current_version));
incrementSensor(sensors, context, Type.READ_BYTES, message);
}
// Covers Paxos Prepare and Propose callbacks. Paxos Commit callback is a regular WriteCallbackInfo
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/sensors/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ public enum Type
READ_BYTES,

WRITE_BYTES,
INDEX_WRITE_BYTES
INDEX_WRITE_BYTES,
MEMORY_BYTES
}
21 changes: 16 additions & 5 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -1977,14 +1977,20 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group,
group.queries,
consistencyLevel);
// Request sensors are utilized to track usages from replicas serving a read request
RequestSensors requestSensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace);
RequestSensors sensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace);
Context context = Context.from(group.metadata());
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
sensors.registerSensor(context, Type.READ_BYTES);
sensors.registerSensor(context, Type.MEMORY_BYTES);
for (SinglePartitionReadCommand command : group.queries)
sensors.incrementSensor(context, Type.MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version));
ExecutorLocals locals = ExecutorLocals.create(sensors);
ExecutorLocals.set(locals);
PartitionIterator partitions = read(group, consistencyLevel, queryState, queryStartNanoTime, readTracker);
partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow);
return PartitionIterators.doOnClose(partitions, readTracker::onDone);
return PartitionIterators.doOnClose(partitions, () -> {
readTracker.onDone();
sensors.syncAllSensors();
});
}

/**
Expand Down Expand Up @@ -2354,13 +2360,18 @@ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command,
RequestSensors sensors = SensorsFactory.instance.createRequestSensors(command.metadata().keyspace);
Context context = Context.from(command);
sensors.registerSensor(context, Type.READ_BYTES);
sensors.registerSensor(context, Type.MEMORY_BYTES);
sensors.incrementSensor(context, Type.MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version));
ExecutorLocals locals = ExecutorLocals.create(sensors);
ExecutorLocals.set(locals);

PartitionIterator partitions = RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime, readTracker);
partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow);

return PartitionIterators.doOnClose(partitions, readTracker::onDone);
return PartitionIterators.doOnClose(partitions, () -> {
readTracker.onDone();
sensors.syncAllSensors();
});
}

public Map<String, List<String>> getSchemaVersions()
Expand Down
81 changes: 81 additions & 0 deletions test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sensors;

import java.util.Optional;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.CQLTester;

import static org.assertj.core.api.Assertions.assertThat;

public class CoordinatorSensorsTest extends CQLTester
{
@BeforeClass
public static void setupClass()
{
CassandraRelevantProperties.SENSORS_FACTORY.setString(ActiveSensorsFactory.class.getName());
// a workaround to force sensors registry to initialize (i.e. subscribe to SchemaChangeListener) before the
// test creates the keyspace and tables
SensorsRegistry.instance.clear();
}

@Test
public void testReadSensors()
{
createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))");
Context context = Context.from(currentTableMetadata());
Optional<Sensor> memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES);
assertThat(memorySensor).isEmpty();

executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')");
executeNet("select * from %s where pk = 1");
memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES);
assertThat(memorySensor).isPresent();
double memoryValue = memorySensor.get().getValue();
assertThat(memoryValue).isGreaterThan(0);

executeNet("select * from %s where pk = 1");
assertThat(memorySensor.get().getValue()).isEqualTo(memoryValue * 2);
}

@Test
public void testRangeReadSensors()
{
createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))");
Context context = Context.from(currentTableMetadata());
Optional<Sensor> memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES);
assertThat(memorySensor).isEmpty();

executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')");
executeNet("insert into %s (pk, ck, v) values (1, 2, 'v2')");
executeNet("insert into %s (pk, ck, v) values (1, 3, 'v3')");
executeNet("select * from %s");
memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES);
assertThat(memorySensor).isPresent();
double memoryValue = memorySensor.get().getValue();
assertThat(memoryValue).isGreaterThan(0);

executeNet("select * from %s");
assertThat(memorySensor.get().getValue()).isEqualTo(memoryValue * 2);
}
}

0 comments on commit f588a38

Please sign in to comment.