Skip to content

Commit

Permalink
Refactoring - adding details of query execution to KMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
dzmipt committed Dec 8, 2023
1 parent 6372874 commit bf53f9d
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 34 deletions.
25 changes: 17 additions & 8 deletions src/kx/KConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void setConnectionStateListener(ConnectionStateListener connectionStateLi

private final static byte[] HEADER = new byte[] {0,1,0,0};

private void send(K.KBase query) throws IOException {
private long send(K.KBase query) throws IOException {
ByteArrayOutputStream baosBody = new ByteArrayOutputStream();
query.serialise(baosBody);

Expand All @@ -134,22 +134,28 @@ private void send(K.KBase query) throws IOException {

outputStream.write(baosHeader.toByteArray());
outputStream.write(baosBody.toByteArray());
stats.sentBytes(baosHeader.size() + baosBody.size());
long sentBytes = baosHeader.size() + baosBody.size();
stats.sentBytes(sentBytes);
return sentBytes;
}

public synchronized K.KBase k(K.KBase x, ProgressCallback progress) throws K4Exception, IOException, InterruptedException {
public synchronized KMessage k(K.KBase x, ProgressCallback progress) throws K4Exception, IOException, InterruptedException {
try {
if (isClosed()) connect();
socketReader.setProgressCallback(progress);
send(x);
return socketReader.getResponse();
K.KTimestamp sentTime = K.KTimestamp.now();
long sentBytes = send(x);
KMessage message = socketReader.getResponse();
message.setStarted(sentTime);
message.setBytesSent(sentBytes);
return message;
} catch (IOException e) {
close();
throw e;
}
}

public K.KBase k(K.KBase x) throws K4Exception, IOException, InterruptedException {
public KMessage k(K.KBase x) throws K4Exception, IOException, InterruptedException {
return k(x, null);
}

Expand All @@ -174,7 +180,7 @@ private synchronized ProgressCallback getProgressCallback() {
return progress;
}

K.KBase getResponse() throws InterruptedException, K4Exception, IOException {
KMessage getResponse() throws InterruptedException, IOException {
KMessage response;
synchronized (lockRead) {
while (message == null) {
Expand All @@ -187,7 +193,7 @@ K.KBase getResponse() throws InterruptedException, K4Exception, IOException {
message = null;
lockWrite.notifyAll();
}
return response.getObject();
return response;
}

@Override
Expand All @@ -208,6 +214,7 @@ public void run() {
IPC ipc = new IPC(buffer, 4, false, isLittleEndian);
final int msgLength = ipc.ri() - 8;

K.KTimestamp receivedTime = K.KTimestamp.now();
if (response && connectionStateListener != null) {
connectionStateListener.checkIncomingLimit(msgLength);
}
Expand All @@ -233,6 +240,8 @@ public void run() {
if (response) {
synchronized (lockRead) {
message = IPC.deserialise(buffer, compressed, isLittleEndian);
message.setBytesReceived(msgLength);
message.setFinished(receivedTime);
lockRead.notifyAll();
}
synchronized (lockWrite) {
Expand Down
48 changes: 44 additions & 4 deletions src/kx/KMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ public class KMessage {
private K.KBase object = null;
private K4Exception error = null;
private IOException exception = null;
private long bytesSent = 0;
private long bytesReceived = 0;
private K.KTimestamp started = K.KTimestamp.NULL;
private K.KTimestamp finished = K.KTimestamp.NULL;


public KMessage(K.KBase result) {
this.object = result;
Expand All @@ -22,10 +27,45 @@ public KMessage(IOException exception) {
this.exception = exception;
}

public K.KBase getObject() throws K4Exception, IOException {
if (error != null) throw error;
if (exception != null) throw exception;

public K.KBase getObject() {
return object;
}

public Throwable getError() {
if (error != null) return error;
if (exception != null) return exception;
return null;
}

public long getBytesSent() {
return bytesSent;
}

public void setBytesSent(long bytesSent) {
this.bytesSent = bytesSent;
}

public long getBytesReceived() {
return bytesReceived;
}

public void setBytesReceived(long bytesReceived) {
this.bytesReceived = bytesReceived;
}

public K.KTimestamp getStarted() {
return started;
}

public void setStarted(K.KTimestamp started) {
this.started = started;
}

public K.KTimestamp getFinished() {
return finished;
}

public void setFinished(K.KTimestamp finished) {
this.finished = finished;
}
}
2 changes: 1 addition & 1 deletion src/studio/kdb/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ static void mock(SessionCreator sessionCreator) {
Session.sessionCreator = sessionCreator;
}

public K.KBase execute(K.KBase x, ProgressCallback progress) throws K4Exception, IOException, InterruptedException {
public KMessage execute(K.KBase x, ProgressCallback progress) throws K4Exception, IOException, InterruptedException {
return kConn.k(x, progress);
}

Expand Down
3 changes: 2 additions & 1 deletion src/studio/ui/action/ConnectionStats.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package studio.ui.action;

import kx.KConnectionStats;
import kx.KMessage;
import studio.kdb.K;
import studio.kdb.Session;
import studio.ui.StudioWindow;
Expand All @@ -12,7 +13,7 @@ public class ConnectionStats {

public static void getStats(StudioWindow studioWindow) {
QueryResult queryResult = new QueryResult(null, "");
queryResult.setResult(getTable());
queryResult.setResult(new KMessage(getTable()));
studioWindow.addResultTab(queryResult, "Connection statistics");
}

Expand Down
5 changes: 2 additions & 3 deletions src/studio/ui/action/QueryExecutor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package studio.ui.action;

import kx.K4Exception;
import kx.KMessage;
import kx.ProgressCallback;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -127,10 +128,9 @@ private Session getSession() {
protected QueryResult doInBackground() {
QueryResult result = new QueryResult(server, queryText);
queryLog.info("#{}: query {}({})\n{}",queryIndex, server.getFullName(), server.getConnectionString(), queryText);
long startTime = System.currentTimeMillis();
try {
session = getSession();
K.KBase response = session.execute(query, QueryExecutor.this);
KMessage response = session.execute(query, QueryExecutor.this);
result.setResult(response);
} catch (Throwable e) {
if (! (e instanceof K4Exception)) {
Expand All @@ -139,7 +139,6 @@ protected QueryResult doInBackground() {
}
result.setError(e);
}
result.setExecutionTime(System.currentTimeMillis() - startTime);
if (result.getError() != null) {
if (result.getError() instanceof K4Exception) {
queryLog.info("#{}: server returns error {}", queryIndex, result.getError().getMessage());
Expand Down
16 changes: 7 additions & 9 deletions src/studio/ui/action/QueryResult.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package studio.ui.action;

import kx.KMessage;
import studio.kdb.K;
import studio.kdb.Server;

Expand All @@ -8,18 +9,19 @@ public class QueryResult {
private String query;
private Server server;

private K.KBase result = null;
private KMessage result = null;
private Throwable error = null;
private long executionTime = -1;
private boolean complete = false;

public QueryResult(Server server, String query) {
this.server = server;
this.query = query;
}

public void setResult(K.KBase result) {
public void setResult(KMessage result) {
this.result = result;
Throwable error = result.getError();
if (error != null) setError(error);
complete = true;
}

Expand All @@ -28,10 +30,6 @@ public void setError(Throwable error) {
complete = true;
}

public void setExecutionTime(long executionTime) {
this.executionTime = executionTime;
}

public boolean isComplete() {
return complete;
}
Expand All @@ -45,14 +43,14 @@ public Server getServer() {
}

public K.KBase getResult() {
return result;
return result.getObject();
}

public Throwable getError() {
return error;
}

public long getExecutionTime() {
return executionTime;
return result.getFinished().toLong() - result.getStarted().toLong();
}
}
11 changes: 4 additions & 7 deletions test-integration/studio/kdb/MockQSession.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package studio.kdb;

import kx.K4Exception;
import kx.KAuthentication;
import kx.KConnection;
import kx.ProgressCallback;
import kx.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -93,7 +90,7 @@ public boolean isClosed() {
}

@Override
public synchronized K.KBase k(K.KBase x, ProgressCallback progress) throws K4Exception, IOException {
public synchronized KMessage k(K.KBase x, ProgressCallback progress) throws K4Exception, IOException {
log.info("MockQSession.k - query execution");
queryCount.getAndIncrement();
closed = false;
Expand All @@ -110,7 +107,7 @@ public synchronized K.KBase k(K.KBase x, ProgressCallback progress) throws K4Exc
log.info("MockQSession.k - continue after locking");
}

if (kResponse != null) return kResponse;
if (kResponse != null) return new KMessage(kResponse);
if (kError != null) throw kError;

if (ioException != null) {
Expand All @@ -119,7 +116,7 @@ public synchronized K.KBase k(K.KBase x, ProgressCallback progress) throws K4Exc
}


return x;
return new KMessage(x);
}

public int getQueryCount() {
Expand Down
2 changes: 1 addition & 1 deletion test/studio/kdb/KSerialiseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void exit() throws K4Exception, IOException, InterruptedException

private void test(K.KBase k) {
try {
K.KBase result = kConn.k(k);
K.KBase result = kConn.k(k).getObject();
assertEquals(k, result);
} catch (K4Exception | IOException | InterruptedException e) {
fail(e);
Expand Down

0 comments on commit bf53f9d

Please sign in to comment.