Skip to content

Commit

Permalink
Update the grpc config interface methods
Browse files Browse the repository at this point in the history
Change getMaxMessageSize to getMaxReceivedMessageSize in
IGrpcConfiguration.

Change the return types of the IGrpcConfiguration methods that return
lengths of time from integer to Duration.

Change the internal representation of the grpc configuration values from
Optionals to nullable objects, because Optional in Java is only meant
for method return types.
  • Loading branch information
nand4011 committed Jan 9, 2025
1 parent 0862151 commit 9f05137
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import momento.sdk.internal.GrpcChannelOptions;

/** Abstracts away the gRPC configuration tunables. */
public class GrpcConfiguration implements IGrpcConfiguration {

private final Duration deadline;
private final @Nonnull Duration deadline;
private final int minNumGrpcChannels;
private final Optional<Integer> maxMessageSize;
private final Optional<Boolean> keepAliveWithoutCalls;
private final Optional<Integer> keepAliveTimeoutMs;
private final Optional<Integer> keepAliveTimeMs;
private final @Nullable Integer maxMessageSize;
private final @Nullable Boolean keepAliveWithoutCalls;
private final @Nullable Duration keepAliveTimeout;
private final @Nullable Duration keepAliveTime;

/**
* Constructs a GrpcConfiguration.
Expand All @@ -26,10 +27,10 @@ public GrpcConfiguration(@Nonnull Duration deadline) {
this(
deadline,
1,
Optional.of(GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE),
Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_WITHOUT_STREAM),
Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT_MS),
Optional.of(GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME_MS));
GrpcChannelOptions.DEFAULT_MAX_MESSAGE_SIZE,
GrpcChannelOptions.DEFAULT_KEEPALIVE_WITHOUT_STREAM,
GrpcChannelOptions.DEFAULT_KEEPALIVE_TIMEOUT,
GrpcChannelOptions.DEFAULT_KEEPALIVE_TIME);
}

/**
Expand All @@ -40,9 +41,9 @@ public GrpcConfiguration(@Nonnull Duration deadline) {
* @param maxMessageSize The maximum size of a message (in bytes) that can be received by the
* client.
* @param keepAliveWithoutCalls Whether to send keepalive pings without any active calls.
* @param keepAliveTimeout The time to wait for a keepalive ping response before considering the
* connection dead.
* @param keepAliveTime The time to wait between keepalive pings.
* @param keepAliveTimeout The time in milliseconds to wait for a keepalive ping response before
* considering the connection dead.
* @param keepAliveTime The time in milliseconds to wait between keepalive pings.
*/
public GrpcConfiguration(
@Nonnull Duration deadline,
Expand All @@ -51,13 +52,41 @@ public GrpcConfiguration(
Optional<Boolean> keepAliveWithoutCalls,
Optional<Integer> keepAliveTimeout,
Optional<Integer> keepAliveTime) {
this(
deadline,
minNumGrpcChannels,
maxMessageSize.orElse(null),
keepAliveWithoutCalls.orElse(null),
keepAliveTimeout.map(Duration::ofMillis).orElse(null),
keepAliveTime.map(Duration::ofMillis).orElse(null));
}

/**
* Constructs a GrpcConfiguration.
*
* @param deadline The maximum duration of a gRPC call.
* @param minNumGrpcChannels The minimum number of gRPC channels to keep open at any given time.
* @param maxMessageSize The maximum size of a message (in bytes) that can be received by the
* client.
* @param keepAliveWithoutCalls Whether to send keepalive pings without any active calls.
* @param keepAliveTimeout The time to wait for a keepalive ping response before considering the
* connection dead.
* @param keepAliveTime The time to wait between keepalive pings.
*/
public GrpcConfiguration(
@Nonnull Duration deadline,
int minNumGrpcChannels,
@Nullable Integer maxMessageSize,
@Nullable Boolean keepAliveWithoutCalls,
@Nullable Duration keepAliveTimeout,
@Nullable Duration keepAliveTime) {
ensureRequestDeadlineValid(deadline);
this.deadline = deadline;
this.minNumGrpcChannels = minNumGrpcChannels;
this.maxMessageSize = maxMessageSize;
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
this.keepAliveTimeoutMs = keepAliveTimeout;
this.keepAliveTimeMs = keepAliveTime;
this.keepAliveTimeout = keepAliveTimeout;
this.keepAliveTime = keepAliveTime;
}

@Override
Expand All @@ -77,8 +106,8 @@ public GrpcConfiguration withDeadline(Duration deadline) {
minNumGrpcChannels,
maxMessageSize,
keepAliveWithoutCalls,
keepAliveTimeoutMs,
keepAliveTimeMs);
keepAliveTimeout,
keepAliveTime);
}

@Override
Expand All @@ -98,13 +127,22 @@ public GrpcConfiguration withMinNumGrpcChannels(int minNumGrpcChannels) {
minNumGrpcChannels,
maxMessageSize,
keepAliveWithoutCalls,
keepAliveTimeoutMs,
keepAliveTimeMs);
keepAliveTimeout,
keepAliveTime);
}

@Override
/**
* The maximum size of a message (in bytes) that can be received by the client.
*
* @return the maximum message size, or empty if there is no specified maximum.
*/
public Optional<Integer> getMaxMessageSize() {
return maxMessageSize;
return getMaxReceivedMessageSize();
}

