diff --git a/etc/requirements.txt b/etc/requirements.txt index 626270f36a..7417e32a5b 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -14,7 +14,7 @@ google-auth-oauthlib==0.4.6 google-cloud-pubsub==2.12.1 googleapis-common-protos==1.54.0 grpc-google-iam-v1==0.12.3 -grpcio==1.53.0 +grpcio==1.53.2 grpcio-status==1.48.2 htmlmin==0.1.12 httplib2==0.20.4 diff --git a/udmif/api/package-lock.json b/udmif/api/package-lock.json index d7197557c0..f6256c5790 100644 --- a/udmif/api/package-lock.json +++ b/udmif/api/package-lock.json @@ -4529,9 +4529,9 @@ } }, "node_modules/ip": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.0.tgz", - "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.1.tgz", + "integrity": "sha512-lJUL9imLTNi1ZfXT+DU6rBBdbiKGBuay9B6xGSPVjUeQwaH1RIGqef8RZkUtHioLmSNpPR5M4HVKJGm1j8FWVQ==", "dev": true, "optional": true }, @@ -12750,9 +12750,9 @@ "integrity": "sha512-Ju0Bz/cEia55xDwUWEa8+olFpCiQoypjnQySseKtmjNrnps3P+xfpUmGr90T7yjlVJmOtybRvPXhKMbHr+fWnw==" }, "ip": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.0.tgz", - "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.1.tgz", + "integrity": "sha512-lJUL9imLTNi1ZfXT+DU6rBBdbiKGBuay9B6xGSPVjUeQwaH1RIGqef8RZkUtHioLmSNpPR5M4HVKJGm1j8FWVQ==", "dev": true, "optional": true }, diff --git a/udmif/event-handler/package-lock.json b/udmif/event-handler/package-lock.json index a7e58b9c50..8b07c32f28 100644 --- a/udmif/event-handler/package-lock.json +++ b/udmif/event-handler/package-lock.json @@ -4743,9 +4743,9 @@ } }, "node_modules/ip": { - "version": "1.1.5", - "resolved": "https://registry.npmjs.org/ip/-/ip-1.1.5.tgz", - "integrity": "sha1-vd7XARQpCCjAoDnnLvJfWq7ENUo=", + "version": "1.1.9", + "resolved": "https://registry.npmjs.org/ip/-/ip-1.1.9.tgz", + "integrity": "sha512-cyRxvOEpNHNtchU3Ln9KC/auJgup87llfQpQ+t5ghoC/UhL16SWzbueiCsdTnWmqAWl7LadfuwhlqmtOaqMHdQ==", "dev": true, "optional": true }, @@ -13162,9 +13162,9 @@ "integrity": "sha512-Ju0Bz/cEia55xDwUWEa8+olFpCiQoypjnQySseKtmjNrnps3P+xfpUmGr90T7yjlVJmOtybRvPXhKMbHr+fWnw==" }, "ip": { - "version": "1.1.5", - "resolved": "https://registry.npmjs.org/ip/-/ip-1.1.5.tgz", - "integrity": "sha1-vd7XARQpCCjAoDnnLvJfWq7ENUo=", + "version": "1.1.9", + "resolved": "https://registry.npmjs.org/ip/-/ip-1.1.9.tgz", + "integrity": "sha512-cyRxvOEpNHNtchU3Ln9KC/auJgup87llfQpQ+t5ghoC/UhL16SWzbueiCsdTnWmqAWl7LadfuwhlqmtOaqMHdQ==", "dev": true, "optional": true }, diff --git a/udmif/web/package-lock.json b/udmif/web/package-lock.json index 4b5c6694ab..92beff291e 100644 --- a/udmif/web/package-lock.json +++ b/udmif/web/package-lock.json @@ -7062,9 +7062,9 @@ } }, "node_modules/ip": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.0.tgz", - "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.1.tgz", + "integrity": "sha512-lJUL9imLTNi1ZfXT+DU6rBBdbiKGBuay9B6xGSPVjUeQwaH1RIGqef8RZkUtHioLmSNpPR5M4HVKJGm1j8FWVQ==", "dev": true }, "node_modules/ipaddr.js": { @@ -17687,9 +17687,9 @@ } }, "ip": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.0.tgz", - "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/ip/-/ip-2.0.1.tgz", + "integrity": "sha512-lJUL9imLTNi1ZfXT+DU6rBBdbiKGBuay9B6xGSPVjUeQwaH1RIGqef8RZkUtHioLmSNpPR5M4HVKJGm1j8FWVQ==", "dev": true }, "ipaddr.js": { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/LocalIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/LocalIotAccessProvider.java index f7964ec3fa..bcdf8c8122 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/LocalIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/LocalIotAccessProvider.java @@ -17,6 +17,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.jetbrains.annotations.TestOnly; +import org.jetbrains.annotations.VisibleForTesting; import udmi.schema.CloudModel; import udmi.schema.Envelope.SubFolder; import udmi.schema.IotAccess; @@ -28,6 +29,7 @@ public class LocalIotAccessProvider extends IotAccessBase { private static final Map> DEVICE_CONFIGS = new HashMap<>(); + @VisibleForTesting BlockingQueue sentCommands = new LinkedBlockingQueue<>(); private boolean failActivation; @@ -38,27 +40,6 @@ public LocalIotAccessProvider(IotAccess iotAccess) { super(iotAccess); } - @Override - public String updateConfig(String registryId, String deviceId, String config, Long version) { - Entry entry = DEVICE_CONFIGS.get(deviceId); - if (version != null && !entry.getKey().equals(version)) { - throw new IllegalStateException("Config version mismatch"); - } - Long previous = Optional.ofNullable(entry).orElse(new SimpleEntry<>(0L, "")).getKey(); - DEVICE_CONFIGS.put(deviceId, new SimpleEntry<>(previous + 1, config)); - return config; - } - - @Override - public boolean isEnabled() { - return true; - } - - @Override - public Set getRegistriesForRegion(String region) { - return ImmutableSet.of(); - } - @Override public void activate() { debug("activate"); @@ -75,6 +56,11 @@ public CloudModel fetchDevice(String deviceRegistryId, String deviceId) { throw new RuntimeException("Not yet implemented"); } + @Override + public String fetchRegistryMetadata(String registryId, String metadataKey) { + throw new RuntimeException("Not yet implemented"); + } + @Override public String fetchState(String deviceRegistryId, String deviceId) { throw new RuntimeException("Not yet implemented"); @@ -84,6 +70,16 @@ public List getCommands() { return using(new ArrayList<>(), sentCommands::drainTo); } + @Override + public Set getRegistriesForRegion(String region) { + return ImmutableSet.of(); + } + + @Override + public boolean isEnabled() { + return true; + } + @Override public CloudModel listDevices(String deviceRegistryId) { throw new RuntimeException("Not yet implemented"); @@ -111,7 +107,13 @@ public void shutdown() { } @Override - public String fetchRegistryMetadata(String registryId, String metadataKey) { - throw new RuntimeException("Not yet implemented"); + public String updateConfig(String registryId, String deviceId, String config, Long version) { + Entry entry = DEVICE_CONFIGS.get(deviceId); + if (version != null && !entry.getKey().equals(version)) { + throw new IllegalStateException("Config version mismatch"); + } + Long previous = Optional.ofNullable(entry).orElse(new SimpleEntry<>(0L, "")).getKey(); + DEVICE_CONFIGS.put(deviceId, new SimpleEntry<>(previous + 1, config)); + return config; } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java index 4dfa5e7829..43c261ba97 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java @@ -39,7 +39,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -145,9 +145,6 @@ protected Bundle makeHelloBundle() { return bundle; } - protected void pauseSubscribers() { - } - protected abstract void publishRaw(Bundle bundle); protected void pushQueueEntry(BlockingQueue queue, String stringBundle) { @@ -183,9 +180,6 @@ protected void receiveMessage(Envelope envelope, Map messageMap) { receiveMessage(toStringMap(envelope), stringify(messageMap)); } - protected void resumeSubscribers() { - } - protected void setSourceQueue(BlockingQueue queueForScope) { sourceQueue = queueForScope; } @@ -206,16 +200,14 @@ protected void throttleQueue() { boolean releaseReceiver = receiveQueueSize < QUEUE_THROTTLE_MARK / 2.0; boolean releasePublisher = publishQueueSize < QUEUE_THROTTLE_MARK / 2.0; - String message = format("Message queues at %.03f/%.03f", receiveQueueSize, publishQueueSize); + String message = messageQueueMessage(); if (blockReceiver || blockPublisher) { if (!subscriptionsThrottled.getAndSet(true)) { - warn(message + ", pausing subscribers"); - pauseSubscribers(); + warn(message + ", crossing high-water mark"); } } else if (releaseReceiver && releasePublisher) { if (subscriptionsThrottled.getAndSet(false)) { - warn(message + ", resuming subscribers"); - resumeSubscribers(); + warn(message + ", below high-water mark"); } } } @@ -233,8 +225,8 @@ private synchronized void accumulateStats(String statsBucket, private synchronized void ensureSourceQueue() { if (sourceQueue == null) { - notice(format("Creating new source queue with capacity " + queueCapacity)); - sourceQueue = new LinkedBlockingDeque<>(queueCapacity); + notice(format("Creating new source queue %s with capacity %s", pipeId, queueCapacity)); + sourceQueue = new LinkedBlockingQueue<>(queueCapacity); } } @@ -322,6 +314,12 @@ private void messageLoop(String id) { } } + private String messageQueueMessage() { + double receiveQueue = getReceiveQueueSize(); + double publishQueue = getPublishQueueSize(); + return format("Message queue %s at %.03f/%.03f", pipeId, receiveQueue, publishQueue); + } + private void receiveBundle(Bundle bundle) { receiveBundle(stringify(bundle)); } @@ -408,6 +406,8 @@ private void shutdownExecutor() { @Override public void activate(Consumer bundleConsumer) { + debug("Activating message pipe %s as %s => %s", pipeId, queueIdentifier(), + Objects.hash(dispatcher)); dispatcher = bundleConsumer; ensureSourceQueue(); debug("Handling %s", this); @@ -435,7 +435,7 @@ public synchronized Map extractStats() { double receiveQueue = getReceiveQueueSize(); double publishQueue = getPublishQueueSize(); if (subscriptionsThrottled.get()) { - warn("Message queues at %.03f/%.03f, currently paused", receiveQueue, publishQueue); + warn(messageQueueMessage() + ", currently paused"); } return ImmutableMap.of( RECEIVE_STATS, extractStat(receiveStats, receiveQueue), @@ -492,7 +492,7 @@ public void terminate() { @Override public String toString() { - return format("%s %s => %s", pipeId, queueIdentifier(), Objects.hash(dispatcher)); + return pipeId; } /** diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java index 54b6e9c158..1bbbf0e24c 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java @@ -17,6 +17,7 @@ import com.google.api.core.ApiService; import com.google.api.core.ApiService.Listener; import com.google.api.core.ApiService.State; +import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; @@ -138,14 +139,6 @@ private void checkPublisher() { @Override public void activate(Consumer bundleConsumer) { super.activate(bundleConsumer); - resumeSubscribers(); - } - - @Override - protected void resumeSubscribers() { - if (subscribers == null) { - initializeSubscribers(); - } subscribers.forEach(Subscriber::startAsync); } @@ -203,11 +196,6 @@ public void shutdown() { super.shutdown(); } - @VisibleForTesting - protected void pauseSubscribers() { - stopAsyncSubscribers(); - } - private List stopAsyncSubscribers() { List apiServices = subscribers.stream().map(AbstractApiService::stopAsync).toList(); subscribers = null; @@ -235,12 +223,15 @@ Publisher getPublisher(String topicName) { Subscriber getSubscriber(String subName) { try { ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subName); - Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName, this); + FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount((long) queueCapacity).build(); + Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName, this) + .setParallelPullCount(EXECUTION_THREADS) + .setFlowControlSettings(flowControlSettings); String emu = getEmulatorHost(); ifNullThen(emu, () -> checkSubscription(subscriptionName)); ifNotNullThen(emu, host -> builder.setChannelProvider(getTransportChannelProvider(host))); ifNotNullThen(emu, host -> builder.setCredentialsProvider(NoCredentialsProvider.create())); - builder.setParallelPullCount(EXECUTION_THREADS); Subscriber built = builder.build(); info(format("Subscriber %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), subscriptionName)); built.addListener(new Listener() {