Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

census: add interceptor APIs for configuring census features with custom settings on client side #7153

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions api/src/main/java/io/grpc/InternalCensus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

/**
* Internal accessor for configuring Census features. Do not use this.
*/
@Internal
public final class InternalCensus {

/**
* Key to access the configuration if the default client side census features are disabled.
*/
@Internal
public static final CallOptions.Key<Boolean> DISABLE_CLIENT_DEFAULT_CENSUS =
CallOptions.Key.create("io.grpc.DISABLE_CLIENT_DEFAULT_CENSUS_STATS");
}
146 changes: 146 additions & 0 deletions census/src/main/java/io/grpc/census/CensusClientInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.census;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.InternalCensus;
import io.grpc.MethodDescriptor;
import java.util.ArrayList;
import java.util.List;

/**
* A {@link ClientInterceptor} for configuring client side OpenCensus features with
* custom settings. Note OpenCensus stats and tracing features are turned on by default
* if grpc-census artifact is in the runtime classpath. The gRPC core
* library does not provide public APIs for customized OpenCensus configurations.
* Use this interceptor to do so. Intended for advanced usages.
*
* <p>Applying this interceptor disables the channel's default stats and tracing
* features. The effectively OpenCensus features are determined by configurations in this
* interceptor.
*
* <p>For the current release, applying this interceptor may have the side effect that
* effectively disables retry.
*/
// TODO(chengyuanzhang): add ExperimentalApi annotation.
public final class CensusClientInterceptor implements ClientInterceptor {

private final List<ClientInterceptor> interceptors = new ArrayList<>();

private CensusClientInterceptor(
boolean statsEnabled,
boolean recordStartedRpcs,
boolean recordFinishedRpcs,
boolean recordRealTimeMetrics,
boolean tracingEnabled) {
if (statsEnabled) {
CensusStatsModule censusStats =
new CensusStatsModule(recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics);
interceptors.add(censusStats.getClientInterceptor());
}
if (tracingEnabled) {
CensusTracingModule censusTracing = new CensusTracingModule();
interceptors.add(censusTracing.getClientInterceptor());
}
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
callOptions = callOptions.withOption(InternalCensus.DISABLE_CLIENT_DEFAULT_CENSUS, true);
if (!interceptors.isEmpty()) {
next =
ClientInterceptors.intercept(next, interceptors.toArray(new ClientInterceptor[0]));
}
return next.newCall(method, callOptions);
}

/**
* Creates a new builder for a {@link CensusClientInterceptor}.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* A builder for a {@link CensusClientInterceptor}.
*/
public static class Builder {

private boolean statsEnabled;
private boolean recordStartedRpcs;
private boolean recordFinishedRpcs;
private boolean recordRealTimeMetrics;
private boolean tracingEnabled;

/**
* Disable or enable stats features. Disabled by default.
*/
public Builder setStatsEnabled(boolean value) {
statsEnabled = value;
return this;
}

/**
* Disable or enable real-time metrics recording. Effective only if {@link #setStatsEnabled}
* is set to true. Disabled by default.
*/
public Builder setRecordStartedRpcs(boolean value) {
recordStartedRpcs = value;
return this;
}

/**
* Disable or enable stats recording for RPC completions. Effective only if {@link
* #setStatsEnabled} is set to true. Disabled by default.
*/
public Builder setRecordFinishedRpcs(boolean value) {
recordFinishedRpcs = value;
return this;
}

/**
* Disable or enable stats recording for RPC upstarts. Effective only if {@link
* #setStatsEnabled} is set to true. Disabled by default.
*/
public Builder setRecordRealTimeMetrics(boolean value) {
recordRealTimeMetrics = value;
return this;
}

/**
* Disable or enable tracing features. Disabled by default.
*/
public Builder setTracingEnabled(boolean value) {
tracingEnabled = value;
return this;
}

/**
* Builds the {@link CensusClientInterceptor}.
*/
public CensusClientInterceptor build() {
return new CensusClientInterceptor(
statsEnabled, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics,
tracingEnabled);
}
}
}
27 changes: 18 additions & 9 deletions census/src/main/java/io/grpc/census/CensusStatsModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,37 @@
final class CensusStatsModule {
private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName());
private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1);
private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() {
@Override
public Stopwatch get() {
return Stopwatch.createUnstarted();
}
};

private final Tagger tagger;
private final StatsRecorder statsRecorder;
private final Supplier<Stopwatch> stopwatchSupplier;
@VisibleForTesting
final Metadata.Key<TagContext> statsHeader;
private final boolean propagateTags;
private final boolean recordStartedRpcs;
private final boolean recordFinishedRpcs;
private final boolean recordRealTimeMetrics;
@VisibleForTesting
final boolean recordStartedRpcs;
@VisibleForTesting
final boolean recordFinishedRpcs;
@VisibleForTesting
final boolean recordRealTimeMetrics;

