-
Notifications
You must be signed in to change notification settings - Fork 92
Asynchronous Operations
All the CRUD API, DSL API and RAW API support asynchronous execution
final CompletableFuture<Empty> future = userManager
.crud()
.insert(....)
.executeAsync();
final CompletableFuture<ExecutionInfo> future = userManager
.crud()
.insert(....)
.executeAsyncWithStats();
final CompletableFuture<User> future = userManager
.crud()
.findById(....)
.getOneAsync();
final CompletableFuture<Tuple2<User, ExecutionInfo>> future = userManager
.crud()
.findById(....)
.getOneAsyncWithStats();
final CompletableFuture<Empty> future = userManager
.crud()
.deleteById(....)
.executeAsync();
final CompletableFuture<ExecutionInfo> future = userManager
.crud()
.deleteById(....)
.executeAsyncWithStats();
final CompletableFuture<Empty> future = userManager
.crud()
.deleteByPartitionKeys(....)
.executeAsync();
final CompletableFuture<ExecutionInfo> future = userManager
.crud()
.deleteByPartitionKeys(....)
.executeAsyncWithStats();
final CompletableFuture<User> future = userManager
.dsl()
.select()
...
.getOneAsync();
final CompletableFuture<Tuple2<User, ExecutionInfo>> future = userManager
.dsl()
.select()
...
.getOneAsyncWithStats();
final CompletableFuture<List<User>> futures = userManager
.dsl()
.select()
...
.getListAsync();
final CompletableFuture<Tuple2<List<User>, ExecutionInfo>> futures = userManager
.dsl()
.select()
...
.getListAsyncWithStats();
final CompletableFuture<Empty> future = userManager
.dsl()
.update()
...
.executeAsync();
final CompletableFuture<ExecutionInfo> future = userManager
.dsl()
.update()
...
.executeAsyncWithStats();
final CompletableFuture<Empty> future = userManager
.dsl()
.delete()
...
.executeAsync();
final CompletableFuture<ExecutionInfo> future = userManager
.dsl()
.delete()
...
.executeAsyncWithStats();
final CompletableFuture<User> future = userManager
.raw()
.typedQueryForSelect(...)
...
.getOneAsync();
final CompletableFuture<Tuple2<User, ExecutionInfo>> future = userManager
.raw()
.typedQueryForSelect(...)
...
.getOneAsyncWithStats();
final CompletableFuture<List<User>> futures = userManager
.raw()
.typedQueryForSelect(...)
...
.getListAsync();
final CompletableFuture<Tuple2<List<User>, ExecutionInfo>> futures = userManager
.raw()
.typedQueryForSelect(...)
...
.getListAsyncWithStats();
final CompletableFuture<TypedMap> future = userManager
.raw()
.nativeQuery(...)
...
.getOneAsync();
final CompletableFuture<Tuple2<TypedMap, ExecutionInfo>> future = userManager
.raw()
.nativeQuery(...)
...
.getOneAsyncWithStats();
final CompletableFuture<List<TypedMap>> futures = userManager
.raw()
.nativeQuery(...)
...
.getListAsync();
final CompletableFuture<Tuple2<List<TypedMap>, ExecutionInfo>> futures = userManager
.raw()
.nativeQuery(...)
...
.getListAsyncWithStats();
final CompletableFuture<Empty> future = userManager
.raw()
.nativeQuery(...)
...
.executeAsync();
final CompletableFuture<ExecutionInfo> future = userManager
.raw()
.nativeQuery(...)
...
.executeAsyncWithStats();
It is just a marker enum. Sometimes an operation does not return any meaningful result (like delete()
or update()
).
Still we need to be able to ensure that the asynchronous operation has been executed correctly by calling get()
.
If the execution has been successful, the singleton Empty is returned, otherwise an exception is raised
All exceptions that could be raised during the blocking call to get()
will return the following runtime exception
- DriverException
- DriverInternalError
- AchillesLightWeightTransactionException
- AchillesBeanValidationException
- AchillesInvalidTableException
- AchillesStaleObjectStateException
- AchillesException
By default a Thread Pool executor with a LinkedBlockingQueue of size 1000 will be used for all asynchronous operations. The default configuration of this thread pool is given below
new ThreadPoolExecutor(5, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new DefaultExecutorThreadFactory());
The default implementation of the ThreadFactory
is
public class DefaultExecutorThreadFactory implements ThreadFactory {
private static final Logger logger = getLogger("achilles-default-executor");
private final AtomicInteger threadNumber = new AtomicInteger(0);
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught asynchronous exception : "+e.getMessage(), e);
}
};
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("achilles-default-executor-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
return thread;
}
}
Of course, you call provide your own thread pool or configure the default thread pool with your custom parameters. Please refer to Thread Pool Config
To close properly the thread pool on application removal, Achilles exposes the ManagerFactory.shutDown()
method.
This method is annotated with javax.annotation.PreDestroy
so that in a managed contained, it will be invoked automatically.
Otherwise you can always manually call the shutDown()
method.
-
Bootstraping Achilles at runtime
- Runtime Configuration Parameters
-
Manager
-
Consistency Level
-
Cassandra Options at runtime
-
Lightweight Transaction (LWT)
-
JSON Serialization
-
Interceptors
-
Bean Validation (JSR-303)