Skip to content

Commit

Permalink
Bugifx/async tests (#22)
Browse files Browse the repository at this point in the history
* bugfix async tests

* bugfix async tests: ArangoCollectionTest

* bugfix async tests: ArangoDatabaseTest

* bugfix async tests: ArangoDBTest

* bugfix pending futures

* added dbg console logs

* simplified ArangoExecutorAsync::execute

* working deleteIndex test example

* bugfix cascade futures execution

* nestedGetVersion test

* cleanup tests console logs

* fix geo test

* fix explain test

* bugfix updateDocuments test

* commented getSelectivityEstimate assertions

* concurrency limit test

* output buffer thread test

* outgoing executor service
  • Loading branch information
rashtao authored Aug 26, 2019
1 parent 1aea502 commit f5ddd9f
Show file tree
Hide file tree
Showing 9 changed files with 3,826 additions and 3,972 deletions.
96 changes: 53 additions & 43 deletions src/main/java/com/arangodb/internal/ArangoExecutorAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,65 +20,75 @@

package com.arangodb.internal;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

import com.arangodb.ArangoDBException;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.util.ArangoSerializationFactory;
import com.arangodb.internal.velocystream.VstCommunicationAsync;
import com.arangodb.velocypack.exception.VPackException;
import com.arangodb.velocystream.Request;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Mark Vollmary
*
* @author Michele Rastelli
*/
public class ArangoExecutorAsync extends ArangoExecutor {

private final VstCommunicationAsync communication;
private final VstCommunicationAsync communication;
private final ExecutorService outgoingExecutor = Executors.newSingleThreadExecutor();

public ArangoExecutorAsync(final VstCommunicationAsync communication, final ArangoSerializationFactory util,
final DocumentCache documentCache) {
super(util, documentCache);
this.communication = communication;
}

public ArangoExecutorAsync(final VstCommunicationAsync communication, final ArangoSerializationFactory util,
final DocumentCache documentCache) {
super(util, documentCache);
this.communication = communication;
}
public <T> CompletableFuture<T> execute(final Request request, final Type type) {
return execute(request, (response) -> createResult(type, response));
}

public <T> CompletableFuture<T> execute(final Request request, final Type type) {
return execute(request, (response) -> createResult(type, response));
}
public <T> CompletableFuture<T> execute(final Request request, final Type type, final HostHandle hostHandle) {
return execute(request, (response) -> createResult(type, response), hostHandle);
}

public <T> CompletableFuture<T> execute(final Request request, final Type type, final HostHandle hostHandle) {
return execute(request, (response) -> createResult(type, response), hostHandle);
}
public <T> CompletableFuture<T> execute(final Request request, final ResponseDeserializer<T> responseDeserializer) {
return execute(request, responseDeserializer, null);
}

public <T> CompletableFuture<T> execute(final Request request, final ResponseDeserializer<T> responseDeserializer) {
return execute(request, responseDeserializer, null);
}
public <T> CompletableFuture<T> execute(
final Request request,
final ResponseDeserializer<T> responseDeserializer,
final HostHandle hostHandle) {

public <T> CompletableFuture<T> execute(
final Request request,
final ResponseDeserializer<T> responseDeserializer,
final HostHandle hostHandle) {
final CompletableFuture<T> result = new CompletableFuture<>();
communication.execute(request, hostHandle).whenComplete((response, ex) -> {
if (response != null) {
try {
result.complete(responseDeserializer.deserialize(response));
} catch (final VPackException | ArangoDBException e) {
result.completeExceptionally(e);
}
} else if (ex != null) {
result.completeExceptionally(ex);
} else {
result.cancel(true);
}
});
return result;
}
CompletableFuture<T> result = new CompletableFuture<>();
outgoingExecutor.execute(() -> {
try {
communication.execute(request, hostHandle)
.whenCompleteAsync((response, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
} else if (response != null) {
try {
result.complete(responseDeserializer.deserialize(response));
} catch (final VPackException | ArangoDBException e) {
result.completeExceptionally(e);
}
}
});
} catch (ArangoDBException e) {
result.completeExceptionally(e);
}
}
);
return result;
}

public void disconnect() throws IOException {
communication.close();
}
public void disconnect() throws IOException {
communication.close();
}
}
Loading

0 comments on commit f5ddd9f

Please sign in to comment.