From 412cb17f826edbbcb417f939c8f847fa925d867b Mon Sep 17 00:00:00 2001 From: dzmipt Date: Sat, 6 Jul 2024 15:28:40 +0200 Subject: [PATCH] Implement mock kdb server and run KSerialiseTest via the mock server --- test/kx/KServerMock.java | 176 ++++++++++++++++++++++++++++ test/studio/kdb/KSerialiseTest.java | 37 +++++- 2 files changed, 207 insertions(+), 6 deletions(-) create mode 100644 test/kx/KServerMock.java diff --git a/test/kx/KServerMock.java b/test/kx/KServerMock.java new file mode 100644 index 00000000..5e31d13a --- /dev/null +++ b/test/kx/KServerMock.java @@ -0,0 +1,176 @@ +package kx; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import studio.kdb.K; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +public class KServerMock implements Runnable { + + private final ServerSocket serverSocket; + private volatile boolean running = true; + private List sessions = new ArrayList<>(); + private int port; + + private static final Logger log = LogManager.getLogger(); + + public KServerMock() throws IOException { + this(0); + } + + public KServerMock(int port) throws IOException { + serverSocket = new ServerSocket(port); + this.port = serverSocket.getLocalPort(); + new Thread(this, "Server listen port " + port).start(); + log.info("Server started on port {}", this.port); + } + + public int getPort() { + return port; + } + @Override + public void run() { + while (running) { + try { + Socket socket = serverSocket.accept(); + sessions.add(new Session(socket)); + } catch (IOException e) { + if (running) { + log.error("Error in the server socket", e); + } + } + } + running = false; + } + + public void shutdown() { + if (! running) { + log.info("Already stopped"); + return; + } + + running = false; + try { + for (Session session: sessions) { + if (session.isRunning()) { + session.shutdown(); + } + } + serverSocket.close(); + } catch (IOException e) { + log.info("Error during server socket closure", e); + } + } + + + static class Session implements Runnable { + + private static int index = 0; + private int thisIndex = index++; + + private final Socket socket; + private final InputStream inputStream; + private final OutputStream outputStream; + private boolean running = true; + + Session(Socket socket) throws IOException { + this.socket = socket; + inputStream = socket.getInputStream(); + outputStream = socket.getOutputStream(); + new Thread(this, "Session " + thisIndex).start(); + } + + public void run() { + try { + ByteArrayOutputStream baosConnection = new ByteArrayOutputStream(); + for (;;) { + int next = inputStream.read(); + if (next == -1) { + throw new IOException("Socket closed"); + } + + if (next == 0) break; + baosConnection.write((byte)next); + } + + byte[] bytes = baosConnection.toByteArray(); + log.info("Session {}: got connection '{}' with version {}", thisIndex, new String(bytes, 0, bytes.length-1), bytes[bytes.length-1]); + outputStream.write(3); + outputStream.flush(); + + while (running) { + byte[] header = new byte[8]; + for (int index = 0; index