Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: minor fix with metric #17

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/metrics-display.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ By default, 2 files are generated in the program's working directory
| Histogram | insert_rows_success_num | Statistics on the number of successful writes. |
| Histogram | serializing_executor_drain_num\_${name} | Serializing executor. Statistics on the number of draining tasks. |
| Histogram | write_limiter_acquire_available_permits | Statistics on the number of available permits for write data(insert/delete). |
| Histogram | write_stream_limiter_acquire_wait_time | Statistics on the time spent acquiring write data (insert/delete) permits when using `StreamWriter`,<br/>note that it does not include the time spent writing, only the time spent acquiring the permit. |
| Meter | connection_failure | Statistics on the number of failed connections. |
| Meter | write_by_retries_${n} | QPS for the nth retry write, n == 0 for the first write (non-retry), n > 3 will be counted as n == 3 |
| Meter | write_failure_num | Statistics on the number of failed writes. |
| Meter | write_qps | Write Request QPS |
| Timer | write_stream_limiter_acquire_wait_time | Statistics on the time spent acquiring write data (insert/delete) permits when using `StreamWriter`,<br/>note that it does not include the time spent writing, only the time spent acquiring the permit. |
| Timer | async_write_pool.time | Asynchronous pool time statistics for asynchronous write tasks in SDK, this is important and it is recommended to focus on it. |
| Timer | direct_executor_timer_rpc_direct_pool | he appearance of this metric means that we are using the current thread to execute the asynchronous callback of the rpc client, which is the default configuration.<br/> This is usually sufficient and very resource-saving, but it needs attention. When there are problems, replace it with a thread pool in time. |
| Timer | req_rt_${service_name}/${method_name} | The time consumption statistics of the request, the service name and method name are the names of the service and method of the grpc request. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class Clock {
*/
public abstract long getTick();

public long duration(final long startTick) {
public long duration(long startTick) {
return getTick() - startTick;
}

Expand Down
20 changes: 11 additions & 9 deletions ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.RateLimiter;
import io.greptime.common.Display;
import io.greptime.common.Endpoint;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
* Default Write API impl.
Expand Down Expand Up @@ -254,8 +256,8 @@ static final class InnerMetricHelper {
static final Histogram DELETE_ROWS_SUCCESS_NUM = MetricsUtil.histogram("delete_rows_success_num");
static final Histogram INSERT_ROWS_FAILURE_NUM = MetricsUtil.histogram("insert_rows_failure_num");
static final Histogram DELETE_ROWS_FAILURE_NUM = MetricsUtil.histogram("delete_rows_failure_num");
static final Histogram WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME = MetricsUtil
.histogram("write_stream_limiter_acquire_wait_time");
static final Timer WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME = MetricsUtil
.timer("write_stream_limiter_acquire_wait_time");
static final Meter WRITE_FAILURE_NUM = MetricsUtil.meter("write_failure_num");
static final Meter WRITE_QPS = MetricsUtil.meter("write_qps");

Expand All @@ -281,7 +283,7 @@ static Histogram writeRowsFailureNum(WriteOp writeOp) {
}
}

static Histogram writeStreamLimiterAcquireWaitTime() {
static Timer writeStreamLimiterAcquireWaitTime() {
return WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME;
}

Expand Down Expand Up @@ -340,14 +342,14 @@ static abstract class RateLimitingStreamWriter implements StreamWriter<Table, Wr
public StreamWriter<Table, WriteOk> write(Table table, WriteOp writeOp) {
Ensures.ensureNonNull(table, "null `table`");

WriteTables writeTables = new WriteTables(table, writeOp);

if (this.rateLimiter != null) {
double timeSpent = this.rateLimiter.acquire(table.pointCount());
InnerMetricHelper.writeStreamLimiterAcquireWaitTime().update((long) timeSpent);
int permits = table.pointCount();
if (this.rateLimiter != null && permits > 0) {
double millisToWait = this.rateLimiter.acquire(permits) * 1000;
InnerMetricHelper.writeStreamLimiterAcquireWaitTime()
.update((long) millisToWait, TimeUnit.MILLISECONDS);
}

this.observer.onNext(writeTables);
this.observer.onNext(new WriteTables(table, writeOp));
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ public Builder maxInFlightWriteRows(int maxInFlightWriteRows) {
* - `LimitedPolicy.DiscardPolicy`: discard the data if the limiter is full.
* - `LimitedPolicy.AbortPolicy`: abort if the limiter is full.
* - `LimitedPolicy.BlockingPolicy`: blocks if the limiter is full.
* - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if the limiter is full.
* - `LimitedPolicy.AbortOnBlockingTimeoutPolicy`: blocks the specified time if
* the limiter is full, abort if timeout.
* The default is `LimitedPolicy.AbortOnBlockingTimeoutPolicy`
*
* @param writeLimitedPolicy write limited policy
Expand All @@ -229,8 +230,8 @@ public Builder writeLimitedPolicy(LimitedPolicy writeLimitedPolicy) {
}

/**
* The default rate limit for `StreamWriter`. It only takes effect when we do not specify the
* `maxPointsPerSecond` when creating a `StreamWriter`.
* The default rate limit value(points per second) for `StreamWriter`. It only takes
* effect when we do not specify the `maxPointsPerSecond` when creating a `StreamWriter`.
* The default is 10 * 65536
*
* @param defaultStreamMaxWritePointsPerSecond default max write points per second
Expand Down
Loading