Merge pull request #155 from NitorCreations/oracle-support
initial oracle ddl and configuration
jsyrjala committed Oct 15, 2015
2 parents 6317568 + 9056c4f commit dd210a4
Showing 12 changed files with 316 additions and 15 deletions.
@@ -30,4 +30,9 @@ public Integer extractData(ResultSet rs) throws SQLException, DataAccessException {
return rs.getMetaData().getColumnDisplaySize(1);
}
}

public static Integer getInt(ResultSet rs, String columnLabel) throws SQLException {
int value = rs.getInt(columnLabel);
return rs.wasNull() ? null : value;
}
}
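
The new getInt helper is needed because JDBC's ResultSet.getInt() returns 0 for SQL NULL, so only a follow-up rs.wasNull() call can distinguish a NULL column from a genuine zero; this matters for nullable columns such as executor_id and parent_workflow_id. A runnable illustration of that contract, not part of this commit and assuming only the H2 driver on the classpath (the class name WasNullDemo is hypothetical):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class WasNullDemo {
  public static void main(String[] args) throws Exception {
    try (Connection c = DriverManager.getConnection("jdbc:h2:mem:demo");
         Statement s = c.createStatement()) {
      s.execute("create table t (executor_id int)");
      s.execute("insert into t values (null)");
      try (ResultSet rs = s.executeQuery("select executor_id from t")) {
        rs.next();
        int plain = rs.getInt("executor_id");      // 0, even though the column is NULL
        boolean wasNull = rs.wasNull();            // true
        System.out.println(plain + " " + wasNull); // prints "0 true"
        // DaoUtil.getInt(rs, "executor_id") returns null here instead of 0.
      }
    }
  }
}
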
@@ -31,6 +31,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nitorcreations.nflow.engine.internal.config.NFlow;
import com.nitorcreations.nflow.engine.internal.storage.db.SQLVariants;
import com.nitorcreations.nflow.engine.internal.workflow.StoredWorkflowDefinition;
import com.nitorcreations.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import com.nitorcreations.nflow.engine.workflow.definition.WorkflowState;
@@ -42,6 +43,7 @@ public class WorkflowDefinitionDao {
private ExecutorDao executorInfo;
private NamedParameterJdbcTemplate namedJdbc;
private ObjectMapper nflowObjectMapper;
private SQLVariants sqlVariants;

@Inject
public void setExecutorDao(ExecutorDao executorDao) {
@@ -58,13 +60,18 @@ public void setObjectMapper(@NFlow ObjectMapper nflowObjectMapper) {
this.nflowObjectMapper = nflowObjectMapper;
}

@Inject
public void setSqlVariants(SQLVariants sqlVariants) {
this.sqlVariants = sqlVariants;
}

public void storeWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowState> definition) {
StoredWorkflowDefinition storedDefinition = convert(definition);
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("type", definition.getType());
String serializedDefinition = serializeDefinition(storedDefinition);
params.addValue("definition_sha1", sha1(serializedDefinition));
params.addValue("definition", serializedDefinition);
params.addValue("definition", serializedDefinition, sqlVariants.longTextType());
params.addValue("modified_by", executorInfo.getExecutorId());
params.addValue("executor_group", executorInfo.getExecutorGroup());

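Functionally, the key change above is the three-argument addValue call: the serialized definition is now bound using the SQL type reported by the injected SQLVariants, so the JSON is written as a CLOB on Oracle and as a plain character value elsewhere. A minimal sketch of that binding, as a hypothetical standalone snippet rather than the DAO code (DefinitionParamsExample is an invented name):

import java.sql.Types;

import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;

public class DefinitionParamsExample {
  static MapSqlParameterSource definitionParams(String serializedDefinition, int longTextType) {
    MapSqlParameterSource params = new MapSqlParameterSource();
    // Passing the java.sql.Types constant tells Spring how to set the parameter,
    // which is what lets large definitions bind to a CLOB column on Oracle.
    params.addValue("definition", serializedDefinition, longTextType);
    return params;
  }

  public static void main(String[] args) {
    definitionParams("{}", Types.CLOB);    // what OracleSqlVariants.longTextType() returns
    definitionParams("{}", Types.VARCHAR); // what the H2, MySQL and PostgreSQL variants return
  }
}
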
@@ -1,6 +1,7 @@
package com.nitorcreations.nflow.engine.internal.dao;

import static com.nitorcreations.nflow.engine.internal.dao.DaoUtil.firstColumnLengthExtractor;
import static com.nitorcreations.nflow.engine.internal.dao.DaoUtil.getInt;
import static com.nitorcreations.nflow.engine.internal.dao.DaoUtil.toDateTime;
import static com.nitorcreations.nflow.engine.internal.dao.DaoUtil.toTimestamp;
import static com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created;
Expand All @@ -18,6 +19,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -229,10 +231,18 @@ protected boolean setValuesIfAvailable(PreparedStatement ps, int i) throws SQLException {
}
});
int updatedRows = 0;
boolean unknownResults = false;
for (int i = 0; i < updateStatus.length; ++i) {
if (updateStatus[i] == Statement.SUCCESS_NO_INFO) {
unknownResults = true;
continue;
}
if (updateStatus[i] == Statement.EXECUTE_FAILED) {
throw new IllegalStateException("Failed to insert/update state variables");
}
updatedRows += updateStatus[i];
}
if (updatedRows != changedStateVariables.size()) {
if (!unknownResults && updatedRows != changedStateVariables.size()) {
throw new IllegalStateException("Failed to insert/update state variables, expected update count "
+ changedStateVariables.size() + ", actual " + updatedRows);
}
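
The reworked loop above accounts for JDBC drivers whose executeBatch() reports Statement.SUCCESS_NO_INFO (-2) instead of a per-statement row count (Oracle's driver commonly does this), while still failing fast on EXECUTE_FAILED; the exact-count check is only applied when every entry was a real count. Condensed into a hypothetical helper (BatchResultCheck is an invented name) purely for readability:

import java.sql.Statement;

public class BatchResultCheck {
  static void check(int[] updateStatus, int expectedCount) {
    int updatedRows = 0;
    boolean unknownResults = false;
    for (int status : updateStatus) {
      if (status == Statement.SUCCESS_NO_INFO) { // -2: the driver gave no row count
        unknownResults = true;
        continue;
      }
      if (status == Statement.EXECUTE_FAILED) {  // -3: this batched statement failed
        throw new IllegalStateException("Failed to insert/update state variables");
      }
      updatedRows += status;
    }
    if (!unknownResults && updatedRows != expectedCount) {
      throw new IllegalStateException("Expected " + expectedCount + " updates, got " + updatedRows);
    }
  }
}
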
@@ -413,22 +423,25 @@ String updateInstanceForExecutionQuery() {
+ sqlVariants.workflowStatus(executing) + ", " + "external_next_activation = null";
}

String whereConditionForInstanceUpdate(int batchSize) {
String whereConditionForInstanceUpdate() {
return "where executor_id is null and status in (" + sqlVariants.workflowStatus(created) + ", "
+ sqlVariants.workflowStatus(inProgress) + ") and next_activation < current_timestamp and "
+ executorInfo.getExecutorGroupCondition() + " order by next_activation asc limit " + batchSize;
+ executorInfo.getExecutorGroupCondition() + " order by next_activation asc";
}

private List<Integer> pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) {
return jdbc.queryForList(updateInstanceForExecutionQuery() + " where id in (select id from nflow_workflow "
+ whereConditionForInstanceUpdate(batchSize) + ") and executor_id is null returning id", Integer.class);
String sql = updateInstanceForExecutionQuery() + " where id in ("
+ sqlVariants.limit("select id from nflow_workflow " + whereConditionForInstanceUpdate(), Integer.toString(batchSize))
+ ") and executor_id is null returning id";
return jdbc.queryForList(sql, Integer.class);
}

private List<Integer> pollNextWorkflowInstanceIdsWithTransaction(final int batchSize) {
return transaction.execute(new TransactionCallback<List<Integer>>() {
@Override
public List<Integer> doInTransaction(TransactionStatus transactionStatus) {
String sql = "select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(batchSize);
String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(),
Integer.toString(batchSize));
List<OptimisticLockKey> instances = jdbc.query(sql, new RowMapper<OptimisticLockKey>() {
@Override
public OptimisticLockKey mapRow(ResultSet rs, int rowNum) throws SQLException {
@@ -455,7 +468,7 @@ public OptimisticLockKey mapRow(ResultSet rs, int rowNum) throws SQLException {
}
continue;
}
if (status != 1) {
if (status != 1 && status != Statement.SUCCESS_NO_INFO) {
throw new PollingRaceConditionException("Race condition in polling workflow instances detected. "
+ "Multiple pollers using same name (" + executorInfo.getExecutorGroup() + ")");
}
@@ -527,7 +540,7 @@ public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances query) {
if (!isEmpty(conditions)) {
sql += " where " + collectionToDelimitedString(conditions, " and ");
}
sql += " limit :limit";
sql = sqlVariants.limit(sql, ":limit");
params.addValue("limit", getMaxResults(query.maxResults));
List<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper());
for (WorkflowInstance instance : ret) {
@@ -623,12 +636,11 @@ public String getWorkflowInstanceState(int workflowInstanceId) {
static class WorkflowInstanceRowMapper implements RowMapper<WorkflowInstance> {
@Override
public WorkflowInstance mapRow(ResultSet rs, int rowNum) throws SQLException {
Integer executorId = (Integer) rs.getObject("executor_id");
return new WorkflowInstance.Builder()
.setId(rs.getInt("id"))
.setExecutorId(executorId)
.setParentWorkflowId((Integer) rs.getObject("parent_workflow_id"))
.setParentActionId((Integer) rs.getObject("parent_action_id"))
.setExecutorId(getInt(rs, "executor_id"))
.setParentWorkflowId(getInt(rs, "parent_workflow_id"))
.setParentActionId(getInt(rs, "parent_action_id"))
.setStatus(WorkflowInstanceStatus.valueOf(rs.getString("status")))
.setType(rs.getString("type"))
.setBusinessKey(rs.getString("business_key"))
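In the row mapper, the (Integer) rs.getObject(...) casts are replaced with DaoUtil.getInt(rs, ...). This is presumably because Oracle's JDBC driver typically returns NUMBER columns as java.math.BigDecimal from getObject(), which would make the Integer cast fail at runtime, whereas getInt() plus wasNull() works across the supported databases and still maps SQL NULL to a Java null. A hypothetical side-by-side sketch (NullableIntExample is an invented class placed in the DAO package so it can see DaoUtil):

package com.nitorcreations.nflow.engine.internal.dao;

import java.sql.ResultSet;
import java.sql.SQLException;

class NullableIntExample {
  static Integer oldStyle(ResultSet rs) throws SQLException {
    // Fine on H2, MySQL and PostgreSQL, but likely a ClassCastException on Oracle,
    // where getObject() on a NUMBER column tends to return BigDecimal.
    return (Integer) rs.getObject("parent_workflow_id");
  }

  static Integer newStyle(ResultSet rs) throws SQLException {
    // Portable: getInt() converts any numeric column and DaoUtil.getInt turns
    // SQL NULL into null via wasNull().
    return DaoUtil.getInt(rs, "parent_workflow_id");
  }
}
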
@@ -3,6 +3,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;

import java.sql.SQLException;
import java.sql.Types;

import org.h2.tools.Server;
import org.springframework.context.annotation.Bean;
@@ -85,5 +86,15 @@ public String actionType() {
public String castToText() {
return "";
}

@Override
public String limit(String query, String limit) {
return query + " limit " + limit;
}

@Override
public int longTextType() {
return Types.VARCHAR;
}
}
}
@@ -7,6 +7,7 @@
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.sql.Types;

import javax.sql.DataSource;

@@ -102,5 +103,15 @@ public String actionType() {
public String castToText() {
return "";
}

@Override
public String limit(String query, String limit) {
return query + " limit " + limit;
}

@Override
public int longTextType() {
return Types.VARCHAR;
}
}
}
@@ -0,0 +1,79 @@
package com.nitorcreations.nflow.engine.internal.storage.db;

import java.sql.Types;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;

@Profile("nflow.db.oracle")
@Configuration
public class OracleDatabaseConfiguration extends DatabaseConfiguration {

public OracleDatabaseConfiguration() {
super("oracle");
}

@Bean
public SQLVariants sqlVariants() {
return new OracleSqlVariants();
}

public static class OracleSqlVariants implements SQLVariants {

@Override
public String currentTimePlusSeconds(int seconds) {
return "current_timestamp + interval '" + seconds + "' second";
}

@Override
public boolean hasUpdateReturning() {
return false;
}

@Override
public boolean hasUpdateableCTE() {
return false;
}

@Override
public String nextActivationUpdate() {
return "(case "
+ "when ? is null then null "
+ "when external_next_activation is null then ? "
+ "else least(?, external_next_activation) end)";
}

@Override
public String workflowStatus(WorkflowInstanceStatus status) {
return "'" + status.name() + "'";
}

@Override
public String workflowStatus() {
return "?";
}

@Override
public String actionType() {
return "?";
}

@Override
public String castToText() {
return "";
}

@Override
public String limit(String query, String limit) {
return "select * from (" + query + ") where rownum <= " + limit;
}

@Override
public int longTextType() {
return Types.CLOB;
}
}
}
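
Because hasUpdateReturning() and hasUpdateableCTE() both return false here, polling on Oracle presumably takes the transaction-based path, which is where the new limit() hook matters: Oracle has no LIMIT clause, so the query is wrapped in a ROWNUM-filtered subquery instead of having " limit n" appended as in the other variants. A quick hypothetical demo (LimitDemo is an invented name):

import com.nitorcreations.nflow.engine.internal.storage.db.OracleDatabaseConfiguration.OracleSqlVariants;

public class LimitDemo {
  public static void main(String[] args) {
    String base = "select id from nflow_workflow where executor_id is null";
    // H2, MySQL and PostgreSQL variants simply append the clause:
    //   select id from nflow_workflow where executor_id is null limit 10
    // The Oracle variant wraps the query instead:
    System.out.println(new OracleSqlVariants().limit(base, "10"));
    // -> select * from (select id from nflow_workflow where executor_id is null) where rownum <= 10
  }
}
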
@@ -1,5 +1,7 @@
package com.nitorcreations.nflow.engine.internal.storage.db;

import java.sql.Types;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@@ -62,5 +64,15 @@ public String actionType() {
public String castToText() {
return "::text";
}

@Override
public String limit(String query, String limit) {
return query + " limit " + limit;
}

@Override
public int longTextType() {
return Types.VARCHAR;
}
}
}
@@ -18,4 +18,8 @@ public interface SQLVariants {
String nextActivationUpdate();

String castToText();

String limit(String query, String limit);

int longTextType();
}
3 changes: 3 additions & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
@@ -32,6 +32,9 @@ nflow.db.mysql.url=jdbc:mysql://localhost/nflow
nflow.db.postgresql.driver=org.postgresql.ds.PGSimpleDataSource
nflow.db.postgresql.url=jdbc:postgresql://localhost/nflow

nflow.db.oracle.driver=oracle.jdbc.pool.OracleDataSource
nflow.db.oracle.url=jdbc:oracle:thin:@//localhost:1521/XE

nflow.db.max_pool_size=4
nflow.db.idle_timeout_seconds=600
nflow.db.create_on_startup=true