Skip to content

Commit

Permalink
Fixed issue with multi-head I/O HA failover when a worker rank dies
Browse files Browse the repository at this point in the history
  • Loading branch information
mmahmud committed Oct 3, 2019
1 parent 68e5fbf commit d23643d
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 42 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Version 7.0

### Version 7.0.7.2 - 2019-10-03

#### Fixed
- Multi-head I/O high-availability failover issue when a worker rank dies.

### Version 7.0.7.1 - 2019-09-11

#### Added
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
MAJOR = 7
MINOR = 0
REVISION = 7
ABI_VERSION = 1
ABI_VERSION = 2
4 changes: 2 additions & 2 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.gpudb</groupId>
<artifactId>gpudb-api</artifactId>
<version>7.0.7.1</version>
<version>7.0.7.2</version>
<packaging>jar</packaging>
<name>Kinetica Java API</name>
<distributionManagement>
Expand Down Expand Up @@ -49,7 +49,7 @@
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<gpudb-api.version>7.0.7.0</gpudb-api.version>
<gpudb-api.version>7.0.7.2</gpudb-api.version>
</properties>
<profiles>
<profile>
Expand Down
153 changes: 143 additions & 10 deletions api/src/main/java/com/gpudb/BulkInserter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.gpudb;

import com.gpudb.GPUdbBase.GPUdbExitException;
import com.gpudb.protocol.AdminShowShardsRequest;
import com.gpudb.protocol.AdminShowShardsResponse;
import com.gpudb.protocol.InsertRecordsRequest;
Expand Down Expand Up @@ -198,6 +199,7 @@ public URL getUrl() {
private long shardVersion;
private MutableLong shardUpdateTime;
private int numClusterSwitches;
private URL currentHeadNodeURL;
private com.gpudb.WorkerList workerList;
private List<WorkerQueue<T>> workerQueues;
private int numRanks;
Expand Down Expand Up @@ -304,12 +306,18 @@ private BulkInserter(GPUdb gpudb, String tableName, Type type, TypeObjectMap<T>
this.typeObjectMap = typeObjectMap;
this.workerList = workers;

// Initialize the shard version and update time
this.shardVersion = 0;
this.shardUpdateTime = new MutableLong();


// Keep track of how many times the db client has switched HA clusters
// in order to decide later if it's time to update the worker queues
this.numClusterSwitches = gpudb.getNumClusterSwitches();

// Keep track of which cluster we're using (helpful in knowing if an
// HA failover has happened)
this.currentHeadNodeURL = gpudb.getURL();

// Validate that the table exists
if ( !gpudb.hasTable( tableName, null ).getTableExists() ) {
Expand Down Expand Up @@ -400,14 +408,8 @@ private BulkInserter(GPUdb gpudb, String tableName, Type type, TypeObjectMap<T>

// Update the worker queues, if needed
updateWorkerQueues( false );
// routingTable = gpudb.adminShowShards(new AdminShowShardsRequest()).getRank();

this.numRanks = this.workerQueues.size();
// for (int i = 0; i < routingTable.size(); i++) {
// if (routingTable.get(i) > this.workerQueues.size()) {
// throw new IllegalArgumentException("Too few worker URLs specified.");
// }
// }
} else { // use the head node only for insertion
if (gpudb.getURLs().size() == 1) {
this.workerQueues.add(new WorkerQueue<T>(GPUdbBase.appendPathToURL(gpudb.getURL(), "/insert/records"), batchSize, primaryKeyBuilder != null, updateOnExistingPk));
Expand All @@ -424,6 +426,66 @@ private BulkInserter(GPUdb gpudb, String tableName, Type type, TypeObjectMap<T>
}


/**
* Force a high-availability cluster failover over. Check the health of the
* cluster (either head node only, or head node and worker ranks, based on
* the retriever configuration), and use it if healthy. If no healthy cluster
* is found, then throw an error. Otherwise, stop at the first healthy cluster.
*
* @throws GPUdbException if a successful failover could not be achieved.
*/
private void forceHAFailover() throws GPUdbException {
while (true) {
// Try to switch to a new cluster
try {
// this.gpudb.switchURL( this.currentHeadNodeURL );
synchronized ( this.currentHeadNodeURL ) {
this.gpudb.switchURL( this.currentHeadNodeURL );
}
} catch (GPUdbBase.GPUdbHAUnavailableException ex ) {
// Have tried all clusters; back to square 1
throw ex;
}


// We did switch to a different cluster; now check the health
// of the cluster, starting with the head node
if ( !this.gpudb.isKineticaRunning( this.gpudb.getURL() ) ) {
continue; // try the next cluster because this head node is down
}

boolean isClusterHealthy = true;
if ( this.isMultiHeadEnabled ) {
// Obtain the worker rank addresses
com.gpudb.WorkerList workerRanks;
try {
workerRanks = new com.gpudb.WorkerList( this.gpudb,
this.workerList.getIpRegex() );
} catch (GPUdbException ex) {
// Some problem occurred; move to the next cluster
continue;
}

// Check the health of all the worker ranks
for ( URL workerRank : workerRanks) {
if ( !this.gpudb.isKineticaRunning( workerRank ) ) {
isClusterHealthy = false;
}
}
}

if ( isClusterHealthy ) {
// Save the healthy cluster's URL as the current head node URL
synchronized ( this.currentHeadNodeURL ) {
this.currentHeadNodeURL = this.gpudb.getURL();
}

return;
}
}
}


/**
* Updates the shard mapping based on the latest cluster configuration.
* Also reconstructs the worker queues based on the new sharding.
Expand Down Expand Up @@ -484,7 +546,6 @@ private boolean updateWorkerQueues( boolean doReconstructWorkerQueues ) throws G
// Could not update the worker queues because we can't connect
// to the database
return false;

} else {
// Unknown errors not handled here
throw ex;
Expand Down Expand Up @@ -691,6 +752,13 @@ public void flush() throws InsertException {

@SuppressWarnings("unchecked")
private void flush(List<T> queue, URL url, boolean forcedFlush) throws InsertException {
// Flush with a total of retryCount retries
flush( queue, url, forcedFlush, retryCount );
}


@SuppressWarnings("unchecked")
private void flush(List<T> queue, URL url, boolean forcedFlush, int retries) throws InsertException {
if (queue.isEmpty()) {
return;
}
Expand All @@ -706,9 +774,9 @@ private void flush(List<T> queue, URL url, boolean forcedFlush) throws InsertExc

InsertRecordsResponse response = new InsertRecordsResponse();

int retries = retryCount;
// int retries = retryCount;
long insertionAttemptTimestamp = 0;

while (true) {
insertionAttemptTimestamp = new Timestamp( System.currentTimeMillis() ).getTime();
try {
Expand All @@ -728,6 +796,67 @@ private void flush(List<T> queue, URL url, boolean forcedFlush) throws InsertExc
}

break; // out of the while loop
} catch (GPUdbException ex) {
// If some connection issue occurred, we want to force an HA failover
if ( (ex instanceof GPUdbExitException)
|| ex.hadConnectionFailure() ) {
// We did encounter an HA failover trigger
try {
// Switch to a different cluster in the HA ring, if any
forceHAFailover();
} catch (GPUdbException ex2) {
if (retries <= 0) {
// We've now tried all the HA clusters and circled back;
                            // propagate the error to the user, but only if
                            // there are no more retries left
String originalCause = (ex.getCause() == null) ? ex.toString() : ex.getCause().toString();
throw new GPUdbException( originalCause
+ ex2.getMessage(), true );
}
}
}

// Update the worker queues since we've failed over to a
// different cluster
boolean updatedWorkerQueues = updateWorkerQueues();
synchronized ( this.shardUpdateTime ) {
if ( updatedWorkerQueues
|| ( insertionAttemptTimestamp < this.shardUpdateTime.longValue() ) ) {


// Now that we've switched to a different cluster, re-insert
// since no worker queue has these records any more (but the
// records may go to a worker queue different from the one
// they came from)
--retries;
try {
this.insert( queue );

// If the user intends a forceful flush, i.e. the public flush()
// was invoked, then make sure that the records get flushed
if ( forcedFlush ) {
this.flush();
}
break; // out of the while loop
} catch (Exception ex2) {
// Re-setting the exception since we may re-try again
if (retries <= 0) {
throw ex2;
}
}
}
}

// If we still have retries left, then we'll go into the next
// iteration of the infinite while loop; otherwise, propagate
// the exception
if (retries > 0) {
--retries;
} else {
// No more retries; propagate exception to user along with the
// failed queue of records
throw new InsertException( url, queue, ex.getMessage(), ex );
}
} catch (Exception ex) {
// Insertion failed, but maybe due to shard mapping changes (due to
// cluster reconfiguration)? Check if the mapping needs to be updated
Expand Down Expand Up @@ -759,6 +888,9 @@ private void flush(List<T> queue, URL url, boolean forcedFlush) throws InsertExc
}
}

// If we still have retries left, then we'll go into the next
// iteration of the infinite while loop; otherwise, propagate
// the exception
if (retries > 0) {
--retries;
} else {
Expand Down Expand Up @@ -836,8 +968,9 @@ public void insert(T record) throws InsertException {
if (queue != null) {
flush(queue, workerQueue.getUrl(), false);
}
}
} // end insert( single record )


/**
* Queues a list of records for insertion into GPUdb. If any queue reaches
* the {@link #getBatchSize batch size}, all records in that queue will be
Expand Down
Loading

0 comments on commit d23643d

Please sign in to comment.