@Override
public Optional<Integer> getMaxReceivedMessageSize() {
return Optional.ofNullable(maxMessageSize);
}

/**
Expand All @@ -117,15 +155,15 @@ public GrpcConfiguration withMaxMessageSize(int maxMessageSize) {
return new GrpcConfiguration(
deadline,
minNumGrpcChannels,
Optional.of(maxMessageSize),
maxMessageSize,
keepAliveWithoutCalls,
keepAliveTimeoutMs,
keepAliveTimeMs);
keepAliveTimeout,
keepAliveTime);
}

@Override
public Optional<Boolean> getKeepAliveWithoutCalls() {
return keepAliveWithoutCalls;
return Optional.ofNullable(keepAliveWithoutCalls);
}

/**
Expand All @@ -144,18 +182,46 @@ public Optional<Boolean> getKeepAliveWithoutCalls() {
* @return The updated GrpcConfiguration.
*/
public GrpcConfiguration withKeepAliveWithoutCalls(Optional<Boolean> keepAliveWithoutCalls) {
return withKeepAliveWithoutCalls(keepAliveWithoutCalls.orElse(null));
}

/**
* Copy constructor that updates whether keepalive will be performed when there are no outstanding
* requests on a connection.
*
* <p>NOTE: keep-alives are very important for long-lived server environments where there may be
* periods of time when the connection is idle. However, they are very problematic for lambda
* environments where the lambda runtime is continuously frozen and unfrozen, because the lambda
* may be frozen before the "ACK" is received from the server. This can cause the keep-alive to
* timeout even though the connection is completely healthy. Therefore, keep-alives should be
* disabled in lambda and similar environments.
*
* @param keepAliveWithoutCalls The boolean indicating whether to send keepalive pings without any
* active calls.
* @return The updated GrpcConfiguration.
*/
public GrpcConfiguration withKeepAliveWithoutCalls(@Nullable Boolean keepAliveWithoutCalls) {
return new GrpcConfiguration(
deadline,
minNumGrpcChannels,
maxMessageSize,
keepAliveWithoutCalls,
keepAliveTimeoutMs,
keepAliveTimeMs);
keepAliveTimeout,
keepAliveTime);
}

@Override
/**
* The time to wait for a keepalive ping response before considering the connection dead.
*
* @return the time to wait for a keepalive ping response before considering the connection dead.
*/
public Optional<Integer> getKeepAliveTimeoutMs() {
return keepAliveTimeoutMs;
return getKeepAliveTimeout().map(d -> (int) d.toMillis());
}

@Override
public Optional<Duration> getKeepAliveTimeout() {
return Optional.ofNullable(keepAliveTimeout);
}

/**
Expand All @@ -178,13 +244,22 @@ public GrpcConfiguration withKeepAliveTimeout(int keepAliveTimeoutMs) {
minNumGrpcChannels,
maxMessageSize,
keepAliveWithoutCalls,
Optional.of(keepAliveTimeoutMs),
keepAliveTimeMs);
Duration.ofMillis(keepAliveTimeoutMs),
keepAliveTime);
}

@Override
/**
* The time to wait between keepalive pings.
*
* @return the time to wait between keepalive pings.
*/
public Optional<Integer> getKeepAliveTimeMs() {
return keepAliveTimeMs;
return getKeepAliveTime().map(d -> (int) d.toMillis());
}

@Override
public Optional<Duration> getKeepAliveTime() {
return Optional.ofNullable(keepAliveTime);
}

/**
Expand All @@ -206,8 +281,8 @@ public GrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) {
minNumGrpcChannels,
maxMessageSize,
keepAliveWithoutCalls,
keepAliveTimeoutMs,
Optional.of(keepAliveTimeMs));
keepAliveTimeout,
Duration.ofMillis(keepAliveTimeMs));
}

/**
Expand All @@ -223,12 +298,6 @@ public GrpcConfiguration withKeepAliveTime(int keepAliveTimeMs) {
* @return The updated GrpcConfiguration.
*/
public GrpcConfiguration withKeepAliveDisabled() {
return new GrpcConfiguration(
deadline,
minNumGrpcChannels,
maxMessageSize,
Optional.empty(),
Optional.empty(),
Optional.empty());
return new GrpcConfiguration(deadline, minNumGrpcChannels, maxMessageSize, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface IGrpcConfiguration {
*
* @return the maximum message size, or empty if there is no specified maximum.
*/
Optional<Integer> getMaxMessageSize();
Optional<Integer> getMaxReceivedMessageSize();

/**
* Whether keepalive will be performed when there are no outstanding requests on a connection.
Expand All @@ -38,12 +38,12 @@ public interface IGrpcConfiguration {
*
* @return the time to wait for a keepalive ping response before considering the connection dead.
*/
Optional<Integer> getKeepAliveTimeoutMs();
Optional<Duration> getKeepAliveTimeout();

/**
* The time to wait between keepalive pings.
*
* @return the time to wait between keepalive pings.
*/
Optional<Integer> getKeepAliveTimeMs();
Optional<Duration> getKeepAliveTime();
}
Loading

0 comments on commit 9f05137

Please sign in to comment.