diff --git a/README.md b/README.md index b580e1f..73a6aec 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,17 @@ # tcp2ivshmem -tcp2ivshmem is a java program to tunnel tcp connections through qemu ivshmem shared memory without using a network interface to do so. +tcp2ivshmem is a java command line application to tunnel tcp connections through qemu ivshmem shared memory without using a network interface to do so. ## License tcp2ivshmem is released under the GNU General Public License Version 3.
A copy of the GNU General Public License Version 3 can be found in the COPYING file.
The file "mvnw" is part of the maven-wrapper project, released under the Apache License Version 2.
See https://github.com/takari/maven-wrapper for more information regarding maven-wrapper. ## Building -If you do not want to build tcp2ivshmem yourself then you may also use the standalone executable jar file inside the examples folder. +If you do not want to build tcp2ivshmem yourself then you may also use the standalone executable jar file inside the +examples folder or one of the releases provided on github. Requirements: * JDK 7 or newer (Oracle or OpenJDK both work fine.) -Install Ivshmem4j 1.0 to your local maven repository (see https://github.com/AlexanderSchuetz97/Ivshmem4j)
-Once Ivshmem4j is installed to your local maven repository run: ```` ./mvnw clean package ```` diff --git a/examples/guestToHostSSHGuest.bat b/examples/guestToHostSSHGuest.bat index cffe540..20dde5a 100755 --- a/examples/guestToHostSSHGuest.bat +++ b/examples/guestToHostSSHGuest.bat @@ -1,2 +1,2 @@ -java -jar tcp2ivshmem-0.1.jar -s -d 134217728 +java -jar tcp2ivshmem-0.2.jar -s -d 134217728 pause diff --git a/examples/guestToHostSSHHost.sh b/examples/guestToHostSSHHost.sh index fbfc9ee..c34b8bb 100755 --- a/examples/guestToHostSSHHost.sh +++ b/examples/guestToHostSSHHost.sh @@ -1,2 +1,2 @@ #!/bin/sh -java -jar tcp2ivshmem-0.1.jar -m -d /dev/shm/tcp2ivshmem -b 134217728 -R 8000:127.0.0.1:22 +java -jar tcp2ivshmem-0.2.jar -m -d /dev/shm/tcp2ivshmem -b 134217728 -R 8000:127.0.0.1:22 diff --git a/examples/tcp2ivshmem-0.1.jar b/examples/tcp2ivshmem-0.1.jar deleted file mode 100644 index a0f27d6..0000000 Binary files a/examples/tcp2ivshmem-0.1.jar and /dev/null differ diff --git a/examples/tcp2ivshmem-0.2.jar b/examples/tcp2ivshmem-0.2.jar new file mode 100644 index 0000000..2b0fc31 Binary files /dev/null and b/examples/tcp2ivshmem-0.2.jar differ diff --git a/pom.xml b/pom.xml index 25a26af..e3f71a0 100644 --- a/pom.xml +++ b/pom.xml @@ -1,29 +1,10 @@ - 4.0.0 - de.aschuetz + io.github.alexanderschuetz97 tcp2ivshmem 0.2-SNAPSHOT @@ -71,7 +52,7 @@ If not, see . - de.aschuetz.tcp2ivshmem.Main + io.github.alexanderschuetz97.tcp2ivshmem.Main tcp2ivshmem @@ -87,9 +68,15 @@ If not, see . - de.aschuetz + io.github.alexanderschuetz97 ivshmem4j - 1.1 + 1.2 + + + junit + junit + 4.13.2 + test diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/Main.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/Main.java similarity index 76% rename from src/main/java/de/aschuetz/tcp2ivshmem/Main.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/Main.java index 7628c92..30109c9 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/Main.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/Main.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,24 +17,22 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem; - -import de.aschuetz.ivshmem4j.api.SharedMemory; -import de.aschuetz.ivshmem4j.api.SharedMemoryException; -import de.aschuetz.ivshmem4j.common.NativeLibraryLoaderHelper; -import de.aschuetz.ivshmem4j.linux.doorbell.IvshmemLinuxClient; -import de.aschuetz.ivshmem4j.linux.plain.LinuxMappedFileSharedMemory; -import de.aschuetz.ivshmem4j.util.RingBuffer; -import de.aschuetz.ivshmem4j.windows.IvshmemWindowsDevice; -import de.aschuetz.tcp2ivshmem.config.Configuration; -import de.aschuetz.tcp2ivshmem.config.Forwarding; -import de.aschuetz.tcp2ivshmem.config.OS; -import de.aschuetz.tcp2ivshmem.ivshmem.Constants; -import de.aschuetz.tcp2ivshmem.ivshmem.IvhsmemMasterBridge; -import de.aschuetz.tcp2ivshmem.ivshmem.IvshmemBridge; -import de.aschuetz.tcp2ivshmem.ivshmem.IvshmemSlaveBridge; -import de.aschuetz.tcp2ivshmem.packets.PacketUtil; -import de.aschuetz.tcp2ivshmem.servers.Socks5Server; +package io.github.alexanderschuetz97.tcp2ivshmem; + + +import io.github.alexanderschuetz97.tcp2ivshmem.config.Configuration; +import io.github.alexanderschuetz97.tcp2ivshmem.config.Forwarding; +import io.github.alexanderschuetz97.tcp2ivshmem.config.OS; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.Constants; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.IvhsmemMasterBridge; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.IvshmemBridge; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.IvshmemSlaveBridge; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.RingBuffer; +import io.github.alexanderschuetz97.ivshmem4j.api.Ivshmem; +import io.github.alexanderschuetz97.ivshmem4j.api.IvshmemMemory; +import io.github.alexanderschuetz97.ivshmem4j.api.WindowsIvshmemPCIDevice; +import io.github.alexanderschuetz97.nativeutils.api.NativeMemory; +import io.github.alexanderschuetz97.nativeutils.api.NativeUtils; import java.io.IOException; import java.net.ServerSocket; @@ -46,7 +44,9 @@ public class Main { public static Configuration config; - public static SharedMemory memory; + public static IvshmemMemory shmemory; + + public static NativeMemory memory; public static boolean useInterrupts = false; @@ -92,34 +92,24 @@ public static void printUsageAndExit() { System.exit(-1); } - public static void setupJNI() { - try { - NativeLibraryLoaderHelper.loadNativeLibraries(); - } catch (LinkageError err) { - System.out.println("Unable to load native libraries for Ivshmem4j."); - err.printStackTrace(); - System.exit(-1); - } - } - public static void listWindowsDevices() { if (OS.detect() != OS.WINDOWS) { System.out.println("-ls is only supported on windows."); System.exit(-1); } - Collection devices = null; + Collection devices = null; try { - devices = IvshmemWindowsDevice.getSharedMemoryDevices(); - } catch (SharedMemoryException e) { + devices = Ivshmem.windowsListPCI(); + } catch (Exception e) { System.out.println("Couldn't enumerate ivhshmem pci devices"); e.printStackTrace(); System.exit(-1); } System.out.println("Available devices: " + devices.size()); - for (IvshmemWindowsDevice device : devices) { - System.out.println("Size: " + device.getSharedMemorySize() + " bytes Device: " + device.getNameAsString()); + for (WindowsIvshmemPCIDevice device : devices) { + System.out.println("Size: " + device.getSize() + " bytes Device: " + device.getName()); } System.out.println(); @@ -127,40 +117,40 @@ public static void listWindowsDevices() { } - public static void createMemory() throws SharedMemoryException { + public static void createMemory() { if (config.getOperatingSystem() == OS.LINUX) { if (Boolean.TRUE.equals(config.getLinuxIsPlain())) { if (config.getSize() == null) { - memory = LinuxMappedFileSharedMemory.open(config.getDevice()); + shmemory = Ivshmem.plain(config.getDevice()); return; } - memory = LinuxMappedFileSharedMemory.createOrOpen(config.getDevice(), config.getSize()); + shmemory = Ivshmem.plain(config.getDevice(), config.getSize()); return; } - memory = IvshmemLinuxClient.connect(config.getDevice()); + shmemory = Ivshmem.doorbell(config.getDevice(), 5000, Main.ex); return; } - Collection devices = IvshmemWindowsDevice.getSharedMemoryDevices(); + Collection devices = Ivshmem.windowsListPCI(); - for (IvshmemWindowsDevice device : devices) { + for (WindowsIvshmemPCIDevice device : devices) { if (config.getDevice() == null) { - memory = device.open(); + shmemory = Ivshmem.windowsPCI(device, Main.ex); return; } - if (config.getDevice().equals(String.valueOf(device.getSharedMemorySize()))) { - System.out.println("Will open device " + device.getNameAsString()); - memory = device.open(); + if (config.getDevice().equals(String.valueOf(device.getSize()))) { + System.out.println("Will open device " + device.getName()); + shmemory = Ivshmem.windowsPCI(device, Main.ex); return; } - if(device.isNameValid() && device.getNameAsString().contains(config.getDevice())) { - System.out.println("Will open device " + device.getNameAsString()); - memory = device.open(); + if(device.getName() != null && device.getName().contains(config.getDevice())) { + System.out.println("Will open device " + device.getName()); + shmemory = Ivshmem.windowsPCI(device, Main.ex); return; } @@ -187,7 +177,11 @@ public static void main(String[] args) { return; } - setupJNI(); + if (!NativeUtils.isLinux() && !NativeUtils.isWindows()) { + System.out.println("Unsupported OS or CPU architecture."); + System.exit(-1); + return; + } if (args.length == 1 && "-ls".equalsIgnoreCase(args[0])) { listWindowsDevices(); @@ -214,7 +208,8 @@ public static void main(String[] args) { try { createMemory(); - } catch (SharedMemoryException e) { + memory = shmemory.getMemory(); + } catch (Exception e) { System.out.println("Error creating shared memory."); e.printStackTrace(); System.exit(-1); @@ -227,12 +222,12 @@ public static void main(String[] args) { System.out.println("Using shared memory: " + memory); - if (memory.getSharedMemorySize() < Constants.MIN_REQUIRED_MEMORY_SIZE) { + if (memory.size() < Constants.MIN_REQUIRED_MEMORY_SIZE) { System.out.println("Shared memory is too small."); System.exit(-1); } - long tempRes = memory.getSharedMemorySize() - Constants.MEMORY_OVERHEAD - (2* RingBuffer.OVERHEAD); + long tempRes = memory.size() - Constants.MEMORY_OVERHEAD - (2* RingBuffer.OVERHEAD); tempRes -= tempRes % 32; ringBufferSize = tempRes / 2; if (ringBufferSize <= 0) { @@ -241,15 +236,14 @@ public static void main(String[] args) { } clientToServerOffset = serverToClientOffset + RingBuffer.OVERHEAD + ringBufferSize; - memory.startNecessaryThreads(Main.ex); - if (!memory.supportsInterrupts() && Boolean.TRUE.equals(config.useInterrupts())) { + if (!shmemory.supportsInterrupts() && Boolean.TRUE.equals(config.useInterrupts())) { System.out.println("Interrupts not supported by shared memory."); System.exit(-1); return; } - if (memory.supportsInterrupts() && !Boolean.FALSE.equals(config.useInterrupts())) { + if (shmemory.supportsInterrupts() && !Boolean.FALSE.equals(config.useInterrupts())) { useInterrupts = true; } @@ -257,7 +251,7 @@ public static void main(String[] args) { if (useInterrupts) { - System.out.println("Own Peer ID " + memory.getOwnPeerID()); + System.out.println("Own Peer ID " + shmemory.getOwnPeerID()); System.out.println("Will use interrupts if other side supports them too."); } else { System.out.println("We do not support interrupts."); diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/config/Configuration.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/Configuration.java similarity index 97% rename from src/main/java/de/aschuetz/tcp2ivshmem/config/Configuration.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/Configuration.java index b18cd4a..a4891d6 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/config/Configuration.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/Configuration.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,11 +17,10 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.config; +package io.github.alexanderschuetz97.tcp2ivshmem.config; -import de.aschuetz.ivshmem4j.api.SharedMemoryException; -import de.aschuetz.ivshmem4j.windows.IvshmemWindowsDevice; -import de.aschuetz.tcp2ivshmem.ivshmem.Constants; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.Constants; +import io.github.alexanderschuetz97.ivshmem4j.api.Ivshmem; import java.util.*; @@ -312,11 +311,11 @@ private void validateArgs() throws IllegalArgumentException { errors.add("Device is missing. Use -d."); } else { try { - if (IvshmemWindowsDevice.getSharedMemoryDevices().size() > 1) { + if (Ivshmem.windowsListPCI().size() > 1) { errors.add("Device is missing and there is more than 1 PCI Device attached to this vm."); } - } catch (SharedMemoryException e) { - errors.add("Couldn't enumerate ivhshmem pci devices err:" + e.getMessage()); + } catch (Exception e) { + errors.add("Couldn't enumerate ivhshmem pci devices err:" + e.getClass().getName() + " " +e.getMessage()); } } } diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/config/Forwarding.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/Forwarding.java similarity index 92% rename from src/main/java/de/aschuetz/tcp2ivshmem/config/Forwarding.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/Forwarding.java index 1525cd6..da89efe 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/config/Forwarding.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/Forwarding.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.config; +package io.github.alexanderschuetz97.tcp2ivshmem.config; public class Forwarding { diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/config/OS.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/OS.java similarity index 92% rename from src/main/java/de/aschuetz/tcp2ivshmem/config/OS.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/OS.java index d1823c7..63bdcc8 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/config/OS.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/config/OS.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.config; +package io.github.alexanderschuetz97.tcp2ivshmem.config; public enum OS { WINDOWS, LINUX, OTHER; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ClearWatchdogShutdownHook.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ClearWatchdogShutdownHook.java similarity index 79% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ClearWatchdogShutdownHook.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ClearWatchdogShutdownHook.java index 3ee7400..6f32f44 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ClearWatchdogShutdownHook.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ClearWatchdogShutdownHook.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,12 +17,12 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; -import de.aschuetz.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.ADDRESS_STATE; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.ADDRESS_WATCHDOG; +import static io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.Constants.ADDRESS_STATE; +import static io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.Constants.ADDRESS_WATCHDOG; public class ClearWatchdogShutdownHook extends Thread { diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ConnectRingBufferForInput.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ConnectRingBufferForInput.java similarity index 72% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ConnectRingBufferForInput.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ConnectRingBufferForInput.java index b8e7761..aed14db 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ConnectRingBufferForInput.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ConnectRingBufferForInput.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,17 +17,14 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; -import de.aschuetz.ivshmem4j.util.RingBuffer; -import de.aschuetz.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; import java.io.InputStream; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.*; - public class ConnectRingBufferForInput implements Callable { private long address; @@ -38,7 +35,7 @@ public ConnectRingBufferForInput(long address) { @Override public InputStream call() throws Exception { System.out.println("Connecting shared memory ring buffer for input at address " + address +"."); - RingBuffer tempBuf = new RingBuffer(Main.memory, address , Main.config.getSpinWithoutInterrupts(), Main.config.getSpinWithInterrupts()); - return tempBuf.connectInputStream(TIMEOUT_CONNECT, SPIN_CONNECT, TimeUnit.MILLISECONDS); + RingBuffer tempBuf = new RingBuffer(Main.shmemory, address , Main.config.getSpinWithoutInterrupts(), Main.config.getSpinWithInterrupts()); + return tempBuf.connectInputStream(Constants.TIMEOUT_CONNECT, Constants.SPIN_CONNECT, TimeUnit.MILLISECONDS); } } diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ConnectRingBufferForOutput.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ConnectRingBufferForOutput.java similarity index 55% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ConnectRingBufferForOutput.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ConnectRingBufferForOutput.java index ba496d6..d3d93b0 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/ConnectRingBufferForOutput.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/ConnectRingBufferForOutput.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,16 +17,15 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; -import de.aschuetz.ivshmem4j.util.RingBuffer; -import de.aschuetz.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; import java.io.OutputStream; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.*; +import static io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.Constants.*; public class ConnectRingBufferForOutput implements Callable { private final long address; @@ -41,21 +40,25 @@ public ConnectRingBufferForOutput(long address, boolean interrupts) { @Override public OutputStream call() throws Exception { System.out.println("Connecting shared memory ring buffer for output at address " + address +"."); - RingBuffer tempBuf = new RingBuffer(Main.memory, address , Main.config.getSpinWithoutInterrupts(), Main.config.getSpinWithInterrupts()); + RingBuffer tempBuf = new RingBuffer(Main.shmemory, address , Main.config.getSpinWithoutInterrupts(), Main.config.getSpinWithInterrupts()); tempBuf.cleanMemoryArea(); - try { - if (interrupts) { - return tempBuf.connectOutputStream(0, Main.ringBufferSize, TIMEOUT_CONNECT, SPIN_CONNECT, TimeUnit.MILLISECONDS); - } - return tempBuf.connectOutputStream(Main.ringBufferSize, TIMEOUT_CONNECT, SPIN_CONNECT, TimeUnit.MILLISECONDS); - } finally { - if (interrupts && !tempBuf.usesInterrupts()) { - System.out.println("Using interrupts for communication failed! Falling back to spinning."); - } - - if (tempBuf.usesInterrupts()) { - System.out.println("Interrupts will be sent to peer " + tempBuf.getOtherPeer()); - } + boolean succ = false; + + OutputStream outputStream; + if (interrupts) { + outputStream = tempBuf.connectOutputStream(0, Main.ringBufferSize, TIMEOUT_CONNECT, SPIN_CONNECT, TimeUnit.MILLISECONDS); + } else { + outputStream = tempBuf.connectOutputStream(Main.ringBufferSize, TIMEOUT_CONNECT, SPIN_CONNECT, TimeUnit.MILLISECONDS); + } + + if (interrupts && !tempBuf.usesInterrupts()) { + System.out.println("Using interrupts for communication failed! Falling back to spinning."); } + + if (tempBuf.usesInterrupts()) { + System.out.println("Interrupts will be sent to peer " + tempBuf.getOtherPeer()); + } + + return outputStream; } } diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/Constants.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/Constants.java similarity index 94% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/Constants.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/Constants.java index 93b2727..f5527e5 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/Constants.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/Constants.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,9 +17,8 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; -import de.aschuetz.ivshmem4j.util.RingBuffer; public class Constants { //Timeouts diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvhsmemMasterBridge.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvhsmemMasterBridge.java similarity index 65% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvhsmemMasterBridge.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvhsmemMasterBridge.java index 0b15f24..72c5e3c 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvhsmemMasterBridge.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvhsmemMasterBridge.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,9 +17,9 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; -import de.aschuetz.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; import java.io.InputStream; import java.io.OutputStream; @@ -27,9 +27,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.*; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.TIMEOUT_CONNECT; - public class IvhsmemMasterBridge extends IvshmemBridge { protected void connectToIvshmem() throws Exception { @@ -37,51 +34,51 @@ protected void connectToIvshmem() throws Exception { int masterNr = Math.abs(new Random().nextInt()); System.out.println("Master NR "+ masterNr +" is here."); - Main.memory.write(ADDRESS_STATE, STATE_CONNECTING); + Main.memory.write(Constants.ADDRESS_STATE, Constants.STATE_CONNECTING); System.out.println("Waiting for ivshmem connection to slave..."); while(true) { - byte state = Main.memory.read(ADDRESS_STATE); - if (state == 0 || state == STATE_DISCONNECTED) { + byte state = Main.memory.read(Constants.ADDRESS_STATE); + if (state == 0 || state == Constants.STATE_DISCONNECTED) { System.out.println("External interference detected, is there a another master running? (This must not always be the case, will keep trying.)"); - Main.memory.write(ADDRESS_STATE, STATE_CONNECTING); + Main.memory.write(Constants.ADDRESS_STATE, Constants.STATE_CONNECTING); continue; } - if (state == STATE_HANDSHAKE) { + if (state == Constants.STATE_HANDSHAKE) { break; } - if (state == STATE_CONNECTED || state == STATE_HANDSHAKE_RESPONSE) { + if (state == Constants.STATE_CONNECTED || state == Constants.STATE_HANDSHAKE_RESPONSE) { System.out.println("There is a another master running, exiting."); System.exit(-1); return; } - Thread.sleep(SPIN_CONNECT); + Thread.sleep(Constants.SPIN_CONNECT); } Main.maxConcurrentTcpConnections = Main.config.getMaxTcpConnections(); init(0, Main.maxConcurrentTcpConnections); - Main.memory.write(ADDRESS_MASTER_INTERRUPTS, Main.useInterrupts ? USE_INTERRUPTS : DONT_USE_INTERRUPTS); - Main.memory.write(ADDRESS_MAX_TCP_CONNECTIONS, Main.maxConcurrentTcpConnections); - Main.memory.write(ADDRESS_WATCHDOG, masterNr); + Main.memory.write(Constants.ADDRESS_MASTER_INTERRUPTS, Main.useInterrupts ? Constants.USE_INTERRUPTS : Constants.DONT_USE_INTERRUPTS); + Main.memory.write(Constants.ADDRESS_MAX_TCP_CONNECTIONS, Main.maxConcurrentTcpConnections); + Main.memory.write(Constants.ADDRESS_WATCHDOG, masterNr); IvshmemConnectionWatchdog.getInstance().start(masterNr); - if (!Main.memory.compareAndSet(ADDRESS_STATE, STATE_HANDSHAKE, STATE_HANDSHAKE_RESPONSE)) { + if (!Main.memory.compareAndSet(Constants.ADDRESS_STATE, Constants.STATE_HANDSHAKE, Constants.STATE_HANDSHAKE_RESPONSE)) { System.out.println("Could not respond to handshake. Are multiple masters running?"); System.exit(-1); } System.out.println("Handshake response send to slave. Waiting for ACK from slave..."); - if (!Main.memory.spin(ADDRESS_STATE, STATE_CONNECTED, SPIN_CONNECT, TIMEOUT_CONNECT, TimeUnit.MILLISECONDS)) { + if (!Main.memory.spin(Constants.ADDRESS_STATE, Constants.STATE_CONNECTED, Constants.SPIN_CONNECT, Constants.TIMEOUT_CONNECT, TimeUnit.MILLISECONDS)) { System.out.println("Timeout."); System.exit(-1); } - Main.useInterrupts &= Main.memory.read(ADDRESS_SLAVE_INTERRUPTS) == USE_INTERRUPTS; + Main.useInterrupts &= Main.memory.read(Constants.ADDRESS_SLAVE_INTERRUPTS) == Constants.USE_INTERRUPTS; System.out.println("...Ack received, connecting ring buffers..."); if (Main.useInterrupts) { @@ -93,8 +90,8 @@ protected void connectToIvshmem() throws Exception { Future inputStreamFuture = Main.ex.submit(new ConnectRingBufferForInput(Main.clientToServerOffset)); Future outputStreamFuture = Main.ex.submit(new ConnectRingBufferForOutput(Main.serverToClientOffset, Main.useInterrupts)); - fromIvshmem = inputStreamFuture.get(TIMEOUT_CONNECT, TimeUnit.MILLISECONDS); - toIvshmem = outputStreamFuture.get(TIMEOUT_CONNECT, TimeUnit.MILLISECONDS); + fromIvshmem = inputStreamFuture.get(Constants.TIMEOUT_CONNECT, TimeUnit.MILLISECONDS); + toIvshmem = outputStreamFuture.get(Constants.TIMEOUT_CONNECT, TimeUnit.MILLISECONDS); System.out.println("...Ring buffers connected. Master is ready for operation."); System.out.println("Will accept " + Main.maxConcurrentTcpConnections + " maximum concurrent tcp connections"); diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemBridge.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemBridge.java similarity index 92% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemBridge.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemBridge.java index 636335c..d6ef0bf 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemBridge.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemBridge.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,14 +17,21 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; - -import de.aschuetz.tcp2ivshmem.Main; -import de.aschuetz.tcp2ivshmem.packets.*; -import de.aschuetz.tcp2ivshmem.servers.Socks5Server; -import de.aschuetz.tcp2ivshmem.servers.TcpServer; -import de.aschuetz.tcp2ivshmem.sockets.TcpSocket; -import de.aschuetz.tcp2ivshmem.sockets.TcpSocketContainer; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; + +import io.github.alexanderschuetz97.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.AbstractPacket; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.Packet1Connect; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.Packet3Rst; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.Packet4Data; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.Packet6OpenServerResult; +import io.github.alexanderschuetz97.tcp2ivshmem.servers.Socks5Server; +import io.github.alexanderschuetz97.tcp2ivshmem.servers.TcpServer; +import io.github.alexanderschuetz97.tcp2ivshmem.sockets.TcpSocket; +import io.github.alexanderschuetz97.tcp2ivshmem.sockets.TcpSocketContainer; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.Packet2Fin; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.Packet5OpenServer; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.PacketUtil; import java.io.*; import java.net.InetAddress; @@ -32,8 +39,6 @@ import java.net.Socket; import java.util.concurrent.*; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.*; - public abstract class IvshmemBridge { @@ -78,7 +83,7 @@ public void sendUrgentPacket(AbstractPacket packet) throws IOException { public void sendPacket(AbstractPacket packet) throws IOException { synchronized (mutex) { - while (toIvshmemQueue.size() > PACKET_QUEUE_SIZE) { + while (toIvshmemQueue.size() > Constants.PACKET_QUEUE_SIZE) { try { mutex.wait(); } catch (InterruptedException e) { diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemConnectionWatchdog.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemConnectionWatchdog.java similarity index 79% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemConnectionWatchdog.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemConnectionWatchdog.java index d87989b..429ae2d 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemConnectionWatchdog.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemConnectionWatchdog.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,11 +17,9 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; -import de.aschuetz.ivshmem4j.api.SharedMemoryException; -import de.aschuetz.ivshmem4j.common.ErrorCodeEnum; -import de.aschuetz.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; public class IvshmemConnectionWatchdog { @@ -64,7 +62,7 @@ private void run() { Thread.currentThread().setName("IvshmemConnectionWatchdog Thread"); try { - while(Main.memory.readInt(Constants.ADDRESS_WATCHDOG) == masterNr) { + while (Main.memory.readInt(Constants.ADDRESS_WATCHDOG) == masterNr) { Thread.sleep(Constants.SPIN_WATCHDOG); } @@ -74,13 +72,10 @@ private void run() { System.out.println("Master is gone."); } changed = true; - } catch (SharedMemoryException e) { - if (e.getCode() == ErrorCodeEnum.INVALID_CONNECTION_POINTER) { - return; - } + } catch (NullPointerException e) { + return; + } catch (Exception e) { System.out.println("Watchdog error: " + e.getMessage()); - } catch (InterruptedException e) { - e.printStackTrace(); } finally { System.exit(0); } diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemSlaveBridge.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemSlaveBridge.java similarity index 94% rename from src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemSlaveBridge.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemSlaveBridge.java index a96fe86..d1e4c7e 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/ivshmem/IvshmemSlaveBridge.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/IvshmemSlaveBridge.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,15 +17,15 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.ivshmem; +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; -import de.aschuetz.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; import java.io.*; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.*; +import static io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.Constants.*; public class IvshmemSlaveBridge extends IvshmemBridge { diff --git a/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/RingBuffer.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/RingBuffer.java new file mode 100644 index 0000000..491755b --- /dev/null +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/ivshmem/RingBuffer.java @@ -0,0 +1,1142 @@ +/* + * Copyright Alexander Schütz, 2020-2022 + * + * This file is part of tcp2ivshmem. + * + * tcp2ivshmem is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * tcp2ivshmem is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * A copy of the GNU General Public License should be provided + * in the COPYING file in top level directory of tcp2ivshmem. + * If not, see . + */ + +package io.github.alexanderschuetz97.tcp2ivshmem.ivshmem; + +import io.github.alexanderschuetz97.ivshmem4j.api.InterruptServiceRoutine; +import io.github.alexanderschuetz97.ivshmem4j.api.IvshmemMemory; +import io.github.alexanderschuetz97.nativeutils.api.NativeMemory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Shared Memory Ring Buffer. + * Requires Interrupts. + * This Object is single use only. If it was used and closed then a new RingBuffer Object is required in order to + * reuse the address for a new RingBuffer. + */ +public class RingBuffer implements Closeable { + + + public static final int STATE_OFFSET = 0; + + public static final int INTERRUPT_FLAG_WRITE_OFFSET = STATE_OFFSET + 1; + + public static final int INTERRUPT_FLAG_READ_OFFSET = STATE_OFFSET + 2; + + /** + * Offset where the vector int will be located in the SharedMemory. + */ + public static final int VECTOR_OFFSET = 4; + + /** + * Offset where the buffer size will be located in the Shared Memory. + */ + public static final int SIZE_OFFSET = VECTOR_OFFSET + 4; + + /** + * Offset where the write index will be located in the Shared Memory. + * The write index will contain the offset relative to the BUFFER_START_OFFSET where the next byte will be written to. + */ + public static final int WRITE_INDEX_OFFSET = SIZE_OFFSET + 8; + + /** + * Offset where the read index will be located in the Shared Memory. + * The read index will contain the offset relative to the BUFFER_START_OFFSET where the next byte will be read from. + */ + public static final int READ_INDEX_OFFSET = WRITE_INDEX_OFFSET + 8; + + public static final int WRITE_PEER_OFFSET = READ_INDEX_OFFSET + 8; + + public static final int READ_PEER_OFFSET = WRITE_PEER_OFFSET + 4; + + /** + * Amount of bytes at the start of every RingBuffer that will be used for control bytes. + */ + public static final int OVERHEAD = READ_PEER_OFFSET + 4; + + /** + * Offset of the actual data of the Ring Buffer. + */ + public static final int BUFFER_START_OFFSET = OVERHEAD; + + protected static final byte STATE_UNCONNECTED = 0; + + protected static final byte STATE_CONNECTING = 1; + + protected static final byte STATE_CONNECTED = 2; + + protected static final byte STATE_CLOSED = 3; + + /** + * The shared memory to use to write/read the bytes to/from. + */ + protected final IvshmemMemory shmemory; + + protected final NativeMemory memory; + /** + * the base address where the ring buffer is located in inside the shared memory. + */ + protected final long address; + + + /** + * The vector that is sued for this RingBuffer Object. + */ + protected int vector = -1; + + + /** + * holds a offset relative to address that the current RingBuffer will access to either read from or write to. + */ + protected volatile long localIndex = 0; + + /** + * flag to indicate if the RingBuffer is closed. + */ + protected volatile boolean closedFlag = false; + + + /** + * Lock that prevents concurrent access to the input stream. + * Note that this lock and the interrupt lock may be held together by the same thread. + */ + protected final ReentrantLock accessLock = new ReentrantLock(); + + /** + * Lock that provides a condition for interrupt waiting. + */ + protected final ReentrantLock interruptLock = new ReentrantLock(); + + /** + * Condition that is signaled when the state is changed either by closing or by signaling an interrupt. + */ + protected final Condition interruptCondition = interruptLock.newCondition(); + + /** + * ISR instance that will signal the interruptCondition. + */ + protected final InterruptServiceRoutine interruptServiceRoutine = new InterruptServiceRoutine() { + + @Override + public void onInterrupt(int aInterrupt) { + interruptLock.lock(); + interruptCondition.signalAll(); + interruptLock.unlock(); + } + }; + + /** + * Peer ID of the other Peer we are talking to (for the purpose of sending interrupts. + */ + protected int otherPeer; + + /** + * size of this ring buffer. determined by the writer. This size is without control bytes. + */ + protected long size = -1; + + /** + * flag if this RingBuffer object is reading or writing to the RingBuffer at address. + */ + protected boolean isReadFlag = false; + + /** + * Spin time if either this side or the other side does not support interrupt based communication. + */ + protected long spinTimeWithoutInterrupts; + + /** + * Spin time that is used when interrupts are used. + * This is not needed if interrupts are received reliably. + * If however an interrupt is lost then this is the maximum time we will wait for one before manually checking if the + * state has changed. If the state has not changed after spinTimeWithoutInterrupts has elapsed then we will simply wait for interrupts again in a loop. + */ + protected long spinTimeWithInterrupts; + + protected boolean wasOpened = false; + + protected boolean useInterrupts = false; + + protected long timeout = -1; + + + /** + * Constructor for creating a spin based RingBuffer. + */ + public RingBuffer(IvshmemMemory aSharedMemory, long aAddress, long spinTime) { + this(aSharedMemory, aAddress, spinTime, spinTime); + + } + + /** + * Constructor for interrupt based RingBuffer. Be aware that both ends need to support interrupts! + * The parameter spinTimeWithoutInterrupts only becomes relevant if the other end reports not supporting interrupts during connecting. + */ + public RingBuffer(IvshmemMemory aSharedMemory, long aAddress, long spinTimeWithoutInterrupts, long spinTimeWithInterrupts) { + address = aAddress; + shmemory = aSharedMemory; + memory = shmemory.getMemory(); + this.spinTimeWithoutInterrupts = spinTimeWithoutInterrupts; + this.spinTimeWithInterrupts = spinTimeWithInterrupts; + + if (!memory.isValid(aAddress)) { + throw new IllegalArgumentException("Shared memory is too small to hold the buffer with the given size at the given offset!"); + } + } + + /** + * closes this ring buffer. + * Does not throw an exception. + */ + public void close() { + accessLock.lock(); + try { + if (closedFlag) { + return; + } + closedFlag = true; + + if (!shmemory.isClosed()) { + if (useInterrupts) { + shmemory.removeInterruptServiceRoutine(vector, interruptServiceRoutine); + } + try { + memory.write(address, STATE_CLOSED); + triggerInterrupt(); + } catch (Exception e) { + //DC. + } + } + interruptLock.lock(); + interruptCondition.signalAll(); + interruptLock.unlock(); + } finally { + accessLock.unlock(); + } + } + + /** + * Sets the timeout for either read or write operations. Set to -1 to disable timeout. + * After timout a reading call or writing call will throw an IOException if no bytes can be read. + * Be aware that after a timout exception occures the stream can be used again normally if you simply wish to + * try again. + */ + public void setTimeout(long timeout) { + accessLock.lock(); + try { + this.timeout = timeout; + } finally { + accessLock.unlock(); + } + } + + /** + * Sets the spin time. + */ + public void setSpinTime(long spinTime) { + setSpinTime(spinTime, spinTime); + } + + + /** + * Sets the spin time + */ + public void setSpinTime(long spinTimeWithoutInterrupts, long spinTimeWithInterrupts) { + this.spinTimeWithoutInterrupts = spinTimeWithoutInterrupts; + this.spinTimeWithInterrupts = spinTimeWithInterrupts; + } + + /** + * Retuns true if interrupts are used for signaling changes in the ring buffer. + *

