Skip to content

Commit

Permalink
Merge branch 'master' into documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Feb 27, 2024
2 parents 78b5864 + a4d99f1 commit 6d2d83f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 73 deletions.
2 changes: 1 addition & 1 deletion etc/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ google-auth-oauthlib==0.4.6
google-cloud-pubsub==2.12.1
googleapis-common-protos==1.54.0
grpc-google-iam-v1==0.12.3
grpcio==1.53.0
grpcio==1.53.2
grpcio-status==1.48.2
htmlmin==0.1.12
httplib2==0.20.4
Expand Down
12 changes: 6 additions & 6 deletions udmif/api/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions udmif/event-handler/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions udmif/web/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.jetbrains.annotations.TestOnly;
import org.jetbrains.annotations.VisibleForTesting;
import udmi.schema.CloudModel;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.IotAccess;
Expand All @@ -28,6 +29,7 @@
public class LocalIotAccessProvider extends IotAccessBase {

private static final Map<String, Entry<Long, String>> DEVICE_CONFIGS = new HashMap<>();
@VisibleForTesting
BlockingQueue<String> sentCommands = new LinkedBlockingQueue<>();
private boolean failActivation;

Expand All @@ -38,27 +40,6 @@ public LocalIotAccessProvider(IotAccess iotAccess) {
super(iotAccess);
}

@Override
public String updateConfig(String registryId, String deviceId, String config, Long version) {
Entry<Long, String> entry = DEVICE_CONFIGS.get(deviceId);
if (version != null && !entry.getKey().equals(version)) {
throw new IllegalStateException("Config version mismatch");
}
Long previous = Optional.ofNullable(entry).orElse(new SimpleEntry<>(0L, "")).getKey();
DEVICE_CONFIGS.put(deviceId, new SimpleEntry<>(previous + 1, config));
return config;
}

@Override
public boolean isEnabled() {
return true;
}

@Override
public Set<String> getRegistriesForRegion(String region) {
return ImmutableSet.of();
}

@Override
public void activate() {
debug("activate");
Expand All @@ -75,6 +56,11 @@ public CloudModel fetchDevice(String deviceRegistryId, String deviceId) {
throw new RuntimeException("Not yet implemented");
}

@Override
public String fetchRegistryMetadata(String registryId, String metadataKey) {
throw new RuntimeException("Not yet implemented");
}

@Override
public String fetchState(String deviceRegistryId, String deviceId) {
throw new RuntimeException("Not yet implemented");
Expand All @@ -84,6 +70,16 @@ public List<String> getCommands() {
return using(new ArrayList<>(), sentCommands::drainTo);
}

@Override
public Set<String> getRegistriesForRegion(String region) {
return ImmutableSet.of();
}

@Override
public boolean isEnabled() {
return true;
}

@Override
public CloudModel listDevices(String deviceRegistryId) {
throw new RuntimeException("Not yet implemented");
Expand Down Expand Up @@ -111,7 +107,13 @@ public void shutdown() {
}

@Override
public String fetchRegistryMetadata(String registryId, String metadataKey) {
throw new RuntimeException("Not yet implemented");
public String updateConfig(String registryId, String deviceId, String config, Long version) {
Entry<Long, String> entry = DEVICE_CONFIGS.get(deviceId);
if (version != null && !entry.getKey().equals(version)) {
throw new IllegalStateException("Config version mismatch");
}
Long previous = Optional.ofNullable(entry).orElse(new SimpleEntry<>(0L, "")).getKey();
DEVICE_CONFIGS.put(deviceId, new SimpleEntry<>(previous + 1, config));
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -145,9 +145,6 @@ protected Bundle makeHelloBundle() {
return bundle;
}

protected void pauseSubscribers() {
}

protected abstract void publishRaw(Bundle bundle);

protected void pushQueueEntry(BlockingQueue<QueueEntry> queue, String stringBundle) {
Expand Down Expand Up @@ -183,9 +180,6 @@ protected void receiveMessage(Envelope envelope, Map<?, ?> messageMap) {
receiveMessage(toStringMap(envelope), stringify(messageMap));
}

protected void resumeSubscribers() {
}

protected void setSourceQueue(BlockingQueue<QueueEntry> queueForScope) {
sourceQueue = queueForScope;
}
Expand All @@ -206,16 +200,14 @@ protected void throttleQueue() {
boolean releaseReceiver = receiveQueueSize < QUEUE_THROTTLE_MARK / 2.0;
boolean releasePublisher = publishQueueSize < QUEUE_THROTTLE_MARK / 2.0;

String message = format("Message queues at %.03f/%.03f", receiveQueueSize, publishQueueSize);
String message = messageQueueMessage();
if (blockReceiver || blockPublisher) {
if (!subscriptionsThrottled.getAndSet(true)) {
warn(message + ", pausing subscribers");
pauseSubscribers();
warn(message + ", crossing high-water mark");
}
} else if (releaseReceiver && releasePublisher) {
if (subscriptionsThrottled.getAndSet(false)) {
warn(message + ", resuming subscribers");
resumeSubscribers();
warn(message + ", below high-water mark");
}
}
}
Expand All @@ -233,8 +225,8 @@ private synchronized void accumulateStats(String statsBucket,

private synchronized void ensureSourceQueue() {
if (sourceQueue == null) {
notice(format("Creating new source queue with capacity " + queueCapacity));
sourceQueue = new LinkedBlockingDeque<>(queueCapacity);
notice(format("Creating new source queue %s with capacity %s", pipeId, queueCapacity));
sourceQueue = new LinkedBlockingQueue<>(queueCapacity);
}
}

Expand Down Expand Up @@ -322,6 +314,12 @@ private void messageLoop(String id) {
}
}

private String messageQueueMessage() {
double receiveQueue = getReceiveQueueSize();
double publishQueue = getPublishQueueSize();
return format("Message queue %s at %.03f/%.03f", pipeId, receiveQueue, publishQueue);
}

private void receiveBundle(Bundle bundle) {
receiveBundle(stringify(bundle));
}
Expand Down Expand Up @@ -408,6 +406,8 @@ private void shutdownExecutor() {

@Override
public void activate(Consumer<Bundle> bundleConsumer) {
debug("Activating message pipe %s as %s => %s", pipeId, queueIdentifier(),
Objects.hash(dispatcher));
dispatcher = bundleConsumer;
ensureSourceQueue();
debug("Handling %s", this);
Expand Down Expand Up @@ -435,7 +435,7 @@ public synchronized Map<String, PipeStats> extractStats() {
double receiveQueue = getReceiveQueueSize();
double publishQueue = getPublishQueueSize();
if (subscriptionsThrottled.get()) {
warn("Message queues at %.03f/%.03f, currently paused", receiveQueue, publishQueue);
warn(messageQueueMessage() + ", currently paused");
}
return ImmutableMap.of(
RECEIVE_STATS, extractStat(receiveStats, receiveQueue),
Expand Down Expand Up @@ -492,7 +492,7 @@ public void terminate() {

@Override
public String toString() {
return format("%s %s => %s", pipeId, queueIdentifier(), Objects.hash(dispatcher));
return pipeId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.api.core.ApiService;
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
Expand Down Expand Up @@ -138,14 +139,6 @@ private void checkPublisher() {
@Override
public void activate(Consumer<Bundle> bundleConsumer) {
super.activate(bundleConsumer);
resumeSubscribers();
}

@Override
protected void resumeSubscribers() {
if (subscribers == null) {
initializeSubscribers();
}
subscribers.forEach(Subscriber::startAsync);
}

Expand Down Expand Up @@ -203,11 +196,6 @@ public void shutdown() {
super.shutdown();
}

@VisibleForTesting
protected void pauseSubscribers() {
stopAsyncSubscribers();
}

private List<ApiService> stopAsyncSubscribers() {
List<ApiService> apiServices = subscribers.stream().map(AbstractApiService::stopAsync).toList();
subscribers = null;
Expand Down Expand Up @@ -235,12 +223,15 @@ Publisher getPublisher(String topicName) {
Subscriber getSubscriber(String subName) {
try {
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subName);
Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName, this);
FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount((long) queueCapacity).build();
Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName, this)
.setParallelPullCount(EXECUTION_THREADS)
.setFlowControlSettings(flowControlSettings);
String emu = getEmulatorHost();
ifNullThen(emu, () -> checkSubscription(subscriptionName));
ifNotNullThen(emu, host -> builder.setChannelProvider(getTransportChannelProvider(host)));
ifNotNullThen(emu, host -> builder.setCredentialsProvider(NoCredentialsProvider.create()));
builder.setParallelPullCount(EXECUTION_THREADS);
Subscriber built = builder.build();
info(format("Subscriber %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), subscriptionName));
built.addListener(new Listener() {
Expand Down

0 comments on commit 6d2d83f

Please sign in to comment.