diff --git a/CHANGELOG.md b/CHANGELOG.md
index e3e459e..e1d608b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
## Version 7.0
+### Version 7.0.7.2 - 2019-10-03
+
+#### Fixed
+- Multi-head I/O high-availability failover issue when a worker rank dies.
+
### Version 7.0.7.1 - 2019-09-11
#### Added
diff --git a/VERSION b/VERSION
index 9664324..90f0201 100644
--- a/VERSION
+++ b/VERSION
@@ -1,4 +1,4 @@
MAJOR = 7
MINOR = 0
REVISION = 7
-ABI_VERSION = 1
+ABI_VERSION = 2
diff --git a/api/pom.xml b/api/pom.xml
index 7e54dd2..0bbde75 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -3,7 +3,7 @@
4.0.0com.gpudbgpudb-api
- 7.0.7.1
+ 7.0.7.2jarKinetica Java API
@@ -49,7 +49,7 @@
1.71.7UTF-8
- 7.0.7.0
+ 7.0.7.2
diff --git a/api/src/main/java/com/gpudb/BulkInserter.java b/api/src/main/java/com/gpudb/BulkInserter.java
index 1618b93..7546693 100644
--- a/api/src/main/java/com/gpudb/BulkInserter.java
+++ b/api/src/main/java/com/gpudb/BulkInserter.java
@@ -1,5 +1,6 @@
package com.gpudb;
+import com.gpudb.GPUdbBase.GPUdbExitException;
import com.gpudb.protocol.AdminShowShardsRequest;
import com.gpudb.protocol.AdminShowShardsResponse;
import com.gpudb.protocol.InsertRecordsRequest;
@@ -198,6 +199,7 @@ public URL getUrl() {
private long shardVersion;
private MutableLong shardUpdateTime;
private int numClusterSwitches;
+ private URL currentHeadNodeURL;
private com.gpudb.WorkerList workerList;
private List> workerQueues;
private int numRanks;
@@ -304,12 +306,18 @@ private BulkInserter(GPUdb gpudb, String tableName, Type type, TypeObjectMap
this.typeObjectMap = typeObjectMap;
this.workerList = workers;
+ // Initialize the shard version and update time
this.shardVersion = 0;
this.shardUpdateTime = new MutableLong();
+
// Keep track of how many times the db client has switched HA clusters
// in order to decide later if it's time to update the worker queues
this.numClusterSwitches = gpudb.getNumClusterSwitches();
+
+ // Keep track of which cluster we're using (helpful in knowing if an
+ // HA failover has happened)
+ this.currentHeadNodeURL = gpudb.getURL();
// Validate that the table exists
if ( !gpudb.hasTable( tableName, null ).getTableExists() ) {
@@ -400,14 +408,8 @@ private BulkInserter(GPUdb gpudb, String tableName, Type type, TypeObjectMap
// Update the worker queues, if needed
updateWorkerQueues( false );
- // routingTable = gpudb.adminShowShards(new AdminShowShardsRequest()).getRank();
this.numRanks = this.workerQueues.size();
- // for (int i = 0; i < routingTable.size(); i++) {
- // if (routingTable.get(i) > this.workerQueues.size()) {
- // throw new IllegalArgumentException("Too few worker URLs specified.");
- // }
- // }
} else { // use the head node only for insertion
if (gpudb.getURLs().size() == 1) {
this.workerQueues.add(new WorkerQueue(GPUdbBase.appendPathToURL(gpudb.getURL(), "/insert/records"), batchSize, primaryKeyBuilder != null, updateOnExistingPk));
@@ -424,6 +426,66 @@ private BulkInserter(GPUdb gpudb, String tableName, Type type, TypeObjectMap
}
+ /**
+     * Force a high-availability cluster failover. Check the health of the
+     * cluster (either head node only, or head node and worker ranks, based on
+     * the inserter configuration), and use it if healthy. If no healthy cluster
+     * is found, then throw an error. Otherwise, stop at the first healthy cluster.
+ *
+ * @throws GPUdbException if a successful failover could not be achieved.
+ */
+ private void forceHAFailover() throws GPUdbException {
+ while (true) {
+ // Try to switch to a new cluster
+ try {
+ // this.gpudb.switchURL( this.currentHeadNodeURL );
+ synchronized ( this.currentHeadNodeURL ) {
+ this.gpudb.switchURL( this.currentHeadNodeURL );
+ }
+ } catch (GPUdbBase.GPUdbHAUnavailableException ex ) {
+ // Have tried all clusters; back to square 1
+ throw ex;
+ }
+
+
+ // We did switch to a different cluster; now check the health
+ // of the cluster, starting with the head node
+ if ( !this.gpudb.isKineticaRunning( this.gpudb.getURL() ) ) {
+ continue; // try the next cluster because this head node is down
+ }
+
+ boolean isClusterHealthy = true;
+ if ( this.isMultiHeadEnabled ) {
+ // Obtain the worker rank addresses
+ com.gpudb.WorkerList workerRanks;
+ try {
+ workerRanks = new com.gpudb.WorkerList( this.gpudb,
+ this.workerList.getIpRegex() );
+ } catch (GPUdbException ex) {
+ // Some problem occurred; move to the next cluster
+ continue;
+ }
+
+ // Check the health of all the worker ranks
+ for ( URL workerRank : workerRanks) {
+ if ( !this.gpudb.isKineticaRunning( workerRank ) ) {
+ isClusterHealthy = false;
+ }
+ }
+ }
+
+ if ( isClusterHealthy ) {
+ // Save the healthy cluster's URL as the current head node URL
+ synchronized ( this.currentHeadNodeURL ) {
+ this.currentHeadNodeURL = this.gpudb.getURL();
+ }
+
+ return;
+ }
+ }
+ }
+
+
/**
* Updates the shard mapping based on the latest cluster configuration.
* Also reconstructs the worker queues based on the new sharding.
@@ -484,7 +546,6 @@ private boolean updateWorkerQueues( boolean doReconstructWorkerQueues ) throws G
// Could not update the worker queues because we can't connect
// to the database
return false;
-
} else {
// Unknown errors not handled here
throw ex;
@@ -691,6 +752,13 @@ public void flush() throws InsertException {
@SuppressWarnings("unchecked")
private void flush(List queue, URL url, boolean forcedFlush) throws InsertException {
+ // Flush with a total of retryCount retries
+ flush( queue, url, forcedFlush, retryCount );
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private void flush(List queue, URL url, boolean forcedFlush, int retries) throws InsertException {
if (queue.isEmpty()) {
return;
}
@@ -706,9 +774,9 @@ private void flush(List queue, URL url, boolean forcedFlush) throws InsertExc
InsertRecordsResponse response = new InsertRecordsResponse();
- int retries = retryCount;
+ // int retries = retryCount;
long insertionAttemptTimestamp = 0;
-
+
while (true) {
insertionAttemptTimestamp = new Timestamp( System.currentTimeMillis() ).getTime();
try {
@@ -728,6 +796,67 @@ private void flush(List queue, URL url, boolean forcedFlush) throws InsertExc
}
break; // out of the while loop
+ } catch (GPUdbException ex) {
+ // If some connection issue occurred, we want to force an HA failover
+ if ( (ex instanceof GPUdbExitException)
+ || ex.hadConnectionFailure() ) {
+ // We did encounter an HA failover trigger
+ try {
+ // Switch to a different cluster in the HA ring, if any
+ forceHAFailover();
+ } catch (GPUdbException ex2) {
+ if (retries <= 0) {
+ // We've now tried all the HA clusters and circled back;
+                        // propagate the error to the user, but only if there
+                        // are no more retries left
+ String originalCause = (ex.getCause() == null) ? ex.toString() : ex.getCause().toString();
+ throw new GPUdbException( originalCause
+ + ex2.getMessage(), true );
+ }
+ }
+ }
+
+ // Update the worker queues since we've failed over to a
+ // different cluster
+ boolean updatedWorkerQueues = updateWorkerQueues();
+ synchronized ( this.shardUpdateTime ) {
+ if ( updatedWorkerQueues
+ || ( insertionAttemptTimestamp < this.shardUpdateTime.longValue() ) ) {
+
+
+ // Now that we've switched to a different cluster, re-insert
+ // since no worker queue has these records any more (but the
+ // records may go to a worker queue different from the one
+ // they came from)
+ --retries;
+ try {
+ this.insert( queue );
+
+ // If the user intends a forceful flush, i.e. the public flush()
+ // was invoked, then make sure that the records get flushed
+ if ( forcedFlush ) {
+ this.flush();
+ }
+ break; // out of the while loop
+ } catch (Exception ex2) {
+ // Re-setting the exception since we may re-try again
+ if (retries <= 0) {
+ throw ex2;
+ }
+ }
+ }
+ }
+
+ // If we still have retries left, then we'll go into the next
+ // iteration of the infinite while loop; otherwise, propagate
+ // the exception
+ if (retries > 0) {
+ --retries;
+ } else {
+ // No more retries; propagate exception to user along with the
+ // failed queue of records
+ throw new InsertException( url, queue, ex.getMessage(), ex );
+ }
} catch (Exception ex) {
// Insertion failed, but maybe due to shard mapping changes (due to
// cluster reconfiguration)? Check if the mapping needs to be updated
@@ -759,6 +888,9 @@ private void flush(List queue, URL url, boolean forcedFlush) throws InsertExc
}
}
+ // If we still have retries left, then we'll go into the next
+ // iteration of the infinite while loop; otherwise, propagate
+ // the exception
if (retries > 0) {
--retries;
} else {
@@ -836,8 +968,9 @@ public void insert(T record) throws InsertException {
if (queue != null) {
flush(queue, workerQueue.getUrl(), false);
}
- }
+ } // end insert( single record )
+
/**
* Queues a list of records for insertion into GPUdb. If any queue reaches
* the {@link #getBatchSize batch size}, all records in that queue will be
diff --git a/api/src/main/java/com/gpudb/GPUdbBase.java b/api/src/main/java/com/gpudb/GPUdbBase.java
index 4ed7c2e..6891134 100644
--- a/api/src/main/java/com/gpudb/GPUdbBase.java
+++ b/api/src/main/java/com/gpudb/GPUdbBase.java
@@ -1192,7 +1192,7 @@ private void updateHmUrls() throws GPUdbException {
}
}
}
-
+
/**
* Switches the URL of the HA ring cluster. Check if we've circled back to
@@ -1200,7 +1200,7 @@ private void updateHmUrls() throws GPUdbException {
* indices so that the next time, we pick up HA clusters in a different random
* manner and throw an exception.
*/
- private URL switchURL(URL oldURL) throws GPUdbHAUnavailableException {
+ protected URL switchURL(URL oldURL) throws GPUdbHAUnavailableException {
synchronized (urlLock) {
// If there is only one URL, then we can't switch URLs
@@ -1938,15 +1938,33 @@ public T submitRequestToHM( String endpoint,
// There's an error in creating the URL
throw new GPUdbRuntimeException(ex.getMessage(), ex);
} catch (GPUdbExitException ex) {
- // Handle our special exit exception
try {
- hmUrl = switchHmURL( originalURL );
- } catch (GPUdbHAUnavailableException ha_ex) {
- // We've now tried all the HA clusters and circled back
- // Get the original cause to propagate to the user
- String originalCause = (ex.getCause() == null) ? ex.toString() : ex.getCause().toString();
- throw new GPUdbException( originalCause
- + ha_ex.getMessage(), true );
+ if ( this.updateHostManagerPort() ) {
+ // Get the updated URL
+ hmUrl = getHmURL();
+ } else {
+ // Upon failure, try to use other clusters
+ try {
+ hmUrl = switchHmURL( originalURL );
+ } catch (GPUdbHAUnavailableException ha_ex) {
+ // We've now tried all the HA clusters and circled back
+ // Get the original cause to propagate to the user
+ String originalCause = (ex.getCause() == null) ? ex.toString() : ex.getCause().toString();
+ throw new GPUdbException( originalCause
+ + ha_ex.getMessage(), true );
+ }
+ }
+ } catch (Exception ex2) {
+ // Upon any error, try to use other clusters
+ try {
+ hmUrl = switchHmURL( originalURL );
+ } catch (GPUdbHAUnavailableException ha_ex) {
+ // We've now tried all the HA clusters and circled back
+ // Get the original cause to propagate to the user
+ String originalCause = (ex.getCause() == null) ? ex.toString() : ex.getCause().toString();
+ throw new GPUdbException( originalCause
+ + ha_ex.getMessage(), true );
+ }
}
} catch (SubmitException ex) {
// Some error occurred during the HTTP request;
@@ -2074,8 +2092,13 @@ public T submitRequestRaw(URL url, IndexedRecord reque
connection.setRequestProperty( HEADER_CONTENT_TYPE, "application/x-snappy" );
connection.setFixedLengthStreamingMode(requestSize);
- try (OutputStream outputStream = connection.getOutputStream()) {
- outputStream.write(encodedRequest);
+ try {
+ try (OutputStream outputStream = connection.getOutputStream()) {
+ outputStream.write(encodedRequest);
+ }
+ } catch (IOException ex) {
+ // Trigger an HA failover at the caller level
+ throw new GPUdbExitException( ex.toString() );
}
} else {
byte[] encodedRequest = Avro.encode(request).array();
@@ -2083,8 +2106,16 @@ public T submitRequestRaw(URL url, IndexedRecord reque
connection.setRequestProperty( HEADER_CONTENT_TYPE, "application/octet-stream" );
connection.setFixedLengthStreamingMode(requestSize);
- try (OutputStream outputStream = connection.getOutputStream()) {
- outputStream.write(encodedRequest);
+ try {
+ // Trying with the OutputStream resource so that it can be
+ // automatically closed (without a finally clause) if something
+ // breaks
+ try (OutputStream outputStream = connection.getOutputStream()) {
+ outputStream.write(encodedRequest);
+ }
+ } catch (IOException ex) {
+ // Trigger an HA failover at the caller level
+ throw new GPUdbExitException( ex.toString() );
}
/*
@@ -2112,7 +2143,7 @@ public T submitRequestRaw(URL url, IndexedRecord reque
String responseMsg = connection.getResponseMessage();
String errorMsg;
- if (response_code == 401) {
+ if (response_code == HttpURLConnection.HTTP_UNAUTHORIZED) {
errorMsg = ("Unauthorized access: '"
+ responseMsg + "'");
} else {
@@ -2124,18 +2155,19 @@ public T submitRequestRaw(URL url, IndexedRecord reque
// Parse response based on error code
InputStream inputStream;
- if (response_code == 401) {
+ if (response_code == HttpURLConnection.HTTP_UNAUTHORIZED) {
+                // Got status 401 -- unauthorized
throw new SubmitException( url, request, requestSize,
connection.getResponseMessage());
- }
- else if (response_code < 400) {
+ } else if (response_code < 400) {
inputStream = connection.getInputStream();
} else {
inputStream = connection.getErrorStream();
}
if (inputStream == null) {
- throw new IOException("Server returned HTTP " + connection.getResponseCode() + " (" + connection.getResponseMessage() + ").");
+ // Trigger an HA failover at the caller level
+ throw new GPUdbExitException("Server returned HTTP " + connection.getResponseCode() + " (" + connection.getResponseMessage() + "). returning EXIT exception");
}
try {
@@ -2148,7 +2180,10 @@ else if (response_code < 400) {
if (status.equals("ERROR")) {
// Check if Kinetica is shutting down
- if ( message.contains( DB_EXITING_ERROR_MESSAGE )
+ if ( (response_code == HttpURLConnection.HTTP_UNAVAILABLE)
+ || (response_code == HttpURLConnection.HTTP_INTERNAL_ERROR)
+ || (response_code == HttpURLConnection.HTTP_GATEWAY_TIMEOUT)
+ || message.contains( DB_EXITING_ERROR_MESSAGE )
|| message.contains( DB_CONNECTION_REFUSED_ERROR_MESSAGE )
|| message.contains( DB_CONNECTION_RESET_ERROR_MESSAGE )
|| message.contains( DB_SYSTEM_LIMITED_ERROR_MESSAGE ) ) {
@@ -2260,4 +2295,59 @@ public void ping() throws GPUdbException {
}
}
}
+
+
+ /**
+ * Verifies that GPUdb is running at the given URL (does not do any HA failover).
+ *
+ * @returns true if Kinetica is running, false otherwise.
+ */
+ public boolean isKineticaRunning(URL url) {
+
+ HttpURLConnection connection = null;
+
+ try {
+ connection = initializeHttpConnection( url );
+
+ // Ping is a get, unlike all endpoints which are post
+ connection.setRequestMethod("GET");
+
+ byte[] buffer = new byte[1024];
+ int index = 0;
+
+ try (InputStream inputStream = connection.getResponseCode() < 400 ? connection.getInputStream() : connection.getErrorStream()) {
+ if (inputStream == null) {
+ throw new IOException("Server returned HTTP " + connection.getResponseCode() + " (" + connection.getResponseMessage() + ").");
+ }
+
+ int count;
+
+ while ((count = inputStream.read(buffer, index, buffer.length - index)) > -1) {
+ index += count;
+
+ if (index == buffer.length) {
+ buffer = Arrays.copyOf(buffer, buffer.length * 2);
+ }
+ }
+ }
+
+ String response = new String(Arrays.copyOf(buffer, index));
+
+ if (!response.equals("Kinetica is running!")) {
+ return false;
+ }
+
+ return true;
+ } catch (Exception ex) {
+ return false;
+ } finally {
+ if (connection != null) {
+ try {
+ connection.disconnect();
+ } catch (Exception ex) {
+ }
+ }
+ }
+    } // end isKineticaRunning
+
}
diff --git a/api/src/main/java/com/gpudb/RecordRetriever.java b/api/src/main/java/com/gpudb/RecordRetriever.java
index 2c1099b..c0c3b28 100644
--- a/api/src/main/java/com/gpudb/RecordRetriever.java
+++ b/api/src/main/java/com/gpudb/RecordRetriever.java
@@ -1,5 +1,6 @@
package com.gpudb;
+import com.gpudb.GPUdbBase.GPUdbExitException;
import com.gpudb.protocol.AdminShowShardsRequest;
import com.gpudb.protocol.AdminShowShardsResponse;
import com.gpudb.protocol.GetRecordsRequest;
@@ -33,6 +34,7 @@ public class RecordRetriever {
private long shardVersion;
private MutableLong shardUpdateTime;
private int numClusterSwitches;
+ private URL currentHeadNodeURL;
private com.gpudb.WorkerList workerList;
private List routingTable;
private List workerUrls;
@@ -220,6 +222,10 @@ private RecordRetriever( GPUdb gpudb,
// in order to decide later if it's time to update the worker queues
this.numClusterSwitches = gpudb.getNumClusterSwitches();
+ // Keep track of which cluster we're using (helpful in knowing if an
+ // HA failover has happened)
+ this.currentHeadNodeURL = gpudb.getURL();
+
RecordKeyBuilder shardKeyBuilderTemp;
if (typeObjectMap == null) {
@@ -281,6 +287,64 @@ private RecordRetriever( GPUdb gpudb,
}
+ /**
+     * Force a high-availability cluster failover. Check the health of the
+ * cluster (either head node only, or head node and worker ranks, based on
+ * the retriever configuration), and use it if healthy. If no healthy cluster
+ * is found, then throw an error. Otherwise, stop at the first healthy cluster.
+ *
+ * @throws GPUdbException if a successful failover could not be achieved.
+ */
+ private void forceHAFailover() throws GPUdbException {
+ while (true) {
+ // Try to switch to a new cluster
+ try {
+ // this.gpudb.switchURL( this.currentHeadNodeURL );
+ synchronized ( this.currentHeadNodeURL ) {
+ this.gpudb.switchURL( this.currentHeadNodeURL );
+ }
+ } catch (GPUdbBase.GPUdbHAUnavailableException ex ) {
+ // Have tried all clusters; back to square 1
+ throw ex;
+ }
+
+
+ // We did switch to a different cluster; now check the health
+ // of the cluster, starting with the head node
+ if ( !this.gpudb.isKineticaRunning( this.gpudb.getURL() ) ) {
+ continue; // try the next cluster because this head node is down
+ }
+
+ boolean isClusterHealthy = true;
+ if ( this.isMultiHeadEnabled ) {
+ // Obtain the worker rank addresses
+ com.gpudb.WorkerList workerRanks;
+ try {
+ workerRanks = new com.gpudb.WorkerList( this.gpudb,
+ this.workerList.getIpRegex() );
+ } catch (GPUdbException ex) {
+ // Some problem occurred; move to the next cluster
+ continue;
+ }
+
+ // Check the health of all the worker ranks
+ for ( URL workerRank : workerRanks) {
+ if ( !this.gpudb.isKineticaRunning( workerRank ) ) {
+ isClusterHealthy = false;
+ }
+ }
+ }
+
+ if ( isClusterHealthy ) {
+ // Save the healthy cluster's URL as the current head node URL
+ synchronized ( this.currentHeadNodeURL ) {
+ this.currentHeadNodeURL = this.gpudb.getURL();
+ }
+ return;
+ }
+ }
+ }
+
/**
* Updates the shard mapping based on the latest cluster configuration.
@@ -337,14 +401,9 @@ private boolean updateWorkerQueues( boolean doReconstructWorkerURLs ) throws GPU
// Couldn't get the current shard assignment info; see if this is due
// to cluster failure
if ( ex.hadConnectionFailure() ) {
- // Check if the db client has failed over to a different HA ring node
- int _numClusterSwitches = this.gpudb.getNumClusterSwitches();
- if ( this.numClusterSwitches == _numClusterSwitches ) {
- return false; // nothing to do; some other problem occurred
- }
-
- // Update the HA ring node switch counter
- this.numClusterSwitches = _numClusterSwitches;
+ // Could not update the worker queues because we can't connect
+ // to the database
+ return false;
} else {
// Unknown errors not handled here
throw ex;
@@ -551,6 +610,41 @@ public GetRecordsResponse getByKey(List