Skip to content
This repository has been archived by the owner on Aug 7, 2023. It is now read-only.

Commit

Permalink
Set fatal failure monitor (#119)
Browse files Browse the repository at this point in the history
* https://github.com/ChronicleEnterprise/Chronicle-FIX/issues/707 Allow setting of FatalFailureMonitor from config

* added "ClientConnectionMonitor clientConnectionMonitor"

Co-authored-by: Rob Austin <[email protected]>
  • Loading branch information
RobAustin and Rob Austin authored Jun 23, 2021
1 parent eeed1e0 commit 6351895
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package net.openhft.chronicle.network;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.network.connection.ClientConnectionMonitor;
import net.openhft.chronicle.network.connection.FatalFailureMonitor;
import net.openhft.chronicle.network.connection.SocketAddressSupplier;
import net.openhft.chronicle.network.tcp.ChronicleSocketChannel;
Expand Down Expand Up @@ -48,6 +49,13 @@ public class AlwaysStartOnPrimaryConnectionStrategy extends SelfDescribingMarsha
private int pausePeriodMs = Integer.getInteger("client.timeout", 500);
private int socketConnectionTimeoutMs = Integer.getInteger("connectionStrategy.socketConnectionTimeoutMs", 1);
private long pauseMillisBeforeReconnect = Integer.getInteger("connectionStrategy.pauseMillisBeforeReconnect", 500);
private ClientConnectionMonitor clientConnectionMonitor = new VanillaClientConnectionMonitor();


public AlwaysStartOnPrimaryConnectionStrategy clientConnectionMonitor(ClientConnectionMonitor fatalFailureMonitor) {
this.clientConnectionMonitor = fatalFailureMonitor;
return this;
}

@Nullable
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package net.openhft.chronicle.network;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.network.connection.ClientConnectionMonitor;
import net.openhft.chronicle.network.connection.FatalFailureMonitor;
import net.openhft.chronicle.network.connection.SocketAddressSupplier;
import net.openhft.chronicle.network.tcp.ChronicleSocket;
Expand All @@ -39,6 +40,10 @@
@FunctionalInterface
public interface ConnectionStrategy extends Marshallable {

default ClientConnectionMonitor clientConnectionMonitor() {
return new VanillaClientConnectionMonitor();
}

@Nullable
static ChronicleSocketChannel socketChannel(@NotNull InetSocketAddress socketAddress, int tcpBufferSize, int socketConnectionTimeoutMs) throws IOException {

Expand Down Expand Up @@ -98,14 +103,14 @@ static ChronicleSocketChannel socketChannel(@NotNull InetSocketAddress socketAdd
* @throws InterruptedException if the channel is interrupted.
*/
ChronicleSocketChannel connect(@NotNull String name,
@NotNull SocketAddressSupplier socketAddressSupplier,
boolean didLogIn,
@NotNull FatalFailureMonitor fatalFailureMonitor) throws InterruptedException;
@NotNull SocketAddressSupplier socketAddressSupplier,
boolean didLogIn,
@NotNull FatalFailureMonitor fatalFailureMonitor) throws InterruptedException;

@Nullable
default ChronicleSocketChannel openSocketChannel(@NotNull InetSocketAddress socketAddress,
int tcpBufferSize,
long timeoutMs) throws IOException, InterruptedException {
int tcpBufferSize,
long timeoutMs) throws IOException, InterruptedException {

return openSocketChannel(socketAddress,
tcpBufferSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package net.openhft.chronicle.network;

import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.network.connection.ClientConnectionMonitor;
import org.jetbrains.annotations.NotNull;

import java.net.SocketAddress;

import static net.openhft.chronicle.core.Jvm.debug;

public class VanillaClientConnectionMonitor extends AbstractCloseable implements ClientConnectionMonitor {

@Override
public void onConnected(String name, @NotNull SocketAddress socketAddress) {
throwExceptionIfClosed();
debug().on(this.getClass(), "onConnected name=" + name + ",socketAddress=" + socketAddress);
}

@Override
public void onDisconnected(String name, @NotNull SocketAddress socketAddress) {
throwExceptionIfClosed();
debug().on(this.getClass(), "onDisconnected name=" + name + ",socketAddress=" + socketAddress);
}

@Override
protected void performClose() throws IllegalStateException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
*/
package net.openhft.chronicle.network.connection;

import net.openhft.chronicle.wire.Marshallable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.SocketAddress;

public interface ClientConnectionMonitor extends FatalFailureMonitor {
public interface ClientConnectionMonitor extends FatalFailureMonitor, Marshallable {

/**
* Call just after the client as successfully established a connection to the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package net.openhft.chronicle.network.connection;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.network.AlwaysStartOnPrimaryConnectionStrategy;
import net.openhft.chronicle.network.ConnectionStrategy;
import net.openhft.chronicle.network.VanillaClientConnectionMonitor;
import net.openhft.chronicle.network.tcp.ChronicleSocketChannel;
import net.openhft.chronicle.wire.SelfDescribingMarshallable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -54,13 +57,24 @@
* --h. Connection attempt no 3 with DR1: failed
* --i. Connection attempt no 3 with DR2: failed implies: Attempt 3 finished. Fatal Failure is raised
*/
public class FatalFailureConnectionStrategy implements ConnectionStrategy {
public class FatalFailureConnectionStrategy extends SelfDescribingMarshallable implements ConnectionStrategy {

private static final long PAUSE = TimeUnit.MILLISECONDS.toNanos(300);
private final int attempts;
private final boolean blocking;
private int tcpBufferSize = Integer.getInteger("tcp.client.buffer.size", TCP_BUFFER);
private boolean hasSentFatalFailure;
private ClientConnectionMonitor clientConnectionMonitor = new VanillaClientConnectionMonitor();

@Override
public ClientConnectionMonitor clientConnectionMonitor() {
return clientConnectionMonitor;
}

public FatalFailureConnectionStrategy clientConnectionMonitor(ClientConnectionMonitor fatalFailureMonitor) {
this.clientConnectionMonitor = fatalFailureMonitor;
return this;
}

/**
* @param attempts the number of attempts before a onFatalFailure() reported
Expand Down

0 comments on commit 6351895

Please sign in to comment.