Skip to content

Commit

Permalink
feat: introduce java.time variables and methods
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarquezp committed Nov 20, 2024
1 parent 5f15e28 commit 64a3fc8
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -48,9 +51,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.ChronoUnit;

/**
* Dispatches messages to a message receiver while handling the messages acking and lease
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -73,7 +74,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
Expand Down Expand Up @@ -198,7 +198,7 @@ private Publisher(Builder builder) throws IOException {
// key?
retrySettingsBuilder
.setMaxAttempts(Integer.MAX_VALUE)
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
.setTotalTimeoutDuration(Duration.ofNanos(Long.MAX_VALUE));
}

PublisherStubSettings.Builder stubSettings =
Expand Down Expand Up @@ -740,7 +740,7 @@ public static final class Builder {
private static final double DEFAULT_MULTIPLIER = 4;
static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
BatchingSettings.newBuilder()
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setDelayThresholdDuration(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.setFlowControlSettings(
Expand All @@ -750,13 +750,13 @@ public static final class Builder {
.build();
static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setTotalTimeout(DEFAULT_TOTAL_TIMEOUT)
.setInitialRetryDelay(DEFAULT_INITIAL_RETRY_DELAY)
.setTotalTimeoutDuration(DEFAULT_TOTAL_TIMEOUT)
.setInitialRetryDelayDuration(DEFAULT_INITIAL_RETRY_DELAY)
.setRetryDelayMultiplier(DEFAULT_MULTIPLIER)
.setMaxRetryDelay(DEFAULT_MAX_RETRY_DELAY)
.setInitialRpcTimeout(DEFAULT_INITIAL_RPC_TIMEOUT)
.setMaxRetryDelayDuration(DEFAULT_MAX_RETRY_DELAY)
.setInitialRpcTimeoutDuration(DEFAULT_INITIAL_RPC_TIMEOUT)
.setRpcTimeoutMultiplier(DEFAULT_MULTIPLIER)
.setMaxRpcTimeout(DEFAULT_MAX_RPC_TIMEOUT)
.setMaxRpcTimeoutDuration(DEFAULT_MAX_RPC_TIMEOUT)
.build();
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
private static final int THREADS_PER_CPU = 5;
Expand Down Expand Up @@ -876,9 +876,9 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
/** Configures the Publisher's retry parameters. */
public Builder setRetrySettings(RetrySettings retrySettings) {
Preconditions.checkArgument(
retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
retrySettings.getTotalTimeoutDuration().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
Preconditions.checkArgument(
retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0);
retrySettings.getInitialRpcTimeoutDuration().compareTo(MIN_RPC_TIMEOUT) >= 0);
this.retrySettings = retrySettings;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.opentelemetry.api.trace.Span;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -68,7 +69,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */
final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.google.cloud.pubsub.v1;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiService;
import com.google.api.core.BetaApi;
import com.google.api.core.CurrentMillisClock;
import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand Down Expand Up @@ -55,7 +58,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/subscriber">subscriber</a> that is
Expand Down Expand Up @@ -98,24 +100,37 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private static final int MAX_INBOUND_METADATA_SIZE =
4 * 1024 * 1024; // 4MB API maximum metadata size

@InternalApi static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
@InternalApi
static final java.time.Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD =
java.time.Duration.ofMinutes(60);

@InternalApi
static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY =
Duration.ofMinutes(1);
static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY =
java.time.Duration.ofMinutes(1);

@InternalApi static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION = Duration.ofMinutes(0);
@InternalApi static final Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION = Duration.ofSeconds(0);
@InternalApi
static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION =
java.time.Duration.ofMinutes(0);

@InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10);
@InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600);
@InternalApi
static final java.time.Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION =
java.time.Duration.ofSeconds(0);

@InternalApi static final Duration STREAM_ACK_DEADLINE_DEFAULT = Duration.ofSeconds(60);
@InternalApi
static final java.time.Duration MIN_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(10);

@InternalApi
static final Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT = Duration.ofSeconds(60);
static final java.time.Duration MAX_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(600);

@InternalApi static final Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(5);
@InternalApi
static final java.time.Duration STREAM_ACK_DEADLINE_DEFAULT = java.time.Duration.ofSeconds(60);

@InternalApi
static final java.time.Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT =
java.time.Duration.ofSeconds(60);

@InternalApi
static final java.time.Duration ACK_EXPIRATION_PADDING_DEFAULT = java.time.Duration.ofSeconds(5);

