Skip to content

Commit

Permalink
Merge branch 'deephaven:main' into upgrade-barrage-java-client
Browse files Browse the repository at this point in the history
  • Loading branch information
stanbrub authored Oct 28, 2024
2 parents 79315a0 + 3878744 commit 7ed68c9
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
package io.deephaven.benchmark.connect;

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand All @@ -14,15 +11,7 @@
import org.apache.arrow.memory.RootAllocator;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.metric.MetricsFuture;
import io.deephaven.client.impl.BarrageSession;
import io.deephaven.client.impl.BarrageSessionFactory;
import io.deephaven.client.impl.BarrageSnapshot;
import io.deephaven.client.impl.BarrageSubscription;
import io.deephaven.client.impl.ConsoleSession;
import io.deephaven.client.impl.DaggerDeephavenBarrageRoot;
import io.deephaven.client.impl.FieldInfo;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandleManager;
import io.deephaven.client.impl.*;
import io.deephaven.client.impl.script.Changes;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
Expand Down Expand Up @@ -63,13 +52,19 @@ public class BarrageConnector implements AutoCloseable {
* @param hostPort a host and port string for connecting to a Deephaven worker (ex. localhost:10000)
*/
public BarrageConnector(String hostPort) {
String[] split = hostPort.split(":");
this.channel = getManagedChannel(split[0], Integer.parseInt(split[1]));
this.session = getSession(channel);
String[] split;
try {
split = hostPort.split(":");
} catch (Exception ex) {
throw new RuntimeException("Failed to parse connection hostPort: " + hostPort, ex);
}
try {
this.channel = getManagedChannel(split[0], Integer.parseInt(split[1]));
this.session = getSession(channel);
this.console = session.session().console("python").get();
} catch (Exception ex) {
throw new RuntimeException("Failed to get console for session on host: " + hostPort);
close();
throw new RuntimeException("Failed to get console for session on host: " + hostPort, ex);
}
}

Expand Down Expand Up @@ -165,8 +160,11 @@ public Future<Metrics> fetchTickingData(String table, Function<ResultTable, Bool
}

/**
* Close the connector session and associated resources. Note: Because of the nature of the Deephaven Community Core
* worker, closing the connector session does not close the session on the server.
* Make a best effort to close the connector session and all associated resources. No exception is thrown if the
* close fails.
* <p>
* Note: Because of the nature of the Deephaven Community Core worker, closing the connector session does not close
* the session on the server.
*/
public void close() {
try {
Expand All @@ -178,17 +176,27 @@ public void close() {
});
subscriptions.clear();
variableNames.clear();
} catch (Exception ex) {
}
try {
console.close();
} catch (Exception ex) {
}
try {
session.close();
} catch (Exception ex) {
throw new RuntimeException("Failed to close Session", ex);
} finally {
try {
session.session().closeFuture().get(5, TimeUnit.SECONDS);
} catch (Exception ex) {
}
}
try {
session.session().closeFuture().get(5, TimeUnit.SECONDS);
} catch (Exception ex) {
}
try {
scheduler.shutdownNow();
} catch (Exception ex) {
}
try {
channel.shutdownNow();
} catch (Exception ex) {
}
}

Expand Down

0 comments on commit 7ed68c9

Please sign in to comment.