diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 31698c3049cf..78e4fa76b9da 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -95,6 +95,7 @@ else if (callbackInfo.callback instanceof ReadCallback) { ReadCallback readCallback = (ReadCallback) callbackInfo.callback; Context context = Context.from(readCallback.command()); + sensors.incrementSensor(context, Type.MEMORY_BYTES, message.serializedSize(MessagingService.current_version)); incrementSensor(sensors, context, Type.READ_BYTES, message); } // Covers Paxos Prepare and Propose callbacks. Paxos Commit callback is a regular WriteCallbackInfo diff --git a/src/java/org/apache/cassandra/sensors/Type.java b/src/java/org/apache/cassandra/sensors/Type.java index 25fad4e2e2bd..a4a03f4d1518 100644 --- a/src/java/org/apache/cassandra/sensors/Type.java +++ b/src/java/org/apache/cassandra/sensors/Type.java @@ -28,5 +28,6 @@ public enum Type READ_BYTES, WRITE_BYTES, - INDEX_WRITE_BYTES + INDEX_WRITE_BYTES, + MEMORY_BYTES } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index b0bbfadffdf0..0720bd80df6d 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1977,14 +1977,20 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group, group.queries, consistencyLevel); // Request sensors are utilized to track usages from replicas serving a read request - RequestSensors requestSensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace); + RequestSensors sensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace); Context context = Context.from(group.metadata()); - requestSensors.registerSensor(context, Type.READ_BYTES); - ExecutorLocals locals = ExecutorLocals.create(requestSensors); + sensors.registerSensor(context, Type.READ_BYTES); + sensors.registerSensor(context, Type.MEMORY_BYTES); + for (SinglePartitionReadCommand command : group.queries) + sensors.incrementSensor(context, Type.MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version)); + ExecutorLocals locals = ExecutorLocals.create(sensors); ExecutorLocals.set(locals); PartitionIterator partitions = read(group, consistencyLevel, queryState, queryStartNanoTime, readTracker); partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow); - return PartitionIterators.doOnClose(partitions, readTracker::onDone); + return PartitionIterators.doOnClose(partitions, () -> { + readTracker.onDone(); + sensors.syncAllSensors(); + }); } /** @@ -2354,13 +2360,18 @@ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, RequestSensors sensors = SensorsFactory.instance.createRequestSensors(command.metadata().keyspace); Context context = Context.from(command); sensors.registerSensor(context, Type.READ_BYTES); + sensors.registerSensor(context, Type.MEMORY_BYTES); + sensors.incrementSensor(context, Type.MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version)); ExecutorLocals locals = ExecutorLocals.create(sensors); ExecutorLocals.set(locals); PartitionIterator partitions = RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime, readTracker); partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow); - return PartitionIterators.doOnClose(partitions, readTracker::onDone); + return PartitionIterators.doOnClose(partitions, () -> { + readTracker.onDone(); + sensors.syncAllSensors(); + }); } public Map> getSchemaVersions() diff --git a/test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java b/test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java new file mode 100644 index 000000000000..a2c776dccb63 --- /dev/null +++ b/test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sensors; + +import java.util.Optional; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.cql3.CQLTester; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CoordinatorSensorsTest extends CQLTester +{ + @BeforeClass + public static void setupClass() + { + CassandraRelevantProperties.SENSORS_FACTORY.setString(ActiveSensorsFactory.class.getName()); + // a workaround to force sensors registry to initialize (i.e. subscribe to SchemaChangeListener) before the + // test creates the keyspace and tables + SensorsRegistry.instance.clear(); + } + + @Test + public void testReadSensors() + { + createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))"); + Context context = Context.from(currentTableMetadata()); + Optional memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES); + assertThat(memorySensor).isEmpty(); + + executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')"); + executeNet("select * from %s where pk = 1"); + memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES); + assertThat(memorySensor).isPresent(); + double memoryValue = memorySensor.get().getValue(); + assertThat(memoryValue).isGreaterThan(0); + + executeNet("select * from %s where pk = 1"); + assertThat(memorySensor.get().getValue()).isEqualTo(memoryValue * 2); + } + + @Test + public void testRangeReadSensors() + { + createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))"); + Context context = Context.from(currentTableMetadata()); + Optional memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES); + assertThat(memorySensor).isEmpty(); + + executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')"); + executeNet("insert into %s (pk, ck, v) values (1, 2, 'v2')"); + executeNet("insert into %s (pk, ck, v) values (1, 3, 'v3')"); + executeNet("select * from %s"); + memorySensor = SensorsRegistry.instance.getSensor(context, Type.MEMORY_BYTES); + assertThat(memorySensor).isPresent(); + double memoryValue = memorySensor.get().getValue(); + assertThat(memoryValue).isGreaterThan(0); + + executeNet("select * from %s"); + assertThat(memorySensor.get().getValue()).isEqualTo(memoryValue * 2); + } +}