Skip to content

Commit

Permalink
Merge branch 'main' into fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
MaheshMadushan authored Jun 8, 2023
2 parents d2b10db + 407be2f commit 57f1770
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/main/java/com/ksqlDB/Live/RestApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;

import io.confluent.ksql.api.client.StreamedQueryResult;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.CrossOrigin;
Expand All @@ -14,26 +16,28 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

@SpringBootApplication
@RestController
public class RestApplication {

public static String KSQLDB_SERVER_HOST = "172.174.71.151";
public static int KSQLDB_SERVER_HOST_PORT = 8088;
private final List<Long> latencyValues = new CopyOnWriteArrayList<>();

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
public AtomicInteger iterateID = new AtomicInteger(0);
private final List<Long> latencyValues = new CopyOnWriteArrayList<>();
private final MeterRegistry meterRegistry;

public RestApplication(MeterRegistry meterRegistry) {
executorService.scheduleAtFixedRate(this::writeLatencyValuesToCsv, 1, 1, TimeUnit.MINUTES);
this.meterRegistry = meterRegistry;
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
}

private synchronized void writeLatencyValuesToCsv() {
Expand Down Expand Up @@ -73,7 +77,7 @@ private synchronized void writeLatencyValuesToCsv() {

@GetMapping("/ksql")
@CrossOrigin
public void runQuery() {
public void runQuery() throws ExecutionException, InterruptedException {
// SpringApplication.run(LiveApplication.class, args);
StringBuilder str1 = new StringBuilder("id-");
long start=System.currentTimeMillis();
Expand All @@ -84,8 +88,6 @@ public void runQuery() {
.setPort(KSQLDB_SERVER_HOST_PORT);
Client client = Client.create(options);

// Send requests with the client by following the other examples

client.streamQuery("SELECT ip,ROWTIME FROM network EMIT CHANGES;")
.thenAccept(streamedQueryResult -> {
System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());
Expand All @@ -96,7 +98,6 @@ public void runQuery() {
System.out.println("Request failed: " + e);
return null;
});

// Terminate any open connections and close the client
// client.close();
}
Expand Down

0 comments on commit 57f1770

Please sign in to comment.