From 7d298ac53da76ca3bc42521a31b23c39390fd08b Mon Sep 17 00:00:00 2001 From: Mahesh <58337761+MaheshMadushan@users.noreply.github.com> Date: Thu, 8 Jun 2023 17:17:56 +0530 Subject: [PATCH 1/3] fix aggregate avg of rtt. --- .../java/com/ksqlDB/Live/RestApplication.java | 87 ++++++++++++++++--- 1 file changed, 73 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/ksqlDB/Live/RestApplication.java b/src/main/java/com/ksqlDB/Live/RestApplication.java index a88b023..3fc5d32 100644 --- a/src/main/java/com/ksqlDB/Live/RestApplication.java +++ b/src/main/java/com/ksqlDB/Live/RestApplication.java @@ -2,6 +2,8 @@ import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; +import io.confluent.ksql.api.client.Row; +import io.confluent.ksql.api.client.StreamedQueryResult; import io.micrometer.core.instrument.MeterRegistry; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.CrossOrigin; @@ -9,24 +11,67 @@ import org.springframework.web.bind.annotation.RestController; import javax.security.auth.login.CredentialException; +import java.io.FileWriter; import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @SpringBootApplication @RestController public class RestApplication { + public static String KSQLDB_SERVER_HOST = "10.8.100.246"; public static int KSQLDB_SERVER_HOST_PORT = 8088; public AtomicInteger iterateID = new AtomicInteger(0); + private final List latencyValues = new CopyOnWriteArrayList<>(); private final MeterRegistry meterRegistry; public RestApplication(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + executorService.scheduleAtFixedRate(this::writeLatencyValuesToCsv, 1, 1, TimeUnit.MINUTES); + } + + private synchronized void writeLatencyValuesToCsv() { + try { + // Calculate average and 90th percentile of latency values + double averageLatency = latencyValues.stream() + .mapToLong(Long::longValue) + .average() + .orElse(Double.NaN); + double percentile95Latency = latencyValues.stream() + .sorted() + .skip((long) (latencyValues.size() * 0.95)) + .findFirst() + .orElse(0L); + double percentile99Latency = latencyValues.stream() + .sorted() + .skip((long) (latencyValues.size() * 0.99)) + .findFirst() + .orElse(0L); + // Write average and 90th percentile of latency values to CSV file + FileWriter csvWriter = new FileWriter("latency_values_ksql.csv", true); + csvWriter.append(Double.toString(averageLatency)); + csvWriter.append(","); + csvWriter.append(Double.toString(percentile95Latency)); + csvWriter.append(","); + csvWriter.append(Double.toString(percentile99Latency)); + csvWriter.append("\n"); + csvWriter.flush(); + csvWriter.close(); + + // Clear the latency values list + latencyValues.clear(); + } catch (IOException e) { + e.printStackTrace(); + } } @GetMapping("/ksql") @CrossOrigin - public void runQuery() { + public void runQuery() throws ExecutionException, InterruptedException { // SpringApplication.run(LiveApplication.class, args); StringBuilder str1 = new StringBuilder("id-"); long start=System.currentTimeMillis(); @@ -38,19 +83,33 @@ public void runQuery() { Client client = Client.create(options); // Send requests with the client by following the other examples + Thread streamingThread = new Thread(() -> { + StreamedQueryResult streamedQueryResult = null; + try { + streamedQueryResult = client.streamQuery("SELECT ip,ROWTIME FROM network EMIT CHANGES;").get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } - client.streamQuery("SELECT ip,ROWTIME FROM network EMIT CHANGES;") - .thenAccept(streamedQueryResult -> { - System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID()); - - RowSubscriber subscriber = new RowSubscriber(userId, this.meterRegistry,start); - streamedQueryResult.subscribe(subscriber); - }).exceptionally(e -> { - System.out.println("Request failed: " + e); - return null; - }); - - // Terminate any open connections and close the client -// client.close(); + while (Thread.interrupted()) { + // Block until a new row is available + Row row = streamedQueryResult.poll(); + if (row != null) { + long current = System.currentTimeMillis(); + long updated = (long) row.getValue("ROWTIME"); + // skip processing rows that were created before the query started + if (updated > start) { + long latency = current - updated; + latencyValues.add(latency); + meterRegistry.timer(userId).record(Duration.ofMillis(latency)); + System.out.println("latency: " + latency); + } + // Request the next row + } else { + System.out.println("Query has ended."); + } + } + }, "streaming Thread"); + streamingThread.start(); } } From 998fc2d698286f2fc6f07d8cd00958e9b579a3dc Mon Sep 17 00:00:00 2001 From: Mahesh <58337761+MaheshMadushan@users.noreply.github.com> Date: Thu, 8 Jun 2023 17:43:23 +0530 Subject: [PATCH 2/3] while loop fixed --- src/main/java/com/ksqlDB/Live/RestApplication.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/ksqlDB/Live/RestApplication.java b/src/main/java/com/ksqlDB/Live/RestApplication.java index 3fc5d32..3cf9677 100644 --- a/src/main/java/com/ksqlDB/Live/RestApplication.java +++ b/src/main/java/com/ksqlDB/Live/RestApplication.java @@ -22,7 +22,7 @@ @RestController public class RestApplication { - public static String KSQLDB_SERVER_HOST = "10.8.100.246"; + public static String KSQLDB_SERVER_HOST = "172.174.71.151"; public static int KSQLDB_SERVER_HOST_PORT = 8088; public AtomicInteger iterateID = new AtomicInteger(0); private final List latencyValues = new CopyOnWriteArrayList<>(); @@ -91,7 +91,7 @@ public void runQuery() throws ExecutionException, InterruptedException { throw new RuntimeException(e); } - while (Thread.interrupted()) { + while (true) { // Block until a new row is available Row row = streamedQueryResult.poll(); if (row != null) { From 407be2f9663b5b400d079f119184b0c4c8acefb1 Mon Sep 17 00:00:00 2001 From: Mahesh <58337761+MaheshMadushan@users.noreply.github.com> Date: Thu, 8 Jun 2023 19:21:30 +0530 Subject: [PATCH 3/3] Thread name excluded --- src/main/java/com/ksqlDB/Live/RestApplication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/ksqlDB/Live/RestApplication.java b/src/main/java/com/ksqlDB/Live/RestApplication.java index 3cf9677..b31b71d 100644 --- a/src/main/java/com/ksqlDB/Live/RestApplication.java +++ b/src/main/java/com/ksqlDB/Live/RestApplication.java @@ -109,7 +109,7 @@ public void runQuery() throws ExecutionException, InterruptedException { System.out.println("Query has ended."); } } - }, "streaming Thread"); + }); streamingThread.start(); } }