+ * throws IllegalStateException on an unconnected ring buffer as interrupts can only be used if + * both sides support them. + */ + public boolean usesInterrupts() { + if (!wasOpened()) { + throw new IllegalStateException("Not open!"); + } + + return useInterrupts; + } + + /** + * Will write the memory area to Zero. + * Should only be called by the Application that is going to createOrOpen the + * Output Stream and only when its certain that there is no longer an Active RingBuffer at the address. + */ + public void cleanMemoryArea() { + memory.set(address, (byte) 0, OVERHEAD); + } + + /** + * Will read the vector from where it is supposed to be stored without performing any check if the ring buffer is connected. + * This may be usefull to send a interrupt to the other side if it is known to be "stuck". + * Be aware that this method may return garbage data. + */ + public int readVector() { + return memory.readInt(address + VECTOR_OFFSET); + } + + public int getOtherPeer() { + if (!usesInterrupts()) { + throw new IllegalStateException("Doesnt use interrupts."); + } + + return otherPeer; + } + + /** + * Call this method to attempt to close another RingBuffer at this address if you suspect that it may be lingering around after an application crash. + * This method should be called before any connection attempts were made. + */ + public void tryCloseExternally(int aPeer, int aVector) { + memory.write(address, STATE_CLOSED); + shmemory.sendInterrupt(aPeer, aVector); + } + + + /** + * returns the amount of actual data bytes that can be in the buffer at any given time. + * This will differ from the size specified by the constructor of the OutputStream by OVERHEAD as the amount of bytes + * needed for control data will be subtracted from that amount. + */ + public long getBufferSize() throws IOException { + if (size == -1) { + throw new IllegalStateException("Unconnected"); + } + return size; + } + + + /** + * gets the current write index. The write will point to a offset relative from address+BUFFER_START_OFFSET where the next byte will be written to. + */ + protected long getWriteIndex() { + if (isReadFlag) { + return memory.readLong(address + WRITE_INDEX_OFFSET); + } + return localIndex; + + } + + /** + * Changes the write index in the shared memory. + * Will Close the RingBuffer if the write index that is stored inside the local index field + * does not match the one in the shared memory. + * If this call succeeds then the local index field is automatically updated to reflect the new write index. + */ + protected void setWriteIndex(long aNewIndex) { + if (isReadFlag) { + throw new RuntimeException("Cannot call this method while reading!"); + } + + if (!memory.compareAndSet(address + WRITE_INDEX_OFFSET, localIndex, aNewIndex)) { + close(); + throw new RuntimeException("Write index was modified externally!"); + } + + localIndex = aNewIndex; + } + + /** + * gets the current read index. The read index will point to a offset relative from address+BUFFER_START_OFFSET where the next byte will be read from. + */ + protected long getReadIndex() { + if (isReadFlag) { + return localIndex; + } + + return memory.readLong(address + READ_INDEX_OFFSET); + } + + protected long getSpinTimeToUse() { + return useInterrupts ? spinTimeWithInterrupts : spinTimeWithoutInterrupts; + } + + /** + * Changes the read index in the shared memory. + * Will Close the RingBuffer if the read index that is stored inside the local index field + * does not match the one in the shared memory. + * If this call succeeds then the local index field is automatically updated to reflect the new read index. + */ + protected void setReadIndex(long aNewIndex) { + if (!isReadFlag) { + throw new RuntimeException("Cannot call this method while writing!"); + } + + if (!memory.compareAndSet(address + READ_INDEX_OFFSET, localIndex, aNewIndex)) { + close(); + throw new RuntimeException("Read index was modified externally!"); + } + + localIndex = aNewIndex; + } + + + /* + * Will trigger an interrupt on the other peer. This is used to tell the other peer to recheck its state as + * new bytes have either been read or can be read or the RingBuffer was closed. + */ + protected void triggerInterrupt() { + if (useInterrupts) { + shmemory.sendInterrupt(otherPeer, vector); + } + } + + /** + * Returns the current state byte. + */ + protected byte getState() { + return memory.read(STATE_OFFSET + address); + } + + protected boolean setState(byte expect, byte newstate) { + try { + return memory.compareAndSet(STATE_OFFSET + address, expect, newstate); + } catch (Exception e) { + return false; + } + } + + /** + * Will check if we are still in STATE_CONNECTED. + */ + protected void checkStateConnected() { + if (!wasOpened()) { + throw new RuntimeException("Not open!"); + } + if (isClosed()) { + throw new RuntimeException("Closed!"); + } + if (getState() != STATE_CONNECTED) { + close(); + throw new RuntimeException("Closed!"); + } + } + + /** + * Returns true if we are able to createOrOpen a OutputStream. + */ + public boolean canConnectOutputStream() { + if (isClosed()) { + return false; + } + + if (wasOpened()) { + return false; + } + + if (getState() != STATE_UNCONNECTED) { + return false; + } + + return true; + } + + /** + * Returns true if we are able to createOrOpen a input stream without blocking. + */ + public boolean canConnectInputStream() { + if (closedFlag) { + return false; + } + if (wasOpened()) { + return false; + } + + byte tempState = getState(); + if (tempState == STATE_UNCONNECTED || tempState == STATE_CLOSED) { + return false; + } + + return true; + } + + /** + * returns true if RingBuffer is closed. + */ + public boolean isClosed() { + if (!closedFlag && shmemory.isClosed()) { + close(); + } + return closedFlag; + } + + /** + * returns true if this RingBuffer was connected at some point. + */ + public boolean wasOpened() { + return wasOpened; + } + + /** + * returns true if this RingBuffer is currently open. + */ + public boolean isOpen() { + try { + return !isClosed() && wasOpened() && getState() == STATE_CONNECTED; + } catch (Exception e) { + close(); + return false; + } + } + + /** + * Will attempt to createOrOpen and connect the output stream using spinning. + * Keep in mind that this output stream can be interrupted during writing by calling Thread.interrupt(). + * If this is done then partial data may be truncated. The amount of bytes truncated can be retrieved from the + * RingBufferInterruptedException that is thrown when an interrupt occurs. + * + * @param aBufferSize the size of the buffer in bytes. (Note: this must include OVERHEAD) + * @param aTimeout the connect timeout in which a input stream must be created to connect to this output stream. If it expires a SharedMemoryException will be thrown. + * @param aSpinTime during the connection process this thread will spin. + * If this value is write to a low value then the output stream may connect faster but it will also tax the CPU more during the connection process. + * This value should be at least 10 times smaller than your timeout. It is only used during the connecting and has no effect after the connection was established. + * @return the Output Stream. + * @throws InterruptedException If the thread was interrupted while connecting. + */ + public OutputStream connectOutputStream(long aBufferSize, long aTimeout, long aSpinTime, TimeUnit aUnit) throws InterruptedException { + accessLock.lock(); + try { + if (isClosed()) { + throw new RuntimeException("Already closed!"); + } + + if (wasOpened()) { + throw new RuntimeException("Was already opened!s"); + } + + vector = -1; + useInterrupts = false; + return connectOutputStreamInternal(aBufferSize, aTimeout, aSpinTime, aUnit); + } finally { + accessLock.unlock(); + } + } + + /** + * Will attempt to createOrOpen and connect the output stream using interrupts (if the other side supports this). + * Keep in mind that this output stream can be interrupted during writing by calling Thread.interrupt(). + * If this is done then partial data may be truncated. The amount of bytes truncated can be retrieved from the + * RingBufferInterruptedException that is thrown when an interrupt occurs. + * + * @param aVector the vector to use for communication. This will be transmitted to the InputStream via SharedMemory. + * @param aBufferSize the size of the buffer in bytes. (Note: this must include OVERHEAD) + * @param aTimeout the connect timeout in which a input stream must be created to connect to this output stream. If it expires a SharedMemoryException will be thrown. + * @param aSpinTime during the connection process this thread will spin. + * If this value is write to a low value then the output stream may connect faster but it will also tax the CPU more during the connection process. + * This value should be at least 10 times smaller than your timeout. It is only used during the connecting and has no effect after the connection was established. + * @return the Output Stream. + * @throws InterruptedException If the thread was interrupted while connecting. + */ + public OutputStream connectOutputStream(int aVector, long aBufferSize, long aTimeout, long aSpinTime, TimeUnit aUnit) throws InterruptedException { + accessLock.lock(); + try { + if (isClosed()) { + throw new RuntimeException("Already closed!"); + } + + if (wasOpened()) { + throw new RuntimeException("Was already opened!s"); + } + + if (!shmemory.supportsInterrupts()) { + throw new RuntimeException("memory doesnt support interrupts!"); + } + + if (!shmemory.hasOwnPeerID()) { + throw new RuntimeException("memory doesnt support peers!"); + } + + if (!shmemory.isVectorValid(aVector)) { + throw new RuntimeException("invalid vector"); + } + + vector = aVector; + useInterrupts = true; + + return connectOutputStreamInternal(aBufferSize, aTimeout, aSpinTime, aUnit); + } finally { + accessLock.unlock(); + } + } + + /** + * Internal method to connect the output stream. + */ + protected OutputStream connectOutputStreamInternal(long aBufferSize, long aTimeout, long aSpinTime, TimeUnit aUnit) throws InterruptedException { + long tempTimeout = TimeUnit.MILLISECONDS.convert(aTimeout, aUnit); + long tempSpinTime = TimeUnit.MILLISECONDS.convert(aSpinTime, aUnit); + + if (getState() != STATE_UNCONNECTED) { + throw new RuntimeException("The memory area already contains data! Set the first " + OVERHEAD + " bytes to 0 if you are sure that it is safe to do so!"); + } + + if (aBufferSize < OVERHEAD + 1) { + throw new RuntimeException("Buffer must be at least " + (OVERHEAD + 2) + " bytes big to store overhead and at least 2 bytes for transfer!"); + } + + if (!memory.isValid(address, aBufferSize)) { + throw new RuntimeException("Shared memory is too small to hold the buffer with the given size at the given offset!"); + } + + if (useInterrupts) { + memory.write(address + WRITE_PEER_OFFSET, shmemory.getOwnPeerID()); + memory.write(address + INTERRUPT_FLAG_WRITE_OFFSET, (byte) 1); + memory.write(address + VECTOR_OFFSET, vector); + + } else { + memory.write(address + INTERRUPT_FLAG_WRITE_OFFSET, (byte) 0); + } + + memory.write(address + SIZE_OFFSET, aBufferSize); + memory.write(address + WRITE_INDEX_OFFSET, localIndex); + + + isReadFlag = false; + wasOpened = true; + size = aBufferSize - OVERHEAD; + if (useInterrupts) { + shmemory.registerInterruptServiceRoutine(vector, interruptServiceRoutine); + } + if (!setState(STATE_UNCONNECTED, STATE_CONNECTING)) { + close(); + throw new RuntimeException("Setting state to Connecting failed! Was another Output Stream created?"); + } + + if (tempTimeout < 0) { + tempTimeout = Long.MAX_VALUE; + } + + long tempStart = System.currentTimeMillis(); + while (true) { + + if (System.currentTimeMillis() - tempStart > tempTimeout) { + close(); + throw new RuntimeException("Timeout while waiting for input stream to open, the ring buffer is now closed!"); + } + + byte tempState; + + try { + tempState = getState(); + } catch (Exception exc) { + close(); + throw exc; + } + + if (tempState == 0 || closedFlag) { + close(); + throw new RuntimeException("Ring Buffer closed before it could open!"); + } + + if (tempState == 1) { + try { + Thread.sleep(tempSpinTime); + } catch (InterruptedException exc) { + close(); + throw exc; + } + continue; + } + + if (tempState == 2) { + try { + if (useInterrupts) { + if (memory.read(address + INTERRUPT_FLAG_READ_OFFSET) != 1) { + shmemory.removeInterruptServiceRoutine(vector, interruptServiceRoutine); + useInterrupts = false; + } else { + otherPeer = memory.readInt(address + READ_PEER_OFFSET); + if (shmemory.knowsOtherPeers() && !shmemory.isOtherPeerConnected(otherPeer)) { + throw new RuntimeException("Other peer reported a peer id that is not connected!"); + } + } + } + + } catch (Exception exc) { + close(); + throw exc; + } + return new RingBufferOutputStream(); + } + + close(); + throw new RuntimeException("External write overwrote state byte with garbage data!"); + } + } + + /** + * Try to connect the Input Stream to a Output Stream. + * + * @param aTimeout timeout before throwing a Shared Memory Exception + * @param aSpinTime during the connection process this thread will spin. + * If this value is write to a low value then the output stream may connect faster but it will also tax the CPU more during the connection process. + * This value should be at least 10 times smaller than your timeout. + * @return the connected InputStream that can be read from. + * @throws InterruptedException + */ + public InputStream connectInputStream(long aTimeout, long aSpinTime, TimeUnit aUnit) throws InterruptedException { + long tempTimeout = TimeUnit.MILLISECONDS.convert(aTimeout, aUnit); + long tempSpinTime = TimeUnit.MILLISECONDS.convert(aSpinTime, aUnit); + + accessLock.lock(); + try { + if (isClosed()) { + throw new RuntimeException("Already closed!"); + } + + if (wasOpened()) { + throw new IllegalStateException("Already configured for either output or input!"); + } + + long tempStart = System.currentTimeMillis(); + while (true) { + byte tempState = getState(); + + if (tempState == STATE_CONNECTING) { + break; + } + + if (System.currentTimeMillis() - tempStart > tempTimeout) { + throw new RuntimeException("Timeout while waiting for input stream to open, the ring buffer is now closed!"); + } + + Thread.sleep(tempSpinTime); + } + + long tempSize = memory.readLong(address + SIZE_OFFSET); + + if (tempSize < OVERHEAD + 1) { + throw new IllegalArgumentException("Buffer must be at least " + (OVERHEAD + 2) + " bytes big to store overhead and at least 2 bytes for transfer!"); + } + + if (!memory.isValid(address, tempSize)) { + throw new IllegalArgumentException("Shared memory is too small to hold the buffer with the given size at the given offset!"); + } + + if (memory.read(address + INTERRUPT_FLAG_WRITE_OFFSET) == 1) { + useInterrupts = true; + } else { + useInterrupts = false; + } + + if (!shmemory.supportsInterrupts() || !shmemory.hasOwnPeerID()) { + useInterrupts = false; + } + + int tempVector = -1; + if (useInterrupts) { + tempVector = readVector(); + if (!shmemory.isVectorValid(tempVector)) { + useInterrupts = false; + } + } + int tempPeer = -1; + if (useInterrupts) { + tempPeer = memory.readInt(address + WRITE_PEER_OFFSET); + if (shmemory.knowsOtherPeers()) { + if (!shmemory.isOtherPeerConnected(tempPeer)) { + useInterrupts = false; + } + } + } + + + wasOpened = true; + isReadFlag = true; + size = tempSize - OVERHEAD; + + if (useInterrupts) { + otherPeer = tempPeer; + vector = tempVector; + memory.write(address + READ_PEER_OFFSET, shmemory.getOwnPeerID()); + memory.write(address + INTERRUPT_FLAG_READ_OFFSET, (byte) 1); + shmemory.registerInterruptServiceRoutine(tempVector, interruptServiceRoutine); + } else { + memory.write(address + INTERRUPT_FLAG_READ_OFFSET, (byte) 0); + } + + if (!setState(STATE_CONNECTING, STATE_CONNECTED)) { + close(); + throw new RuntimeException("Setting state to Connected failed! Was another Input Stream created or did the Output Stream just timeout?"); + } + + return new RingBufferInputStream(); + } finally { + accessLock.unlock(); + } + } + + /** + * Returns the amount of byte currently reable in a single read. + * Differs from getAvailableBytes in that is doesnt include readable bytes before the read index + * as those would require a wrap around in the ring buffer. + */ + protected long getReadableBytesInSingleRead() throws IOException { + long tempWriteIndex = getWriteIndex(); + long tempReadIndex = getReadIndex(); + + if (tempReadIndex <= tempWriteIndex) { + return tempWriteIndex - tempReadIndex; + } + return size - tempReadIndex; + } + + /** + * Returns the amount of bytes currently writeable in a single write. + * Differs from getFreeBytes in that it doesnt include the amount of writeable bytes before write index + * as those would require a wrap around in the ring buffer. + */ + protected long getBytesWritableInSingleWrite() throws IOException { + long tempWriteIndex = getWriteIndex(); + long tempReadIndex = getReadIndex(); + + long tempWB; + if (tempReadIndex <= tempWriteIndex) { + tempWB = size - tempWriteIndex; + } else { + tempWB = tempReadIndex - tempWriteIndex; + } + + if ((tempWriteIndex + tempWB) % size == tempReadIndex) { + tempWB--; + } + + return tempWB; + } + + /** + * Returns the total amount of bytes that can be currently written to the ring buffer before writing calls would + * start to block due to the buffer being full. + */ + public long getFreeBytes() throws IOException { + checkStateConnected(); + + long tempWriteIndex = getWriteIndex(); + long tempReadIndex = getReadIndex(); + + long tempWB; + if (tempReadIndex <= tempWriteIndex) { + tempWB = (size - tempWriteIndex) + tempReadIndex; + } else { + tempWB = tempReadIndex - tempWriteIndex; + } + + if ((tempWriteIndex + tempWB) % size == tempReadIndex) { + tempWB--; + } + + return tempWB; + } + + /** + * Returns the total amount of bytes that can be read from the ring buffer without blocking. + */ + public long getAvailableBytes() throws IOException { + checkStateConnected(); + long tempWriteIndex = getWriteIndex(); + long tempReadIndex = getReadIndex(); + + if (tempReadIndex <= tempWriteIndex) { + return tempWriteIndex - tempReadIndex; + } + return (size - tempReadIndex) + tempWriteIndex; + } + + /** + * Waits until at least one readable byte is readable from the buffer. + * Will return the amount of readable bytes which is guaranteed to be at least 1 or greater. + *

+ * Will only return amount of bytes that can be read in a single read operation (i.e. without wraparound in the ring buffer). + */ + protected long waitForAtLeastOneReadableByte() throws IOException { + long tempStart = System.currentTimeMillis(); + + checkStateConnected(); + long tempSpinTime = getSpinTimeToUse(); + long tempBytes = getReadableBytesInSingleRead(); + while (tempBytes == 0) { + checkStateConnected(); + if (tempSpinTime <= 0) { + tempBytes = getReadableBytesInSingleRead(); + continue; + } + + interruptLock.lock(); + try { + tempBytes = getReadableBytesInSingleRead(); + if (tempBytes == 0) { + try { + interruptCondition.await(tempSpinTime, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RingBufferInterruptedException(e); + } + checkStateConnected(); + } else { + return tempBytes; + } + } finally { + interruptLock.unlock(); + } + + tempBytes = getReadableBytesInSingleRead(); + + if (tempBytes == 0 && timeout >= 0 && System.currentTimeMillis() > tempStart + timeout) { + throw new RingBufferTimeoutException("Read Timeout"); + } + } + + return tempBytes; + } + + /** + * Utility method to wait until a certain number of bytes is available to write without blocking. + * The singleWrite parameter toggles if the method should only wait for bytes that be written to + * in a single write operation (i.e. they dont require a wrap around in the ring buffer). + *

+ * Returns the amount of available bytes. + */ + protected long waitForWritableBytes(int count, boolean singleWrite) throws IOException { + long tempStart = System.currentTimeMillis(); + + checkStateConnected(); + long tempSpinTime = getSpinTimeToUse(); + long tempBytes = singleWrite ? getBytesWritableInSingleWrite() : getFreeBytes(); + while (tempBytes < count) { + checkStateConnected(); + if (tempSpinTime <= 0) { + tempBytes = singleWrite ? getBytesWritableInSingleWrite() : getFreeBytes(); + continue; + } + interruptLock.lock(); + try { + tempBytes = singleWrite ? getBytesWritableInSingleWrite() : getFreeBytes(); + if (tempBytes < count) { + try { + interruptCondition.await(tempSpinTime, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RingBufferInterruptedException(e); + } + + checkStateConnected(); + } else { + return tempBytes; + } + } finally { + interruptLock.unlock(); + } + tempBytes = singleWrite ? getBytesWritableInSingleWrite() : getFreeBytes(); + + if (tempBytes < count && timeout >= 0 && System.currentTimeMillis() > tempStart + timeout) { + throw new RingBufferTimeoutException("Write Timeout"); + } + } + + return tempBytes; + } + + + class RingBufferInputStream extends InputStream { + + @Override + public int read() throws IOException { + accessLock.lock(); + try { + waitForAtLeastOneReadableByte(); + long tempIndex = getReadIndex(); + int tempByte = memory.read(address + BUFFER_START_OFFSET + tempIndex) & 0xFF; + setReadIndex((tempIndex + 1) % size); + triggerInterrupt(); + return tempByte; + } finally { + accessLock.unlock(); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len <= 0) { + return 0; + } + + accessLock.lock(); + try { + int tempBytes = (int) Math.min(len, waitForAtLeastOneReadableByte()); + long tempIndex = getReadIndex(); + memory.read(address + BUFFER_START_OFFSET + tempIndex, b, off, tempBytes); + setReadIndex((tempIndex + tempBytes) % size); + triggerInterrupt(); + return tempBytes; + } finally { + accessLock.unlock(); + } + } + + @Override + public long skip(long n) throws IOException { + accessLock.lock(); + try { + long tempSkipped = 0; + try { + while (n > 0) { + long tempBytesToSkip = Math.min(n, waitForAtLeastOneReadableByte()); + n -= tempBytesToSkip; + long tempNewReadIndex = getReadIndex() + tempBytesToSkip; + setReadIndex(tempNewReadIndex % size); + triggerInterrupt(); + } + } catch (RingBufferInterruptedException | RingBufferTimeoutException exc) { + //DC. + } + return tempSkipped; + } finally { + accessLock.unlock(); + } + } + + @Override + public int available() throws IOException { + accessLock.lock(); + try { + long tempBytes = getFreeBytes(); + if (tempBytes > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + + return (int) tempBytes; + } finally { + accessLock.unlock(); + } + } + + @Override + public void close() { + RingBuffer.this.close(); + } + } + + + class RingBufferOutputStream extends OutputStream { + + public void write(int b) throws IOException { + accessLock.lock(); + try { + waitForWritableBytes(1, true); + long tempIndex = getWriteIndex(); + memory.write(address + BUFFER_START_OFFSET + tempIndex, (byte) b); + setWriteIndex((tempIndex + 1) % size); + triggerInterrupt(); + } finally { + accessLock.unlock(); + } + } + + + @Override + public void write(byte[] b, int off, int len) throws IOException { + accessLock.lock(); + try { + + if (timeout >= 0) { + if (len >= size) { + throw new IOException("You have write a timeout, to avoid having to do partial writes the entire write operation has to fit into the ring buffer at once. " + + "You intend to write " + len + " bytes but the ring buffer is only " + size + " bytes big. (1 byte is reserved) If you wish to enable partial writes dont write a timeout."); + } + waitForWritableBytes(len, false); + } + + while (len > 0) { + try { + long tempCurrentPass = writePartial(b, off, len); + off += tempCurrentPass; + len -= tempCurrentPass; + } catch (RingBufferInterruptedException exc) { + exc.bytesTruncated = len; + throw exc; + } + } + } finally { + accessLock.unlock(); + } + } + + /** + * writes the byte array partially returning the amount of bytes written. + * This is required when we reach the high edge of the buffer and need to wrap around. + * In this case even if the entire write operation would fit into the buffer we would need + * to make 2 writes regardless. This method is also useful when only a few bytes can be written + * because the buffer is full. + */ + protected int writePartial(byte[] b, int off, int len) throws IOException { + if (len <= 0) { + return 0; + } + + int tempBytes = (int) Math.min(len, waitForWritableBytes(1, true)); + long tempIndex = getWriteIndex(); + memory.write(address + BUFFER_START_OFFSET + tempIndex, b, off, tempBytes); + setWriteIndex((tempIndex + tempBytes) % size); + triggerInterrupt(); + return tempBytes; + } + + @Override + public void close() { + RingBuffer.this.close(); + } + } + + public static class RingBufferTimeoutException extends IOException { + public RingBufferTimeoutException(String message) { + super(message); + } + } + + /** + * Exception thrown when the reading/writing thread is interrupted during write/read operations. + */ + public static class RingBufferInterruptedException extends IOException { + protected int bytesTruncated; + + public RingBufferInterruptedException(InterruptedException exc) { + super(exc); + } + + /** + * If a write operation is interrupted then this method call will return the amount of bytes at the end of the + * buffer that were truncated (as in not written to the ring buffer) in a partial write operation. + * If this returns 0 then this call was either a reading call or no bytes were truncated. + * + * @return + */ + public int getBytesTruncated() { + return bytesTruncated; + } + } +} diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/AbstractPacket.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/AbstractPacket.java similarity index 93% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/AbstractPacket.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/AbstractPacket.java index 06d5602..02df572 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/AbstractPacket.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/AbstractPacket.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet1Connect.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet1Connect.java similarity index 95% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet1Connect.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet1Connect.java index 5152eac..aaff56e 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet1Connect.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet1Connect.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet2Fin.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet2Fin.java similarity index 93% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet2Fin.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet2Fin.java index 006f212..a4f7868 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet2Fin.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet2Fin.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet3Rst.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet3Rst.java similarity index 93% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet3Rst.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet3Rst.java index fc95d27..98a08c7 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet3Rst.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet3Rst.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet4Data.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet4Data.java similarity index 94% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet4Data.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet4Data.java index 959c31d..3000f67 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet4Data.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet4Data.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet5OpenServer.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet5OpenServer.java similarity index 96% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet5OpenServer.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet5OpenServer.java index c6fbe5c..c94fdac 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet5OpenServer.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet5OpenServer.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet6OpenServerResult.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet6OpenServerResult.java similarity index 94% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet6OpenServerResult.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet6OpenServerResult.java index 3fe37d9..bb4e389 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/Packet6OpenServerResult.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/Packet6OpenServerResult.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.DataInputStream; import java.io.DataOutputStream; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/PacketEnum.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/PacketEnum.java similarity index 95% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/PacketEnum.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/PacketEnum.java index 0968826..989c21a 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/PacketEnum.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/PacketEnum.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; public enum PacketEnum { CONNECT(1) { diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/packets/PacketUtil.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/PacketUtil.java similarity index 96% rename from src/main/java/de/aschuetz/tcp2ivshmem/packets/PacketUtil.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/PacketUtil.java index ee030e1..60d1f84 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/packets/PacketUtil.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/packets/PacketUtil.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,7 +17,7 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.packets; +package io.github.alexanderschuetz97.tcp2ivshmem.packets; import java.io.*; import java.util.concurrent.atomic.AtomicInteger; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/servers/Socks5Server.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/servers/Socks5Server.java similarity index 97% rename from src/main/java/de/aschuetz/tcp2ivshmem/servers/Socks5Server.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/servers/Socks5Server.java index 3a8fba8..9d40fc1 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/servers/Socks5Server.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/servers/Socks5Server.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,10 +17,10 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.servers; +package io.github.alexanderschuetz97.tcp2ivshmem.servers; -import de.aschuetz.tcp2ivshmem.Main; -import de.aschuetz.tcp2ivshmem.ivshmem.IvshmemBridge; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.IvshmemBridge; import java.io.*; import java.net.ServerSocket; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/servers/TcpServer.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/servers/TcpServer.java similarity index 90% rename from src/main/java/de/aschuetz/tcp2ivshmem/servers/TcpServer.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/servers/TcpServer.java index 5ef16bb..27cc9f4 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/servers/TcpServer.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/servers/TcpServer.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,10 +17,10 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.servers; +package io.github.alexanderschuetz97.tcp2ivshmem.servers; -import de.aschuetz.tcp2ivshmem.ivshmem.IvshmemBridge; -import de.aschuetz.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.IvshmemBridge; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; import java.io.IOException; import java.net.ServerSocket; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/sockets/TcpSocket.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/sockets/TcpSocket.java similarity index 91% rename from src/main/java/de/aschuetz/tcp2ivshmem/sockets/TcpSocket.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/sockets/TcpSocket.java index e3741f0..b99b435 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/sockets/TcpSocket.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/sockets/TcpSocket.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,11 +17,12 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.sockets; +package io.github.alexanderschuetz97.tcp2ivshmem.sockets; -import de.aschuetz.tcp2ivshmem.ivshmem.IvshmemBridge; -import de.aschuetz.tcp2ivshmem.Main; -import de.aschuetz.tcp2ivshmem.packets.PacketUtil; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.IvshmemBridge; +import io.github.alexanderschuetz97.tcp2ivshmem.Main; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.PacketUtil; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.Constants; import java.io.IOException; import java.io.InputStream; @@ -31,8 +32,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import static de.aschuetz.tcp2ivshmem.ivshmem.Constants.*; - public class TcpSocket { private final int id; @@ -44,7 +43,7 @@ public class TcpSocket { private final OutputStream output; private volatile boolean running = true; private boolean isFin; - private BlockingQueue toTcpQueue = new ArrayBlockingQueue<>(PACKET_QUEUE_SIZE); + private BlockingQueue toTcpQueue = new ArrayBlockingQueue<>(Constants.PACKET_QUEUE_SIZE); public TcpSocket(int id, TcpSocketContainer container, IvshmemBridge ivshmemBridge, Socket socket) throws IOException { @@ -157,7 +156,7 @@ public void queueData(byte[] data) throws IOException { } private void read() { - byte[] buf = new byte[SOCKET_BUFFER_SIZE]; + byte[] buf = new byte[Constants.SOCKET_BUFFER_SIZE]; while(running) { try { @@ -195,7 +194,7 @@ private void write() { byte[] data; try { - data = toTcpQueue.poll(SPIN_SOCKET_QUEUE, TimeUnit.MILLISECONDS); + data = toTcpQueue.poll(Constants.SPIN_SOCKET_QUEUE, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { closeWithRst(); return; diff --git a/src/main/java/de/aschuetz/tcp2ivshmem/sockets/TcpSocketContainer.java b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/sockets/TcpSocketContainer.java similarity index 94% rename from src/main/java/de/aschuetz/tcp2ivshmem/sockets/TcpSocketContainer.java rename to src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/sockets/TcpSocketContainer.java index c63c1cc..304bdaf 100644 --- a/src/main/java/de/aschuetz/tcp2ivshmem/sockets/TcpSocketContainer.java +++ b/src/main/java/io/github/alexanderschuetz97/tcp2ivshmem/sockets/TcpSocketContainer.java @@ -1,5 +1,5 @@ /* - * Copyright Alexander Schütz, 2020 + * Copyright Alexander Schütz, 2020-2022 * * This file is part of tcp2ivshmem. * @@ -17,10 +17,10 @@ * in the COPYING file in top level directory of tcp2ivshmem. * If not, see . */ -package de.aschuetz.tcp2ivshmem.sockets; +package io.github.alexanderschuetz97.tcp2ivshmem.sockets; -import de.aschuetz.tcp2ivshmem.ivshmem.IvshmemBridge; -import de.aschuetz.tcp2ivshmem.packets.PacketUtil; +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.IvshmemBridge; +import io.github.alexanderschuetz97.tcp2ivshmem.packets.PacketUtil; import java.io.IOException; import java.net.Socket; diff --git a/src/test/java/io/github/alexanderschuetz97/tcp2ivshmem/RingBufferTest.java b/src/test/java/io/github/alexanderschuetz97/tcp2ivshmem/RingBufferTest.java new file mode 100644 index 0000000..f6cfe95 --- /dev/null +++ b/src/test/java/io/github/alexanderschuetz97/tcp2ivshmem/RingBufferTest.java @@ -0,0 +1,389 @@ +/* + * Copyright Alexander Schütz, 2020-2022 + * + * This file is part of tcp2ivshmem. + * + * tcp2ivshmem is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * tcp2ivshmem is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * A copy of the GNU General Public License should be provided + * in the COPYING file in top level directory of tcp2ivshmem. + * If not, see . + */ + +package io.github.alexanderschuetz97.tcp2ivshmem; + +import io.github.alexanderschuetz97.tcp2ivshmem.ivshmem.RingBuffer; +import io.github.alexanderschuetz97.ivshmem4j.api.Ivshmem; +import io.github.alexanderschuetz97.ivshmem4j.api.IvshmemMemory; +import io.github.alexanderschuetz97.nativeutils.api.NativeMemory; +import io.github.alexanderschuetz97.nativeutils.impl.NativeLibraryLoaderHelper; +import org.junit.*; + +import java.io.*; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +public class RingBufferTest { + + private Random rng = new Random(); + + private ExecutorService ex; + + private File shmemfile; + + private NativeMemory memoryInput; + + private NativeMemory memoryOutput; + + private IvshmemMemory shmemoryInput; + + private IvshmemMemory shmemoryOutput; + + private RingBuffer bufferInput; + + private RingBuffer bufferOutput; + + private OutputStream outputStream; + + private InputStream inputStream; + + private DataOutputStream dout; + + private DataInputStream din; + + @BeforeClass + public static void setupJNI() { + System.out.println("These Tests might take a lot of time!"); + NativeLibraryLoaderHelper.loadNativeLibraries(); + } + + @Before + public void before() throws Throwable { + rng.setSeed(System.currentTimeMillis()); + shmemfile = new File("/dev/shm/" + getClass().getSimpleName() + Math.abs(rng.nextInt())); + if (shmemfile.exists()) { + shmemfile.delete(); + } + shmemoryInput = Ivshmem.plain(shmemfile.getAbsolutePath(), 4096); + shmemoryOutput = Ivshmem.plain(shmemfile.getAbsolutePath(), 4096); + memoryInput = shmemoryInput.getMemory(); + memoryOutput = shmemoryOutput.getMemory(); + + shmemfile.deleteOnExit(); + rng.setSeed(0); + + for (long i = 0; i < memoryInput.size(); i++) { + memoryInput.write(i, (byte) 0); + } + + bufferInput = new RingBuffer(shmemoryInput, 0, 64); + bufferOutput = new RingBuffer(shmemoryOutput, 0, 64); + ex = Executors.newCachedThreadPool(); + } + + @After + public void after() { + ex.shutdownNow(); + bufferInput.close(); + bufferOutput.close(); + memoryInput.close(); + memoryOutput.close(); + shmemfile.delete(); + } + + @Test + public void connectBuffers() throws Throwable { + Assert.assertFalse(bufferInput.wasOpened()); + Assert.assertFalse(bufferOutput.wasOpened()); + Assert.assertFalse(bufferInput.isClosed()); + Assert.assertFalse(bufferOutput.isClosed()); + Assert.assertFalse(bufferInput.isOpen()); + Assert.assertFalse(bufferOutput.isOpen()); + + Assert.assertFalse(bufferInput.canConnectInputStream()); + Assert.assertTrue(bufferOutput.canConnectOutputStream()); + + + Future tempFutur = ex.submit(new Callable() { + + @Override + public OutputStream call() throws Exception { + return bufferOutput.connectOutputStream(4096, 5000, 64, TimeUnit.MILLISECONDS); + } + }); + + long tempTime = System.currentTimeMillis(); + + while (!bufferInput.canConnectInputStream()) { + if (System.currentTimeMillis() - tempTime > 5000) { + //The Future likely failed print its exception if not + tempFutur.get(); + //Then fail anyways. + Assert.fail("Took to long!"); + } + } + + inputStream = bufferInput.connectInputStream(0, 64, TimeUnit.MILLISECONDS); + outputStream = tempFutur.get(5000, TimeUnit.MILLISECONDS); + + Assert.assertNotNull(inputStream); + Assert.assertNotNull(outputStream); + + dout = new DataOutputStream(outputStream); + din = new DataInputStream(inputStream); + } + + + @Test + public void basic() throws Throwable { + connectBuffers(); + + dout.writeUTF("Hello World"); + Assert.assertEquals("Hello World", din.readUTF()); + + int tempInt = rng.nextInt(); + dout.writeInt(tempInt); + Assert.assertEquals(tempInt, din.readInt()); + + int tempShort = rng.nextInt() & 0xffff; + dout.writeShort(tempShort); + Assert.assertEquals(tempShort, din.readUnsignedShort()); + + + int tempByte = rng.nextInt() & 0xff; + dout.writeByte(tempByte); + Assert.assertEquals(tempByte, din.readUnsignedByte()); + + byte[] tempWrite = new byte[64]; + byte[] tempRead = new byte[64]; + rng.nextBytes(tempWrite); + dout.write(tempWrite); + din.readFully(tempRead); + Assert.assertTrue(Arrays.equals(tempWrite, tempRead)); + } + + + private static final int DURATION = 60000; + private static final int BUFSIZ = 444; + + @Test + public void testReadWriteALot() throws Throwable { + connectBuffers(); + //bufferInput.setSpinTime(0); + //bufferOutput.setSpinTime(0); + final ConcurrentLinkedQueue tempQ = new ConcurrentLinkedQueue<>(); + final AtomicBoolean go = new AtomicBoolean(true); + + Future tempFutur = ex.submit(new Callable() { + + @Override + public Object call() throws Exception { + while (go.get()) { + byte[] tempB = new byte[BUFSIZ]; + rng.nextBytes(tempB); + tempQ.add(tempB); + outputStream.write(tempB); + } + + return null; + } + }); + + Future tempFutur2 = ex.submit(new Callable() { + + @Override + public Object call() throws Exception { + byte[] tempBuf = new byte[BUFSIZ]; + long tempT = 0; + long tempB = 0; + while (go.get()) { + while (tempQ.peek() == null && go.get()) { + Thread.sleep(1); + } + + if (!go.get()) { + return null; + } + long tempCur = System.nanoTime(); + din.readFully(tempBuf); + tempT += (System.nanoTime() - tempCur); + tempB += tempBuf.length; + Assert.assertTrue(Arrays.equals(tempBuf, tempQ.poll())); + //System.out.println("Performance " + ((long) Math.floor(((double)tempB) / (((double)tempT) / 1000000000d))) + " b/s"); + } + return null; + } + }); + + long tempStart = System.currentTimeMillis(); + while (System.currentTimeMillis() - tempStart < DURATION) { + Thread.sleep(200); + if (tempFutur.isDone()) { + tempFutur.get(); + Assert.fail(); + } + + if (tempFutur2.isDone()) { + tempFutur2.get(); + Assert.fail(); + } + } + go.set(false); + + + tempFutur.get(5000, TimeUnit.MILLISECONDS); + tempFutur2.get(5000, TimeUnit.MILLISECONDS); + } + + @Test + public void testReadWriteBlock() throws Throwable { + connectBuffers(); + + Assert.assertEquals(bufferOutput.getBufferSize() - 1, bufferOutput.getFreeBytes()); + Assert.assertEquals(0, bufferInput.getAvailableBytes()); + Future tempFutur = ex.submit(new Callable() { + + @Override + public Integer call() throws Exception { + return din.readInt(); + } + }); + + try { + tempFutur.get(2000, TimeUnit.MILLISECONDS); + Assert.fail(); + } catch (TimeoutException exc) { + + } + + dout.writeInt(25); + Assert.assertEquals(new Integer(25), tempFutur.get()); + + Assert.assertEquals(bufferOutput.getBufferSize() - 1, bufferOutput.getFreeBytes()); + int tempSiz = (int) bufferOutput.getFreeBytes(); + byte[] toFill = new byte[tempSiz]; + rng.nextBytes(toFill); + dout.write(toFill); + Assert.assertEquals(0, bufferOutput.getFreeBytes()); + Assert.assertEquals(bufferInput.getBufferSize() - 1, bufferInput.getAvailableBytes()); + + tempFutur = ex.submit(new Callable() { + + @Override + public Object call() throws Exception { + dout.write(0); + return null; + } + }); + + try { + tempFutur.get(2000, TimeUnit.MILLISECONDS); + Assert.fail(); + } catch (TimeoutException exc) { + + } + + Assert.assertEquals(0, bufferOutput.getFreeBytes()); + Assert.assertEquals(bufferInput.getBufferSize() - 1, bufferInput.getAvailableBytes()); + + din.read(); + + tempFutur.get(2000, TimeUnit.MILLISECONDS); + + Assert.assertEquals(0, bufferOutput.getFreeBytes()); + Assert.assertEquals(bufferInput.getBufferSize() - 1, bufferInput.getAvailableBytes()); + + din.readFully(new byte[128]); + + Assert.assertEquals(128, bufferOutput.getFreeBytes()); + Assert.assertEquals(bufferInput.getBufferSize() - 129, bufferInput.getAvailableBytes()); + } + + @Test + public void testReadWriteTimeout() throws Throwable { + connectBuffers(); + bufferInput.setTimeout(500); + bufferOutput.setTimeout(500); + + Assert.assertEquals(bufferOutput.getBufferSize() - 1, bufferOutput.getFreeBytes()); + Assert.assertEquals(0, bufferInput.getAvailableBytes()); + Callable tempCall = new Callable() { + + @Override + public Integer call() throws Exception { + return din.readInt(); + } + }; + + Future tempFutur = ex.submit(tempCall); + + try { + tempFutur.get(2000, TimeUnit.MILLISECONDS); + Assert.fail(); + } catch (TimeoutException exc) { + Assert.fail(); + } catch (ExecutionException exc) { + Assert.assertTrue(exc.getCause() instanceof RingBuffer.RingBufferTimeoutException); + } + + + Assert.assertEquals(bufferOutput.getBufferSize() - 1, bufferOutput.getFreeBytes()); + Assert.assertEquals(0, bufferInput.getAvailableBytes()); + + tempFutur = ex.submit(tempCall); + Thread.sleep(100); + dout.writeInt(4); + + Assert.assertEquals(new Integer(4), tempFutur.get(2000, TimeUnit.MILLISECONDS)); + Assert.assertEquals(bufferOutput.getBufferSize() - 1, bufferOutput.getFreeBytes()); + Assert.assertEquals(0, bufferInput.getAvailableBytes()); + + int tempSiz = (int) bufferOutput.getFreeBytes(); + byte[] toFill = new byte[tempSiz]; + rng.nextBytes(toFill); + dout.write(toFill); + Assert.assertEquals(0, bufferOutput.getFreeBytes()); + Assert.assertEquals(bufferInput.getBufferSize() - 1, bufferInput.getAvailableBytes()); + + tempCall = new Callable() { + @Override + public Integer call() throws Exception { + dout.writeInt(4); + return null; + } + }; + + tempFutur = ex.submit(tempCall); + + try { + tempFutur.get(2000, TimeUnit.MILLISECONDS); + Assert.fail(); + } catch (TimeoutException exc) { + Assert.fail(); + } catch (ExecutionException exc) { + Assert.assertTrue(exc.getCause() instanceof RingBuffer.RingBufferTimeoutException); + } + + + Assert.assertEquals(0, bufferOutput.getFreeBytes()); + Assert.assertEquals(bufferInput.getBufferSize() - 1, bufferInput.getAvailableBytes()); + + tempFutur = ex.submit(tempCall); + Thread.sleep(100); + din.readInt(); + tempFutur.get(5000, TimeUnit.MILLISECONDS); + + Assert.assertEquals(0, bufferOutput.getFreeBytes()); + Assert.assertEquals(bufferInput.getBufferSize() - 1, bufferInput.getAvailableBytes()); + } +}