private static final Logger logger = Logger.getLogger(Subscriber.class.getName());

Expand All @@ -124,10 +139,10 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
private final java.time.Duration maxAckExtensionPeriod;
private final java.time.Duration maxDurationPerAckExtension;
private final boolean maxDurationPerAckExtensionDefaultUsed;
private final Duration minDurationPerAckExtension;
private final java.time.Duration minDurationPerAckExtension;
private final boolean minDurationPerAckExtensionDefaultUsed;

// The ExecutorProvider used to generate executors for processing messages.
Expand Down Expand Up @@ -490,10 +505,10 @@ public static final class Builder {
private MessageReceiver receiver;
private MessageReceiverWithAckResponse receiverWithAckResponse;

private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION;
private java.time.Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private java.time.Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION;
private boolean minDurationPerAckExtensionDefaultUsed = true;
private Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION;
private java.time.Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION;
private boolean maxDurationPerAckExtensionDefaultUsed = true;

private boolean useLegacyFlowControl = false;
Expand All @@ -505,7 +520,7 @@ public static final class Builder {
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE)
.setKeepAliveTime(Duration.ofMinutes(5))
.setKeepAliveTimeDuration(java.time.Duration.ofMinutes(5))
.build();
private HeaderProvider headerProvider = new NoHeaderProvider();
private CredentialsProvider credentialsProvider =
Expand Down Expand Up @@ -596,6 +611,15 @@ public Builder setUseLegacyFlowControl(boolean value) {
return this;
}

/**
* This method is obsolete. Use {@link #setMaxAckExtensionPeriodDuration(java.time.Duration)}
* instead.
*/
@ObsoleteApi("Use setMaxAckExtensionPeriodDuration(java.time.Duration) instead")
public Builder setMaxAckExtensionPeriod(org.threeten.bp.Duration maxAckExtensionPeriod) {
return setMaxAckExtensionPeriodDuration(toJavaTimeDuration(maxAckExtensionPeriod));
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand All @@ -605,12 +629,22 @@ public Builder setUseLegacyFlowControl(boolean value) {
*
* <p>A zero duration effectively disables auto deadline extensions.
*/
public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
public Builder setMaxAckExtensionPeriodDuration(java.time.Duration maxAckExtensionPeriod) {
Preconditions.checkArgument(maxAckExtensionPeriod.toMillis() >= 0);
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
return this;
}

/**
* This method is obsolete. Use {@link
* #setMaxDurationPerAckExtensionDuration(java.time.Duration)} instead.
*/
@ObsoleteApi("Use setMaxDurationPerAckExtensionDuration(java.time.Duration) instead")
public Builder setMaxDurationPerAckExtension(
org.threeten.bp.Duration maxDurationPerAckExtension) {
return setMaxDurationPerAckExtensionDuration(toJavaTimeDuration(maxDurationPerAckExtension));
}

/**
* Set the upper bound for a single mod ack extention period.
*
Expand All @@ -621,7 +655,8 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
*
* <p>MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
public Builder setMaxDurationPerAckExtensionDuration(
java.time.Duration maxDurationPerAckExtension) {
// If a non-default min is set, make sure min is less than max
Preconditions.checkArgument(
maxDurationPerAckExtension.toMillis() >= 0
Expand All @@ -633,6 +668,16 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension
return this;
}

/**
* This method is obsolete. Use {@link
* #setMinDurationPerAckExtensionDuration(java.time.Duration)} instead.
*/
@ObsoleteApi("Use setMinDurationPerAckExtensionDuration(java.time.Duration) instead")
public Builder setMinDurationPerAckExtension(
org.threeten.bp.Duration minDurationPerAckExtension) {
return setMinDurationPerAckExtensionDuration(toJavaTimeDuration(minDurationPerAckExtension));
}

/**
* Set the lower bound for a single mod ack extention period.
*
Expand All @@ -643,7 +688,8 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension
*
* <p>MinDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
public Builder setMinDurationPerAckExtensionDuration(
java.time.Duration minDurationPerAckExtension) {
// If a non-default max is set, make sure min is less than max
Preconditions.checkArgument(
minDurationPerAckExtension.toMillis() >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.threeten.bp.Duration;

/**
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.SettableFuture;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
Expand All @@ -32,8 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/**
* Fake implementation of {@link ScheduledExecutorService} that allows tests control the reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.threeten.bp.Duration;

public class MessageDispatcherTest {
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
Expand Down
Loading

0 comments on commit 64a3fc8

Please sign in to comment.