/**
* Creates a {@link CensusStatsModule} with the default OpenCensus implementation.
*/
CensusStatsModule(Supplier<Stopwatch> stopwatchSupplier,
boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs,
boolean recordRealTimeMetrics) {
CensusStatsModule(
boolean recordStartedRpcs, boolean recordFinishedRpcs, boolean recordRealTimeMetrics) {
this(
Tags.getTagger(),
Tags.getTagPropagationComponent().getBinarySerializer(),
Stats.getStatsRecorder(),
stopwatchSupplier,
propagateTags, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics);
STOPWATCH_SUPPLIER,
true, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics);
}

/**
Expand Down Expand Up @@ -345,7 +353,8 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory {
callEndedUpdater = tmpCallEndedUpdater;
}

private final CensusStatsModule module;
@VisibleForTesting
final CensusStatsModule module;
private final Stopwatch stopwatch;
private volatile ClientTracer streamTracer;
private volatile int callEnded;
Expand Down
5 changes: 5 additions & 0 deletions census/src/main/java/io/grpc/census/CensusTracingModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.unsafe.ContextUtils;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -92,6 +93,10 @@ final class CensusTracingModule {
private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();

CensusTracingModule() {
this(Tracing.getTracer(), Tracing.getPropagationComponent().getBinaryFormat());
}

CensusTracingModule(
Tracer censusTracer, final BinaryFormat censusPropagationBinaryFormat) {
this.censusTracer = checkNotNull(censusTracer, "censusTracer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Internal;
import io.grpc.InternalCensus;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.Tagger;
Expand All @@ -32,13 +37,6 @@
@Internal
public final class InternalCensusStatsAccessor {

private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() {
@Override
public Stopwatch get() {
return Stopwatch.createUnstarted();
}
};

// Prevent instantiation.
private InternalCensusStatsAccessor() {
}
Expand All @@ -51,13 +49,8 @@ public static ClientInterceptor getClientInterceptor(
boolean recordFinishedRpcs,
boolean recordRealTimeMetrics) {
CensusStatsModule censusStats =
new CensusStatsModule(
STOPWATCH_SUPPLIER,
true, /* propagateTags */
recordStartedRpcs,
recordFinishedRpcs,
recordRealTimeMetrics);
return censusStats.getClientInterceptor();
new CensusStatsModule(recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics);
return getClientInterceptor(censusStats);
}

/**
Expand All @@ -76,7 +69,22 @@ public static ClientInterceptor getClientInterceptor(
new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, stopwatchSupplier,
propagateTags, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics);
return censusStats.getClientInterceptor();
return getClientInterceptor(censusStats);
}

private static ClientInterceptor getClientInterceptor(CensusStatsModule module) {
final ClientInterceptor interceptor = module.getClientInterceptor();
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (callOptions.getOption(
InternalCensus.DISABLE_CLIENT_DEFAULT_CENSUS) != null) {
return next.newCall(method, callOptions);
}
return interceptor.interceptCall(method, callOptions, next);
}
};
}

/**
Expand All @@ -87,12 +95,7 @@ public static ServerStreamTracer.Factory getServerStreamTracerFactory(
boolean recordFinishedRpcs,
boolean recordRealTimeMetrics) {
CensusStatsModule censusStats =
new CensusStatsModule(
STOPWATCH_SUPPLIER,
true, /* propagateTags */
recordStartedRpcs,
recordFinishedRpcs,
recordRealTimeMetrics);
new CensusStatsModule(recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics);
return censusStats.getServerTracerFactory();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package io.grpc.census;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Internal;
import io.grpc.InternalCensus;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.opencensus.trace.Tracing;

/**
* Accessor for getting {@link ClientInterceptor} or {@link ServerStreamTracer.Factory} with
Expand All @@ -37,20 +41,27 @@ private InternalCensusTracingAccessor() {
*/
public static ClientInterceptor getClientInterceptor() {
CensusTracingModule censusTracing =
new CensusTracingModule(
Tracing.getTracer(),
Tracing.getPropagationComponent().getBinaryFormat());
return censusTracing.getClientInterceptor();
new CensusTracingModule();
final ClientInterceptor interceptor = censusTracing.getClientInterceptor();
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (callOptions.getOption(
InternalCensus.DISABLE_CLIENT_DEFAULT_CENSUS) != null) {
return next.newCall(method, callOptions);
}
return interceptor.interceptCall(method, callOptions, next);
}
};
}

/**
* Returns a {@link ServerStreamTracer.Factory} with default stats implementation.
*/
public static ServerStreamTracer.Factory getServerStreamTracerFactory() {
CensusTracingModule censusTracing =
new CensusTracingModule(
Tracing.getTracer(),
Tracing.getPropagationComponent().getBinaryFormat());
new CensusTracingModule();
return censusTracing.getServerTracerFactory();
}
}
Loading