Skip to content

Commit

Permalink
feat: TLS implementation in gRPC sink (#47)
Browse files Browse the repository at this point in the history
Implemented TLS support in gRPC sink.
also added 2 configurations for gRPC sink:-

SINK_GRPC_TLS_ENABLE → indicates whether the sink needs to be connected over TLS. Default value is false
SINK_GRPC_ROOT_CA → The ca-certificate

* feat: TLS support in gRPC sink

* restore gradle build to previous version

* cert file location changed

* feat: TLS implementation in gRPC sink

* refactor: removing print statements

* resolved comments

* managed channel builder refactor

* gRPC sink docs updated

* reformat: formatting

* files added in gitignore and version changed

* bump up version 0.10.5

* deleted test resource binary

* resolve comments
  • Loading branch information
Vaishnavi190900 authored Jul 30, 2024
1 parent ba3220e commit 1e2b0e0
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ generated
.project
.settings
bin
src/test/resources
.DS_Store
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.10.4'
version '0.10.5'

def projName = "firehose"

Expand Down Expand Up @@ -293,4 +293,4 @@ jacocoTestCoverageVerification {
}
}
}
check.dependsOn jacocoTestCoverageVerification
check.dependsOn jacocoTestCoverageVerification
15 changes: 15 additions & 0 deletions docs/docs/sinks/grpc-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ Defines the amount of time (in milliseconds) gRPC clients are willing to wait fo
- Example value: `1000`
- Type: `optional`

### `SINK_GRPC_TLS_ENABLE`

Indicates whether the sink needs to be connected over TLS. If set to true, the Firehose should establish a TLS connection with the SINK_GRPC_SERVICE_HOST.

- Example value: `true`
- Type: `optional`
- Default value: `false`

### `SINK_GRPC_ROOT_CA`

The CA certificates for the domain *.gojek.gcp.

- Example value: `base64 encoded string`
- Type: `required if SINK_GRPC_TLS_ENABLE is set to true.`

### `SINK_GRPC_RESPONSE_RETRY_CEL_EXPRESSION`

Defines the CEL(Common Expression Language) expression used to evaluate whether gRPC sink call should be retried or not based on the gRPC response.
Expand Down
6 changes: 5 additions & 1 deletion env/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# SINK_GRPC_SERVICE_PORT=8500
# SINK_GRPC_METHOD_URL=com.tests.SampleServer/SomeMethod
# SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS=com.tests.SampleGrpcResponse
#SINK_GRPC_TLS_ENABLE=false
#SINK_GRPC_ROOT_CA=""

