HAWQ-1605. Support INSERT in PXF JDBC plugin
(closes apache#1353)

Fix incorrect TIMESTAMP handling

PXF JDBC plugin update

* Add support for INSERT queries:
	* INSERT queries are processed by the same classes as SELECT queries;
	* INSERTs are executed through a JDBC PreparedStatement;
	* INSERTs support batching (by means of JDBC; see the sketch after this list);

* Minor changes in WhereSQLBuilder and JdbcPartitionFragmenter:
	* Removed 'WHERE 1=1';
	* Use the same pattern of spaces around operators everywhere ('a = b', not 'a=b');
	* JdbcPartitionFragmenter.buildFragmenterSql() made static to avoid extra checks of InputData (proposed by @sansanichfb);

* Refactoring and some micro-optimizations;
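
The batching relies on the standard JDBC pattern. A minimal sketch, assuming a hypothetical demo_table with two columns and an illustrative row count (not the plugin's code):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class BatchedInsertSketch {
    // Buffer rows with addBatch() and flush every batchSize rows with executeBatch()
    static void insertRows(Connection connection, int batchSize) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(
                "INSERT INTO demo_table (id, name) VALUES (?, ?)")) {
            for (int i = 0; i < 1000; i++) {
                statement.setInt(1, i);
                statement.setString(2, "row-" + i);
                statement.addBatch();
                if ((i + 1) % batchSize == 0) {
                    statement.executeBatch(); // send a complete batch to the database
                }
            }
            statement.executeBatch(); // send the rows that are left
        }
    }
}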

PXF JDBC refactoring

* The README.md is completely rewritten;

* Lots of changes in comments and javadoc comments;

* Code refactoring and minor changes in code style

Fixes proposed by @sansanichfb

Add DbProduct for Microsoft SQL Server

Notes on consistency in README and errors

* Add an explicit note that consistency of INSERT queries is not guaranteed.

* Change the error message on INSERT failure.

* Minor corrections of the README.

The fixes were proposed by @sansanichfb

Improve WhereSQLBuilder

* Add support for TIMESTAMP values;

* Add support for the operators <>, LIKE, IS NULL and IS NOT NULL.

Fix proposed by @sansanichfb

Throw an exception when `openForWrite()` is called while the connection to the external database is already open for writing.

Although `openForRead()` behaves differently, its behaviour does not apply here. A second call to `openForWrite()` could be made from another thread, and that would result in a race: the `PreparedStatement` used to write to the external database is the same object for all threads, and `writeNextObject()` is not `synchronized` (or "protected" in some other way).

Simplify logging; BatchUpdateException

Simplify logging so that the logs produced by pxf-jdbc do not grow too big when DEBUG is enabled. The removed logging calls reported field types and names, which in most cases duplicate the data provided; exceptions are still logged.

Add processing of BatchUpdateException, so that the real cause of an exception is returned to the user (sketched below).
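
A minimal sketch of this unwrapping, assuming the JDBC driver chains the underlying error via getNextException() (not the plugin's exact code):

import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class BatchErrorUnwrapping {
    // Re-throw the chained SQLException (the real cause) instead of the bare BatchUpdateException
    static void executeBatchUnwrapped(PreparedStatement statement) throws SQLException {
        try {
            statement.executeBatch();
        } catch (BatchUpdateException e) {
            SQLException cause = e.getNextException();
            throw (cause != null) ? cause : e;
        }
    }
}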

PXF JDBC thread pool support

Implement support for multi-threaded processing of INSERT queries using a thread pool. To use the feature, set the POOL_SIZE parameter in the LOCATION clause of an external table (<1: the pool size equals the number of CPUs available to the JVM; =1: disable the thread pool; >1: use the given pool size).

Not all operations are processed by pool threads: pool threads only execute() the queries; they do not fill the PreparedStatement from OneRow.

Redesign connection pooling

* Redesign connection pooling: move the processing of OneRow objects into the pool threads. This decreases the load on the single-threaded part of PXF;

* Introduce WriterCallable and related classes (sketched after this list). This significantly simplifies the code of JdbcAccessor, makes it easy to introduce new methods of processing INSERT queries, and enables fast hardcoded tweaks for the same purpose.

* Add docs on the thread pool feature
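
The shape of the interface, as inferred from its use in JdbcAccessor below (supply / isCallRequired / call, with call() returning null on success); see the writercallable package for the authoritative definition:

import java.sql.SQLException;
import java.util.concurrent.Callable;

import org.apache.hawq.pxf.api.OneRow;

// A WriterCallable buffers tuples and runs the INSERT when asked;
// call() returns null on success or the SQLException that occurred in the thread.
interface WriterCallable extends Callable<SQLException> {
    void supply(OneRow row);  // buffer one tuple for the pending INSERT
    boolean isCallRequired(); // true when the buffer is full and call() must be invoked
}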

Support long values in PARTITION clause

Support values of the Java primitive type 'long' in the PARTITION clause (both for RANGE and INTERVAL variables).

* Modify JdbcPartitionFragmenter (convert all int variables to long)
* Move parsing of INTERVAL values for PARTITION_TYPE "INT" to the class constructor and add a parse exception handler (see the sketch after this list)
* Simplify ByteUtil (remove methods that deal with values of type 'int')
* Update JdbcPartitionFragmenterTest
* Minor changes in comments
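
A minimal sketch of the constructor-side parsing, with a hypothetical class name and exception choice (the actual fragmenter may report the error differently):

class PartitionIntervalSketch {
    private final long intervalNum;

    // Parse the INTERVAL value once, in the constructor; a malformed value
    // surfaces immediately instead of failing later during fragmentation
    PartitionIntervalSketch(String intervalValue) {
        try {
            intervalNum = Long.parseLong(intervalValue); // 'long' instead of the former 'int'
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "The INTERVAL value '" + intervalValue + "' is not a valid integer", e);
        }
    }

    long intervalNum() {
        return intervalNum;
    }
}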

Fix pxf-profiles-default.xml

Remove an ampersand from the description of the JDBC profile in pxf-profiles-default.xml

Remove explicit throws of IllegalArgumentException

Remove explicit references to 'IllegalArgumentException', as the caller is probably unable to recover from it.
'IllegalStateException' is left unchanged, as it is thrown when the caller must perform an action that will resolve the problem (a 'WriterCallable' is full).

Other runtime exceptions are explicitly listed in function definitions as before; their causes are usually known to the caller, so it can handle them or at least send a more meaningful message about the cause of the error to the user.

Proposed by Alex Denissov <[email protected]>

Simplify isCallRequired()

Make the body of 'isCallRequired()' a one-line expression in all implementations of 'WriterCallable' (illustrated below).
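
A hypothetical illustration of the one-line form (class and field names are not from the actual sources):

class FixedSizeWriterCallableSketch {
    private java.sql.PreparedStatement statement = null;
    private int rowsBuffered = 0;
    private int batchSize = 100;

    // The whole body is a single boolean expression instead of nested if-statements
    public boolean isCallRequired() {
        return (statement != null) && (rowsBuffered >= batchSize);
    }
}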

Proposed by Alex Denissov <[email protected]>

Remove rollback and change BATCH_SIZE logic

Remove calls to 'tryRollback()' and all processing of rollbacks in INSERT queries.
The reason for the change is that rollback is effective in only one case: the INSERT is performed from a single PXF segment that uses one thread, and the external database supports transactions. In most cases, more than one PXF segment performs the INSERT, and rollback is then of no use.
On the other hand, the rollback logic is cumbersome and notably increases code complexity.

Due to the removal of rollback, there is no longer a need to keep BATCH_SIZE infinite whenever possible (when BATCH_SIZE is infinite, the number of scenarios in which rollback() fails is lower, but not zero).
Thus, it makes sense to set a recommended value (https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28754).
The old logic of an infinite batch size also remains active.

Modify README.md: minor corrections, new BATCH_SIZE logic

Proposed by Alex Denissov <[email protected]>

Change BATCH_SIZE logic

* Modify BATCH_SIZE parameter processing according to new proposals apache#1353 (comment)
* Update README.md
* Restore fallback to non-batched INSERTs in case the external database (or JDBC connector) does not support batch updates

Proposed by Alex Denissov <[email protected]>
Proposed by Dmitriy Pavlov <[email protected]>

Modify processing of BATCH_SIZE parameter

Modify BATCH_SIZE parameter processing according to the proposal apache#1353 (comment):
* Update allowed values of BATCH_SIZE and their meanings
* Introduce an explicit flag indicating whether the BATCH_SIZE parameter is present
* Introduce DEFAULT_BATCH_SIZE constant in JdbcPlugin
* Move processing of BATCH_SIZE values to JdbcAccessor
* Update README.md

Proposed by @divyabhargov, @denalex

Fix column type for columns converted to TEXT

Modify column type processing so that the column type is set correctly for fields that:
* Are represented as columns of type TEXT by GPDBWritable, but whose actual type is different
* Contain a NULL value

Before, the column type code was not set correctly for such columns, because setting it was guarded by a check that the field value is not NULL.
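
A sketch of the idea with entirely hypothetical types (the real change lives in the plugin's resolver, not in this made-up API): set the type code unconditionally, then handle the NULL value separately.

class NullSafeTypeSketch {
    interface FieldSink { // stand-in for the record being filled
        void setTypeCode(int index, int typeCode);
        void setNull(int index);
        void setValue(int index, Object value);
    }

    static void fillField(FieldSink sink, int index, int typeCode, Object value) {
        sink.setTypeCode(index, typeCode); // before the fix, this ran only when value != null
        if (value == null) {
            sink.setNull(index);
        } else {
            sink.setValue(index, value);
        }
    }
}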

Proposed and authored by @divyabhargov

Remove parseUnsignedInt
Ivan Leskin authored and denalex committed Sep 7, 2018
1 parent a741655 commit 472fa2b
Showing 22 changed files with 1,984 additions and 746 deletions.
341 changes: 228 additions & 113 deletions pxf/pxf-jdbc/README.md

Large diffs are not rendered by default.

@@ -0,0 +1,353 @@
package org.apache.hawq.pxf.plugins.jdbc;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.ReadAccessor;
import org.apache.hawq.pxf.api.WriteAccessor;
import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;

import java.util.List;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import java.io.IOException;
import java.text.ParseException;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * JDBC tables accessor
 *
 * The SELECT queries are processed by {@link java.sql.Statement}
 *
 * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
 * built-in JDBC batches of arbitrary size
 */
public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    /**
     * Class constructor
     */
    public JdbcAccessor(InputData inputData) throws UserDataException {
        super(inputData);
    }

    /**
     * openForRead() implementation
     * Create query, open JDBC connection, execute query and store the result into resultSet
     *
     * @throws SQLException if a database access error occurs
     * @throws SQLTimeoutException if a problem with the connection occurs
     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
     * @throws ClassNotFoundException if the JDBC driver was not found
     */
    @Override
    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
        if (statementRead != null && !statementRead.isClosed()) {
            return true;
        }

        Connection connection = super.getConnection();

        queryRead = buildSelectQuery(connection.getMetaData());
        statementRead = connection.createStatement();
        resultSetRead = statementRead.executeQuery(queryRead);

        return true;
    }

    /**
     * readNextObject() implementation
     * Retrieve the next tuple from resultSet and return it
     *
     * @throws SQLException if a problem in resultSet occurs
     */
    @Override
    public OneRow readNextObject() throws SQLException {
        if (resultSetRead.next()) {
            return new OneRow(resultSetRead);
        }
        return null;
    }

    /**
     * closeForRead() implementation
     */
    @Override
    public void closeForRead() {
        JdbcPlugin.closeStatement(statementRead);
    }

    /**
     * openForWrite() implementation
     * Create query template and open JDBC connection
     *
     * @throws SQLException if a database access error occurs
     * @throws SQLTimeoutException if a problem with the connection occurs
     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
     * @throws ClassNotFoundException if the JDBC driver was not found
     */
    @Override
    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException {
        if (statementWrite != null && !statementWrite.isClosed()) {
            throw new SQLException("The connection to an external database is already open.");
        }

        Connection connection = super.getConnection();

        queryWrite = buildInsertQuery();
        statementWrite = super.getPreparedStatement(connection, queryWrite);

        // Process batchSize
        if (!connection.getMetaData().supportsBatchUpdates()) {
            if ((batchSizeIsSetByUser) && (batchSize > 1)) {
                throw new SQLException("The external database does not support batch updates");
            }
            else {
                batchSize = 1;
            }
        }

        // Process poolSize
        if (poolSize < 1) {
            poolSize = Runtime.getRuntime().availableProcessors();
            LOG.info(
                "The POOL_SIZE is set to the number of CPUs available (" + Integer.toString(poolSize) + ")"
            );
        }
        if (poolSize > 1) {
            executorServiceWrite = Executors.newFixedThreadPool(poolSize);
            poolTasks = new LinkedList<>();
        }

        // Setup WriterCallableFactory
        writerCallableFactory = new WriterCallableFactory();
        writerCallableFactory.setPlugin(this);
        writerCallableFactory.setQuery(queryWrite);
        writerCallableFactory.setBatchSize(batchSize);
        if (poolSize == 1) {
            writerCallableFactory.setStatement(statementWrite);
        }

        writerCallable = writerCallableFactory.get();

        return true;
    }

    /**
     * writeNextObject() implementation
     *
     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
     * Otherwise, execute an INSERT query immediately
     *
     * In both cases, a {@link java.sql.PreparedStatement} is used
     *
     * @throws SQLException if a database access error occurs
     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
     * @throws ClassNotFoundException if pooling is used and the JDBC driver was not found
     * @throws IllegalStateException if writerCallableFactory was not properly initialized
     * @throws Exception if an exception happens in writerCallable.call()
     */
    @Override
    public boolean writeNextObject(OneRow row) throws Exception {
        if (writerCallable == null) {
            throw new IllegalStateException("The JDBC connection was not properly initialized (writerCallable is null)");
        }

        writerCallable.supply(row);
        if (writerCallable.isCallRequired()) {
            if (poolSize > 1) {
                // Pooling is used. Create a new writerCallable
                poolTasks.add(executorServiceWrite.submit(writerCallable));
                writerCallable = writerCallableFactory.get();
            }
            else {
                // Pooling is not used
                writerCallable.call();
            }
        }

        return true;
    }

    /**
     * closeForWrite() implementation
     *
     * @throws SQLException if a database access error occurs
     * @throws Exception if an exception happens in writerCallable.call() or due to runtime errors in the thread pool
     */
    @Override
    public void closeForWrite() throws Exception {
        if ((statementWrite == null) || (writerCallable == null)) {
            return;
        }

        try {
            if (poolSize > 1) {
                // Process thread pool
                Exception firstException = null;
                for (Future<SQLException> task : poolTasks) {
                    // We need this construction to ensure that we try to close all connections opened by pool threads
                    try {
                        SQLException currentSqlException = task.get();
                        if (currentSqlException != null) {
                            if (firstException == null) {
                                firstException = currentSqlException;
                            }
                            LOG.error(
                                "A SQLException occurred in a pool thread: " + currentSqlException.getClass() + " " + currentSqlException.getMessage()
                            );
                        }
                    }
                    catch (Exception e) {
                        // This exception must have been caused by a thread execution error. However, another
                        // exception (maybe a SQLException) may have happened in one of the threads not yet
                        // examined; that is why firstException is not modified here
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                "A runtime exception occurred in the thread pool: " + e.getClass() + " " + e.getMessage()
                            );
                        }
                    }
                }
                try {
                    executorServiceWrite.shutdown();
                    executorServiceWrite.shutdownNow();
                }
                catch (Exception e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("executorServiceWrite.shutdown() or .shutdownNow() threw an exception: " + e.getClass() + " " + e.getMessage());
                    }
                }
                if (firstException != null) {
                    throw firstException;
                }
            }

            // Send data that is left
            writerCallable.call();
        }
        finally {
            JdbcPlugin.closeStatement(statementWrite);
        }
    }


    /**
     * Build SELECT query (with "WHERE" and partition constraints)
     *
     * @return Complete SQL query
     *
     * @throws ParseException if the constraints passed in InputData are incorrect
     * @throws SQLException if the database metadata is invalid
     */
    private String buildSelectQuery(DatabaseMetaData databaseMetaData) throws ParseException, SQLException {
        if (databaseMetaData == null) {
            throw new IllegalArgumentException("The provided databaseMetaData is null");
        }
        StringBuilder sb = new StringBuilder();
        sb.append("SELECT ");

        // Insert columns' names
        String columnDivisor = "";
        for (ColumnDescriptor column : columns) {
            sb.append(columnDivisor);
            columnDivisor = ", ";
            sb.append(column.columnName());
        }

        // Insert the table name
        sb.append(" FROM ").append(tableName);

        // Insert regular WHERE constraints
        (new WhereSQLBuilder(inputData)).buildWhereSQL(databaseMetaData.getDatabaseProductName(), sb);

        // Insert partition constraints
        JdbcPartitionFragmenter.buildFragmenterSql(inputData, databaseMetaData.getDatabaseProductName(), sb);

        return sb.toString();
    }

    /**
     * Build INSERT query template (field values are replaced by placeholders '?')
     *
     * @return SQL query with placeholders instead of actual values
     */
    private String buildInsertQuery() {
        StringBuilder sb = new StringBuilder();

        sb.append("INSERT INTO ");

        // Insert the table name
        sb.append(tableName);

        // Insert columns' names
        sb.append("(");
        String fieldDivisor = "";
        for (ColumnDescriptor column : columns) {
            sb.append(fieldDivisor);
            fieldDivisor = ", ";
            sb.append(column.columnName());
        }
        sb.append(")");

        sb.append(" VALUES ");

        // Insert values placeholders
        sb.append("(");
        fieldDivisor = "";
        for (int i = 0; i < columns.size(); i++) {
            sb.append(fieldDivisor);
            fieldDivisor = ", ";
            sb.append("?");
        }
        sb.append(")");

        return sb.toString();
    }

    // Read variables
    private String queryRead = null;
    private Statement statementRead = null;
    private ResultSet resultSetRead = null;

    // Write variables
    private String queryWrite = null;
    private PreparedStatement statementWrite = null;
    private WriterCallableFactory writerCallableFactory = null;
    private WriterCallable writerCallable = null;
    private ExecutorService executorServiceWrite = null;
    private List<Future<SQLException>> poolTasks = null;

    // Static variables
    private static final Log LOG = LogFactory.getLog(JdbcAccessor.class);
}