Skip to content

Commit

Permalink
Merge pull request #27 from arenadata/feature-pxf-6.x/ADBDEV-3096
Browse files Browse the repository at this point in the history
PXF-6.x/ADBDEV-3096: PXF JDBC: Allow set Oracle parallel instructions
  • Loading branch information
iamlapa authored Oct 11, 2022
2 parents e3a90f4 + 208e91c commit d649732
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 1 deletion.
1 change: 1 addition & 0 deletions server/pxf-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {

testImplementation("org.apache.parquet:parquet-pig")
testImplementation('org.springframework.boot:spring-boot-starter-test')
testImplementation('org.mockito:mockito-inline')
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import org.greenplum.pxf.plugins.jdbc.utils.oracle.OracleJdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,7 +79,7 @@ public String wrapTimestamp(Object val) {

@Override
public String buildSessionQuery(String key, String value) {
return String.format("ALTER SESSION SET %s = %s", key, value);
return OracleJdbcUtils.buildSessionQuery(key, value);
}
},

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.greenplum.pxf.plugins.jdbc.utils.oracle;

public class OracleJdbcUtils {
private static final OracleSessionQueryFactory sessionQueryFactory = new OracleSessionQueryFactory();

public static String buildSessionQuery(String property, String value) {
return sessionQueryFactory.create(property, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.greenplum.pxf.plugins.jdbc.utils.oracle;

public class OracleParallelSessionParam {
private Clause clause;
private StatementType statementType;
private String degreeOfParallelism;

public enum Clause {
ENABLE,
DISABLE,
FORCE
}

public enum StatementType {
DML,
DDL,
QUERY
}

public void setClause(Clause clause) {
this.clause = clause;
}

public void setStatementType(StatementType statementType) {
this.statementType = statementType;
}

public void setDegreeOfParallelism(String degreeOfParallelism) {
this.degreeOfParallelism = degreeOfParallelism;
}

public Clause getClause() {
return clause;
}

public StatementType getStatementType() {
return statementType;
}

public String getDegreeOfParallelism() {
return degreeOfParallelism;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.greenplum.pxf.plugins.jdbc.utils.oracle;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.util.Strings;

import java.util.HashMap;

public class OracleParallelSessionParamFactory {
public OracleParallelSessionParam create(String property, String value, String delimiter) {
String[] values = splitValue(property, value, delimiter);

HashMap<String, String> map = getParallelSessionParam(values);
String clause = map.get("clause").toUpperCase();
String statementType = map.get("statement_type").toUpperCase();
String degreeOfParallelism = map.get("degree_of_parallelism");

OracleParallelSessionParam param = new OracleParallelSessionParam();
param.setClause(getClause(clause, property));
param.setStatementType(getStatementType(statementType, property));
param.setDegreeOfParallelism(getDegreeOfParallelism(degreeOfParallelism, property));
return param;
}

private String[] splitValue(String property, String value, String delimiter) {
validateValue(property, value);
String[] values = value.split(delimiter);
if (values.length < 2 || values.length > 3) {
throw new IllegalArgumentException(String.format(
"The parameter '%s' in jdbc-site.xml has to contain at least 2 but not more then 3 values delimited by %s",
property, delimiter)
);
}
return values;
}

private void validateValue(String property, String value) {
if (StringUtils.isBlank(value)) {
throw new IllegalArgumentException(String.format("The parameter '%s' is blank in jdbc-site.xml", property));
}
}

private HashMap<String, String> getParallelSessionParam(String[] values) {
HashMap<String, String> params = new HashMap<>();
params.put("clause", values[0]);
params.put("statement_type", values[1]);
if (values.length == 3 && Strings.isNotBlank(values[2])) {
params.put("degree_of_parallelism", values[2]);
}
return params;
}

private OracleParallelSessionParam.Clause getClause(String clause, String property) {
try {
return OracleParallelSessionParam.Clause.valueOf(clause);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format(
"The 'clause' value '%s' in the parameter '%s' is not valid", clause, property)
);
}
}

private OracleParallelSessionParam.StatementType getStatementType(String statementType, String property) {
try {
return OracleParallelSessionParam.StatementType.valueOf(statementType);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format(
"The 'statement type' value '%s' in the parameter '%s' is not valid", statementType, property)
);
}
}

private String getDegreeOfParallelism(String degreeOfParallelism, String property) {
if (degreeOfParallelism == null) {
return Strings.EMPTY;
}
try {
Integer.parseInt(degreeOfParallelism);
return degreeOfParallelism;
} catch (NumberFormatException nfe) {
throw new IllegalArgumentException(String.format(
"The 'degree of parallelism' value '%s' in the parameter '%s' is not valid", degreeOfParallelism, property)
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.greenplum.pxf.plugins.jdbc.utils.oracle;

import org.apache.logging.log4j.util.Strings;

public class OracleSessionQueryFactory {
private static final String ORACLE_JDBC_SESSION_PARALLEL_PROPERTY_PREFIX = "alter_session_parallel";
private static final String ORACLE_JDBC_SESSION_PARALLEL_PROPERTY_DELIMITER = "\\.";
private final OracleParallelSessionParamFactory oracleSessionParamFactory = new OracleParallelSessionParamFactory();

public String create(String property, String value) {
if (property.contains(ORACLE_JDBC_SESSION_PARALLEL_PROPERTY_PREFIX)) {
return getParallelSessionCommand(property, value);
}
return String.format("ALTER SESSION SET %s = %s", property, value);
}

private String getParallelSessionCommand(String property, String value) {
OracleParallelSessionParam param = oracleSessionParamFactory.create(property,
value, ORACLE_JDBC_SESSION_PARALLEL_PROPERTY_DELIMITER);
return createParallelSessionCommand(param);
}

private String createParallelSessionCommand(OracleParallelSessionParam param) {
if (Strings.isNotEmpty(param.getDegreeOfParallelism())
&& param.getClause() == OracleParallelSessionParam.Clause.FORCE) {
return String.format("ALTER SESSION %s PARALLEL %s PARALLEL %s",
param.getClause(), param.getStatementType(), param.getDegreeOfParallelism());
} else {
return String.format("ALTER SESSION %s PARALLEL %s", param.getClause(), param.getStatementType());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.greenplum.pxf.plugins.jdbc.utils.oracle;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class OracleParallelSessionParamFactoryTest {

private final OracleParallelSessionParamFactory oracleParallelSessionParamFactory = new OracleParallelSessionParamFactory();
private final String property = "jdbc.session.property.alter_session_parallel.1";
private final String delimiter = "\\.";

@Test
void createWithClauseAndStatementAndDegreeOfParallelismSuccess() {
String value = "force.query.5";
OracleParallelSessionParam param = oracleParallelSessionParamFactory.create(property, value, delimiter);
assertEquals(param.getClause(), OracleParallelSessionParam.Clause.FORCE);
assertEquals(param.getStatementType(), OracleParallelSessionParam.StatementType.QUERY);
assertEquals(param.getDegreeOfParallelism(), "5");
}

@Test
void createWithClauseAndStatementSuccess() {
String value = "enable.ddl";
OracleParallelSessionParam param = oracleParallelSessionParamFactory.create(property, value, delimiter);
assertEquals(param.getClause(), OracleParallelSessionParam.Clause.ENABLE);
assertEquals(param.getStatementType(), OracleParallelSessionParam.StatementType.DDL);
assertEquals(param.getDegreeOfParallelism(), "");
}

@Test
void createWithClauseAndStatementAndBlankDegreeOfParallelismSuccess() {
String value = "disable.dml. ";
String property = "jdbc.session.property.alter_session_parallel.1";
OracleParallelSessionParam param = oracleParallelSessionParamFactory.create(property, value, delimiter);
assertEquals(param.getClause(), OracleParallelSessionParam.Clause.DISABLE);
assertEquals(param.getStatementType(), OracleParallelSessionParam.StatementType.DML);
assertEquals(param.getDegreeOfParallelism(), "");
}

@Test
void createWithEmptyValue() {
String value = "enable.dml.";
String property = "jdbc.session.property.alter_session_parallel.1";
OracleParallelSessionParam param = oracleParallelSessionParamFactory.create(property, value, delimiter);
assertEquals(param.getClause(), OracleParallelSessionParam.Clause.ENABLE);
assertEquals(param.getStatementType(), OracleParallelSessionParam.StatementType.DML);
assertEquals(param.getDegreeOfParallelism(), "");
}

@Test
void createWithWrongClause() {
String value = "fake_force.query.5";
Exception exception = assertThrows(IllegalArgumentException.class,
() -> oracleParallelSessionParamFactory.create(property, value, delimiter));
String expectedMessage = "The 'clause' value 'FAKE_FORCE' in the parameter 'jdbc.session.property.alter_session_parallel.1' is not valid";
String actualMessage = exception.getMessage();
assertEquals(actualMessage, expectedMessage);
}

@Test
void createWithWrongStatement() {
String value = "enable.fake_statement";
Exception exception = assertThrows(IllegalArgumentException.class,
() -> oracleParallelSessionParamFactory.create(property, value, delimiter));
String expectedMessage = "The 'statement type' value 'FAKE_STATEMENT' in the parameter 'jdbc.session.property.alter_session_parallel.1' is not valid";
String actualMessage = exception.getMessage();
assertEquals(expectedMessage, actualMessage);
}

@Test
void createWithWrongDegreeOfParallelism() {
String value = "force.dml.fake_number";
Exception exception = assertThrows(IllegalArgumentException.class,
() -> oracleParallelSessionParamFactory.create(property, value, delimiter));
String expectedMessage = "The 'degree of parallelism' value 'fake_number' in the parameter 'jdbc.session.property.alter_session_parallel.1' is not valid";
String actualMessage = exception.getMessage();
assertEquals(expectedMessage, actualMessage);
}

@Test
void createWithWrongValueMoreThen3() {
String value = "force.dml.number.70";
Exception exception = assertThrows(IllegalArgumentException.class,
() -> oracleParallelSessionParamFactory.create(property, value, delimiter));
String expectedMessage = "The parameter 'jdbc.session.property.alter_session_parallel.1' in jdbc-site.xml has to contain at least 2 but not more then 3 values delimited by \\.";
String actualMessage = exception.getMessage();
assertEquals(expectedMessage, actualMessage);
}

@Test
void createWithWrongValueLessThen2() {
String value = "force";
Exception exception = assertThrows(IllegalArgumentException.class,
() -> oracleParallelSessionParamFactory.create(property, value, delimiter));
String expectedMessage = "The parameter 'jdbc.session.property.alter_session_parallel.1' in jdbc-site.xml has to contain at least 2 but not more then 3 values delimited by \\.";
String actualMessage = exception.getMessage();
assertEquals(expectedMessage, actualMessage);
}

@Test
void createWithBlankValue() {
String value = " ";
Exception exception = assertThrows(IllegalArgumentException.class,
() -> oracleParallelSessionParamFactory.create(property, value, delimiter));
String expectedMessage = "The parameter 'jdbc.session.property.alter_session_parallel.1' is blank in jdbc-site.xml";
String actualMessage = exception.getMessage();
assertEquals(expectedMessage, actualMessage);
}
}
Loading

0 comments on commit d649732

Please sign in to comment.