#
#
#############################################
Expand Down Expand Up @@ -199,6 +202,7 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING={"1":"orderID", "2":"orderURL"}
# SINK_REDIS_LIST_DATA_PROTO_INDEX=2
# SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX=2
# INK_REDIS_TTL_TYPE=DISABLE
# SINK_REDIS_TTL_TYPE=DISABLE
# SINK_REDIS_TTL_VALUE=0
# SINK_REDIS_DEPLOYMENT_TYPE=Standalone

Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ public interface GrpcSinkConfig extends AppConfig {
@ConverterClass(GrpcMetadataConverter.class)
Map<String, String> getSinkGrpcMetadata();

@Config.Key("SINK_GRPC_TLS_ENABLE")
@DefaultValue("false")
boolean getSinkGrpcTlsEnable();

@Config.Key("SINK_GRPC_ROOT_CA")
String getSinkGrpcRootCA();
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
package com.gotocompany.firehose.sink.grpc;



import com.gotocompany.depot.metrics.StatsDReporter;

import com.google.protobuf.Message;
import com.gotocompany.firehose.config.AppConfig;

import com.gotocompany.firehose.config.GrpcSinkConfig;
import com.gotocompany.firehose.evaluator.GrpcResponseCelPayloadEvaluator;
import com.gotocompany.firehose.evaluator.PayloadEvaluator;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;

import com.gotocompany.firehose.proto.ProtoToMetadataMapper;
import com.gotocompany.firehose.sink.grpc.client.GrpcClient;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.firehose.sink.AbstractSink;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import com.gotocompany.stencil.client.StencilClient;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import lombok.extern.slf4j.Slf4j;
import org.aeonbits.owner.ConfigFactory;

import javax.net.ssl.SSLException;
import java.io.ByteArrayInputStream;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand All @@ -26,6 +36,7 @@
* using the configurations supplied and invoke {@see #create(Map < String, String > configuration, StatsDClient client)}
* to obtain the GrpcSink sink implementation.
*/
@Slf4j
public class GrpcSinkFactory {

public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) {
Expand All @@ -34,22 +45,46 @@ public static AbstractSink create(Map<String, String> configuration, StatsDRepor
String grpcSinkConfig = String.format("\n\tService host: %s\n\tService port: %s\n\tMethod url: %s\n\tResponse proto schema: %s",
grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort(), grpcConfig.getSinkGrpcMethodUrl(), grpcConfig.getSinkGrpcResponseSchemaProtoClass());
firehoseInstrumentation.logDebug(grpcSinkConfig);
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort())
boolean isTlsEnabled = grpcConfig.getSinkGrpcTlsEnable();
NettyChannelBuilder managedChannelBuilder = NettyChannelBuilder
.forAddress(grpcConfig.getSinkGrpcServiceHost(), grpcConfig.getSinkGrpcServicePort())
.keepAliveTime(grpcConfig.getSinkGrpcArgKeepaliveTimeMS(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS)
.usePlaintext().build();
.keepAliveTimeout(grpcConfig.getSinkGrpcArgKeepaliveTimeoutMS(), TimeUnit.MILLISECONDS);
if (isTlsEnabled) {
String base64Cert = grpcConfig.getSinkGrpcRootCA();
SslContext sslContext = buildClientSslContext(base64Cert);
firehoseInstrumentation.logInfo("SSL Context created successfully.");
managedChannelBuilder.sslContext(sslContext);
} else {
managedChannelBuilder.usePlaintext();
}
AppConfig appConfig = ConfigFactory.create(AppConfig.class, configuration);
ProtoToMetadataMapper protoToMetadataMapper = new ProtoToMetadataMapper(stencilClient.get(appConfig.getInputSchemaProtoClass()), grpcConfig.getSinkGrpcMetadata());
GrpcClient grpcClient = new GrpcClient(new FirehoseInstrumentation(statsDReporter, GrpcClient.class), grpcConfig, managedChannel, stencilClient, protoToMetadataMapper);
firehoseInstrumentation.logInfo("GRPC connection established");
GrpcClient grpcClient = new GrpcClient(
new FirehoseInstrumentation(statsDReporter, GrpcClient.class),
grpcConfig,
managedChannelBuilder.build(),
stencilClient, protoToMetadataMapper);
firehoseInstrumentation.logInfo("gRPC Client created successfully.");
PayloadEvaluator<Message> grpcResponseRetryEvaluator = instantiatePayloadEvaluator(grpcConfig, stencilClient);
return new GrpcSink(new FirehoseInstrumentation(statsDReporter, GrpcSink.class), grpcClient, stencilClient, grpcConfig, grpcResponseRetryEvaluator);
}

private static SslContext buildClientSslContext(String base64Cert) {
try {
byte[] decodedBytes = Base64.getDecoder().decode(base64Cert);
ByteArrayInputStream certInputStream = new ByteArrayInputStream(decodedBytes);
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().trustManager(certInputStream);
return GrpcSslContexts.configure(sslContextBuilder).build();
} catch (SSLException e) {
throw new RuntimeException(e);
}
}

private static PayloadEvaluator<Message> instantiatePayloadEvaluator(GrpcSinkConfig grpcSinkConfig, StencilClient stencilClient) {
return new GrpcResponseCelPayloadEvaluator(
stencilClient.get(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass()),
grpcSinkConfig.getSinkGrpcResponseRetryCELExpression());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.Status;
import com.gotocompany.stencil.client.StencilClient;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.common.header.Header;
Expand Down Expand Up @@ -63,18 +64,20 @@ public DynamicMessage execute(byte[] logMessage, Headers headers) {
Metadata metadata = buildMetadata(headers, logMessage);
try {
Channel decoratedChannel = ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(metadata));
MetadataUtils.newAttachHeadersInterceptor(metadata));
firehoseInstrumentation.logDebug("Calling gRPC with metadata: {}", metadata.toString());
byte[] response = ClientCalls.blockingUnaryCall(
decoratedChannel,
methodDescriptor,
decoratedDefaultCallOptions(),
logMessage);

return stencilClient.parse(grpcSinkConfig.getSinkGrpcResponseSchemaProtoClass(), response);

} catch (StatusRuntimeException sre) {
firehoseInstrumentation.logError("gRPC call failed with error message: {}", sre.getMessage());
if (sre.getStatus().getCode() == Status.Code.UNAVAILABLE) {
firehoseInstrumentation.logError("gRPC configurations are incorrect: {}", sre.getMessage());
} else {
firehoseInstrumentation.logError("gRPC call failed with error message: {}", sre.getMessage());
}
firehoseInstrumentation.incrementCounter(Metrics.SINK_GRPC_ERROR_TOTAL, "status=" + sre.getStatus().getCode());
} catch (Exception e) {
firehoseInstrumentation.logError("gRPC call failed with error message: {}", e.getMessage());
Expand Down

0 comments on commit 1e2b0e0

Please sign in to comment.