Skip to content

Commit

Permalink
Merge pull request #4 from FYP-Live-Query/fixed
Browse files Browse the repository at this point in the history
fix aggregate avg of rtt.
  • Loading branch information
MaheshMadushan authored Jun 8, 2023
2 parents 407be2f + 57f1770 commit c843ddb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 75 deletions.
47 changes: 18 additions & 29 deletions src/main/java/com/ksqlDB/Live/RestApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;

import io.confluent.ksql.api.client.StreamedQueryResult;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand All @@ -15,7 +16,9 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

@SpringBootApplication
Expand All @@ -24,14 +27,17 @@ public class RestApplication {

public static String KSQLDB_SERVER_HOST = "172.174.71.151";
public static int KSQLDB_SERVER_HOST_PORT = 8088;
private final List<Long> latencyValues = new CopyOnWriteArrayList<>();

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
public AtomicInteger iterateID = new AtomicInteger(0);
private final List<Long> latencyValues = new CopyOnWriteArrayList<>();
private final MeterRegistry meterRegistry;

public RestApplication(MeterRegistry meterRegistry) {
executorService.scheduleAtFixedRate(this::writeLatencyValuesToCsv, 1, 1, TimeUnit.MINUTES);
this.meterRegistry = meterRegistry;
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(this::writeLatencyValuesToCsv, 1, 1, TimeUnit.MINUTES);
}

private synchronized void writeLatencyValuesToCsv() {
Expand Down Expand Up @@ -82,34 +88,17 @@ public void runQuery() throws ExecutionException, InterruptedException {
.setPort(KSQLDB_SERVER_HOST_PORT);
Client client = Client.create(options);

// Send requests with the client by following the other examples
Thread streamingThread = new Thread(() -> {
StreamedQueryResult streamedQueryResult = null;
try {
streamedQueryResult = client.streamQuery("SELECT ip,ROWTIME FROM network EMIT CHANGES;").get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
client.streamQuery("SELECT ip,ROWTIME FROM network EMIT CHANGES;")
.thenAccept(streamedQueryResult -> {
System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());

while (true) {
// Block until a new row is available
Row row = streamedQueryResult.poll();
if (row != null) {
long current = System.currentTimeMillis();
long updated = (long) row.getValue("ROWTIME");
// skip processing rows that were created before the query started
if (updated > start) {
long latency = current - updated;
latencyValues.add(latency);
meterRegistry.timer(userId).record(Duration.ofMillis(latency));
System.out.println("latency: " + latency);
}
// Request the next row
} else {
System.out.println("Query has ended.");
}
}
});
streamingThread.start();
RowSubscriber subscriber = new RowSubscriber(userId, this.meterRegistry,start,latencyValues);
streamedQueryResult.subscribe(subscriber);
}).exceptionally(e -> {
System.out.println("Request failed: " + e);
return null;
});
// Terminate any open connections and close the client
// client.close();
}
}
51 changes: 5 additions & 46 deletions src/main/java/com/ksqlDB/Live/RowSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ public class RowSubscriber implements Subscriber<Row> {
private Subscription subscription;
private MeterRegistry meterRegistry;
private final long start;
private final List<Long> latencyValues = new CopyOnWriteArrayList<>();
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private final List<Long> latencyValues;


private final String userId;

public RowSubscriber(String userId, MeterRegistry meterRegistry,long start) {
public RowSubscriber(String userId, MeterRegistry meterRegistry,long start, List<Long> latencyValues) {
this.userId = userId;
this.meterRegistry = meterRegistry;
this.start =start;
executorService.scheduleAtFixedRate(this::writeLatencyValuesToCsv, 1, 1, TimeUnit.MINUTES);
this.latencyValues = latencyValues;

}

@Override
Expand All @@ -46,14 +47,6 @@ public synchronized void onSubscribe(Subscription subscription) {
@Override
public synchronized void onNext(Row row) {
long current = System.currentTimeMillis();
// String datetime = row.getValue("EVENTTIMESTAMP").toString();
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
// Date date = null;
// try {
// date = sdf.parse(datetime);
// } catch (ParseException e) {
// throw new RuntimeException(e);
// }
long updated = (long) row.getValue("ROWTIME");
// skip processing rows that were created before the query started
if(updated > start){
Expand All @@ -66,40 +59,6 @@ public synchronized void onNext(Row row) {
subscription.request(1);
}

private synchronized void writeLatencyValuesToCsv() {
try {
// Calculate average and 90th percentile of latency values
double averageLatency = latencyValues.stream()
.mapToLong(Long::longValue)
.average()
.orElse(Double.NaN);
double percentile95Latency = latencyValues.stream()
.sorted()
.skip((long) (latencyValues.size() * 0.95))
.findFirst()
.orElse(0L);
double percentile99Latency = latencyValues.stream()
.sorted()
.skip((long) (latencyValues.size() * 0.99))
.findFirst()
.orElse(0L);
// Write average and 90th percentile of latency values to CSV file
FileWriter csvWriter = new FileWriter("latency_values_ksql.csv", true);
csvWriter.append(Double.toString(averageLatency));
csvWriter.append(",");
csvWriter.append(Double.toString(percentile95Latency));
csvWriter.append(",");
csvWriter.append(Double.toString(percentile99Latency));
csvWriter.append("\n");
csvWriter.flush();
csvWriter.close();

// Clear the latency values list
latencyValues.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public synchronized void onError(Throwable t) {
System.out.println("Received an error: " + t);
Expand Down

0 comments on commit c843ddb

Please sign in to comment.