Commit 7969922

Merge pull request #159 from NitorCreations/archiving2

Implement archiving tables

Edvard Fonsell committed Nov 6, 2015
2 parents 3a1dad2 + 4a11221
Showing 38 changed files with 1,895 additions and 45 deletions.
ArchiveDao.java (new file)
@@ -0,0 +1,112 @@
package com.nitorcreations.nflow.engine.internal.dao;

import static com.nitorcreations.nflow.engine.internal.dao.DaoUtil.toTimestamp;
import static com.nitorcreations.nflow.engine.internal.dao.DaoUtil.ColumnNamesExtractor.columnNamesExtractor;
import static org.apache.commons.lang3.StringUtils.join;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import javax.inject.Inject;
import javax.inject.Named;

import org.joda.time.DateTime;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.transaction.annotation.Transactional;

import com.nitorcreations.nflow.engine.internal.config.NFlow;

@Named
public class ArchiveDao {
private JdbcTemplate jdbc;
private TableMetadataChecker tableMetadataChecker;

@Inject
public void setJdbcTemplate(@NFlow JdbcTemplate jdbcTemplate) {
this.jdbc = jdbcTemplate;
}

@Inject
public void setTableMetadataChecker(TableMetadataChecker tableMetadataChecker) {
this.tableMetadataChecker = tableMetadataChecker;
}

public void ensureValidArchiveTablesExist() {
tableMetadataChecker.ensureCopyingPossible("nflow_workflow", "nflow_archive_workflow");
tableMetadataChecker.ensureCopyingPossible("nflow_workflow_action", "nflow_archive_workflow_action");
tableMetadataChecker.ensureCopyingPossible("nflow_workflow_state", "nflow_archive_workflow_state");
}

public List<Integer> listArchivableWorkflows(DateTime before, int maxRows) {
return jdbc.query(
"select w.id id from nflow_workflow w, " +
"(" +
" select parent.id from nflow_workflow parent " +
" where parent.next_activation is null and parent.modified <= ? " +
" and parent.root_workflow_id is null " +
" and not exists(" +
" select 1 from nflow_workflow child where child.root_workflow_id = parent.id " +
" and (child.modified > ? or child.next_activation is not null)" +
" )" +
" order by modified asc " +
" limit " + maxRows +
") as archivable_parent " +
"where archivable_parent.id = w.id or archivable_parent.id = w.root_workflow_id",
new ArchivableWorkflowsRowMapper(), toTimestamp(before), toTimestamp(before));
}

@Transactional
public int archiveWorkflows(List<Integer> workflowIds) {
String workflowIdParams = params(workflowIds);

int archivedWorkflows = archiveWorkflowTable(workflowIdParams);
archiveActionTable(workflowIdParams);
archiveStateTable(workflowIdParams);
deleteWorkflows(workflowIdParams);
return archivedWorkflows;
}

private int archiveWorkflowTable(String workflowIdParams) {
String columns = columnsFromMetadata("nflow_workflow");
return jdbc.update("insert into nflow_archive_workflow(" + columns + ") " +
"select " + columns + " from nflow_workflow where id in " + workflowIdParams);
}

private void archiveActionTable(String workflowIdParams) {
String columns = columnsFromMetadata("nflow_workflow_action");
jdbc.update("insert into nflow_archive_workflow_action(" + columns + ") " +
"select " + columns + " from nflow_workflow_action where workflow_id in " + workflowIdParams);
}

private void archiveStateTable(String workflowIdParams) {
String columns = columnsFromMetadata("nflow_workflow_state");
jdbc.update("insert into nflow_archive_workflow_state (" + columns + ") " +
"select " + columns + " from nflow_workflow_state where workflow_id in " + workflowIdParams);
}

private void deleteWorkflows(String workflowIdParams) {
jdbc.update("delete from nflow_workflow_state where workflow_id in " + workflowIdParams);
jdbc.update("update nflow_workflow set root_workflow_id=null, parent_workflow_id=null, parent_action_id=null " +
"where id in " + workflowIdParams + " and (root_workflow_id is not null or parent_workflow_id is not null)");
jdbc.update("delete from nflow_workflow_action where workflow_id in " + workflowIdParams);
jdbc.update("delete from nflow_workflow where id in " + workflowIdParams);
}

private String columnsFromMetadata(String tableName) {
List<String> columnNames = jdbc.query("select * from " + tableName + " where 1 = 0", columnNamesExtractor);
return join(columnNames, ",");
}

private String params(List<Integer> workflowIds) {
return "(" + join(workflowIds, ",") + ")";
}

static class ArchivableWorkflowsRowMapper implements RowMapper<Integer> {
@Override
public Integer mapRow(ResultSet rs, int rowNum) throws SQLException {
return rs.getInt("id");
}
}
}
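Taken together, the DAO supports a batched archiving loop: validate the archive schema once, then repeatedly pick passive workflow hierarchies and move them inside a transaction. Below is a minimal sketch of such a driver; the ArchiveService class name, batch size handling, and loop policy are illustrative assumptions and not part of this commit.

package com.nitorcreations.nflow.engine.internal.dao;

import java.util.List;

import javax.inject.Inject;
import javax.inject.Named;

import org.joda.time.DateTime;

// Hypothetical driver, not part of this commit: illustrates the intended
// call sequence of ArchiveDao.
@Named
public class ArchiveService {
  @Inject
  private ArchiveDao archiveDao;

  public int archiveWorkflows(DateTime olderThan, int batchSize) {
    // Fail fast if the nflow_archive_* tables are missing or incompatible.
    archiveDao.ensureValidArchiveTablesExist();
    int totalArchived = 0;
    List<Integer> workflowIds;
    do {
      // Pick passive workflow hierarchies whose root and children are done.
      workflowIds = archiveDao.listArchivableWorkflows(olderThan, batchSize);
      if (workflowIds.isEmpty()) {
        break;
      }
      // Copy to archive tables and delete from main tables in one transaction.
      totalArchived += archiveDao.archiveWorkflows(workflowIds);
    } while (workflowIds.size() == batchSize);
    return totalArchived;
  }
}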
DaoUtil.java
@@ -1,8 +1,11 @@
package com.nitorcreations.nflow.engine.internal.dao;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import org.joda.time.DateTime;
import org.springframework.dao.DataAccessException;
@@ -35,4 +38,22 @@ public static Integer getInt(ResultSet rs, String columnLabel) throws SQLException {
int value = rs.getInt(columnLabel);
return rs.wasNull() ? null : value;
}

public static final class ColumnNamesExtractor implements ResultSetExtractor<List<String>> {
static final ColumnNamesExtractor columnNamesExtractor = new ColumnNamesExtractor();

private ColumnNamesExtractor() {
}

@Override
public List<String> extractData(ResultSet rs) throws SQLException, DataAccessException {
ResultSetMetaData metadata = rs.getMetaData();
int columnCount = metadata.getColumnCount();
List<String> columnNames = new ArrayList<>(columnCount);
for (int col = 1; col <= columnCount; col++) {
columnNames.add(metadata.getColumnName(col));
}
return columnNames;
}
}
}
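ColumnNamesExtractor is driven by a query of the form select * from <table> where 1 = 0: the predicate matches no rows, so the database returns an empty result set whose metadata still describes every column. A minimal usage sketch, assuming a caller in the same package with a JdbcTemplate at hand (the helper class below is illustrative):

package com.nitorcreations.nflow.engine.internal.dao;

import static com.nitorcreations.nflow.engine.internal.dao.DaoUtil.ColumnNamesExtractor.columnNamesExtractor;

import java.util.List;

import org.springframework.jdbc.core.JdbcTemplate;

class ColumnListingExample {
  // Illustrative only: list the columns of a table without fetching rows.
  // "where 1 = 0" yields an empty result set that still carries metadata.
  static List<String> columnsOf(JdbcTemplate jdbc, String table) {
    return jdbc.query("select * from " + table + " where 1 = 0", columnNamesExtractor);
  }
}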
TableMetadataChecker.java (new file)
@@ -0,0 +1,114 @@
package com.nitorcreations.nflow.engine.internal.dao;

import static java.lang.String.format;
import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.inject.Inject;
import javax.inject.Named;

import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;

import com.nitorcreations.nflow.engine.internal.config.NFlow;

@Named
public class TableMetadataChecker {
private JdbcTemplate jdbc;

public void ensureCopyingPossible(String sourceTable, String destinationTable) {
Map<String, ColumnMetadata> sourceMetadataMap = getMetadata(sourceTable);
Map<String, ColumnMetadata> destMetadataMap = getMetadata(destinationTable);
if (destMetadataMap.size() < sourceMetadataMap.size()) {
throw new IllegalArgumentException(format("Source table %s has more columns than destination table %s", sourceTable,
destinationTable));
}
if (!destMetadataMap.keySet().containsAll(sourceMetadataMap.keySet())) {
Set<String> missingColumns = new LinkedHashSet<>(sourceMetadataMap.keySet());
missingColumns.removeAll(destMetadataMap.keySet());
throw new IllegalArgumentException(format("Destination table %s is missing columns %s that are present in source table %s",
destinationTable, missingColumns, sourceTable));
}
for (Entry<String, ColumnMetadata> entry : sourceMetadataMap.entrySet()) {
ColumnMetadata sourceMetadata = entry.getValue();
ColumnMetadata destMetadata = destMetadataMap.get(entry.getKey());
if (!sourceMetadata.typeName.equals(destMetadata.typeName)) {
throw new IllegalArgumentException(format(
"Source column %s.%s has type %s and destination column %s.%s has mismatching type %s", sourceTable,
sourceMetadata.columnName, sourceMetadata.typeName, destinationTable, destMetadata.columnName, destMetadata.typeName));
}
if (sourceMetadata.size > destMetadata.size) {
throw new IllegalArgumentException(format("Source column %s.%s has size %s and destination column %s.%s smaller size %s",
sourceTable, sourceMetadata.columnName, sourceMetadata.size, destinationTable, destMetadata.columnName,
destMetadata.size));
}
}
}

private Map<String, ColumnMetadata> getMetadata(String tableName) {
return jdbc.query("select * from " + tableName + " where 1 = 0", new MetadataExtractor());
}

static class MetadataExtractor implements ResultSetExtractor<Map<String, ColumnMetadata>> {
private final Map<String, String> typeAliases = typeAliases();

@Override
public Map<String, ColumnMetadata> extractData(ResultSet rs) throws SQLException, DataAccessException {
ResultSetMetaData metadata = rs.getMetaData();
Map<String, ColumnMetadata> metadataMap = new LinkedHashMap<>();
for (int col = 1; col <= metadata.getColumnCount(); col++) {
String columnName = metadata.getColumnName(col);
String typeName = metadata.getColumnTypeName(col);
int size = metadata.getColumnDisplaySize(col);
metadataMap.put(columnName, new ColumnMetadata(columnName, resolveTypeAlias(typeName), size));
}
return metadataMap;
}

private String resolveTypeAlias(String type) {
String resolvedType = typeAliases.get(type);
if (resolvedType != null) {
return resolvedType;
}
return type;
}

private Map<String, String> typeAliases() {
Map<String, String> map = new LinkedHashMap<>();
map.put("serial", "int4");
return map;
}
}

private static class ColumnMetadata {
public final String columnName;
public final String typeName;
public final int size;

public ColumnMetadata(String columnName, String typeName, int size) {
this.columnName = columnName;
this.typeName = typeName;
this.size = size;
}

@Override
public String toString() {
return ReflectionToStringBuilder.toString(this, SHORT_PREFIX_STYLE);
}
}

@Inject
public void setJdbcTemplate(@NFlow JdbcTemplate jdbcTemplate) {
this.jdbc = jdbcTemplate;
}
}
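The checker turns silent schema drift into a fast, descriptive failure before any rows move. A hedged sketch of the failure mode; the wrapper class is illustrative, and the quoted message follows the format strings above:

package com.nitorcreations.nflow.engine.internal.dao;

class ArchiveTableCheckExample {
  // Illustrative only: if nflow_archive_workflow lacked a column such as
  // root_workflow_id, ensureCopyingPossible would throw, e.g.:
  // "Destination table nflow_archive_workflow is missing columns
  //  [root_workflow_id] that are present in source table nflow_workflow"
  static void verifyArchiveSchema(TableMetadataChecker checker) {
    try {
      checker.ensureCopyingPossible("nflow_workflow", "nflow_archive_workflow");
    } catch (IllegalArgumentException e) {
      throw new IllegalStateException("Archive tables need a schema update", e);
    }
  }
}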
WorkflowInstanceDao.java
@@ -137,9 +137,10 @@ public int insertWorkflowInstance(WorkflowInstance instance) {
private int insertWorkflowInstanceWithCte(WorkflowInstance instance) {
StringBuilder sqlb = new StringBuilder(256);
sqlb.append("with wf as (" + insertWorkflowInstanceSql() + " returning id)");
- Object[] instanceValues = new Object[] { instance.type, instance.parentWorkflowId, instance.parentActionId,
- instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(), instance.status.name(), instance.state,
- abbreviate(instance.stateText, instanceStateTextLength), toTimestamp(instance.nextActivation) };
+ Object[] instanceValues = new Object[] { instance.type, instance.rootWorkflowId, instance.parentWorkflowId,
+ instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(),
+ instance.status.name(), instance.state, abbreviate(instance.stateText, instanceStateTextLength),
+ toTimestamp(instance.nextActivation) };
int pos = instanceValues.length;
Object[] args = Arrays.copyOf(instanceValues, pos + instance.stateVariables.size() * 2);
for (Entry<String, String> var : instance.stateVariables.entrySet()) {
@@ -153,8 +154,8 @@ private int insertWorkflowInstanceWithCte(WorkflowInstance instance) {
}

String insertWorkflowInstanceSql() {
return "insert into nflow_workflow(type, parent_workflow_id, parent_action_id, business_key, external_id, "
+ "executor_group, status, state, state_text, next_activation) values (?, ?, ?, ?, ?, ?, " + sqlVariants.workflowStatus()
return "insert into nflow_workflow(type, root_workflow_id, parent_workflow_id, parent_action_id, business_key, external_id, "
+ "executor_group, status, state, state_text, next_activation) values (?, ?, ?, ?, ?, ?, ?, " + sqlVariants.workflowStatus()
+ ", ?, ?, ?)";
}

Expand All @@ -175,6 +176,7 @@ public PreparedStatement createPreparedStatement(Connection connection) throws S
int p = 1;
ps = connection.prepareStatement(insertWorkflowInstanceSql(), new String[] { "id" });
ps.setString(p++, instance.type);
ps.setObject(p++, instance.rootWorkflowId);
ps.setObject(p++, instance.parentWorkflowId);
ps.setObject(p++, instance.parentActionId);
ps.setString(p++, instance.businessKey);
@@ -216,20 +218,20 @@ void insertVariables(final int id, final int actionId, Map<String, String> state
}
final Iterator<Entry<String, String>> variables = changedStateVariables.entrySet().iterator();
int[] updateStatus = jdbc.batchUpdate(insertWorkflowInstanceStateSql() + " values (?,?,?,?)",
new AbstractInterruptibleBatchPreparedStatementSetter() {
@Override
protected boolean setValuesIfAvailable(PreparedStatement ps, int i) throws SQLException {
if (!variables.hasNext()) {
return false;
}
Entry<String, String> var = variables.next();
ps.setInt(1, id);
ps.setInt(2, actionId);
ps.setString(3, var.getKey());
ps.setString(4, var.getValue());
return true;
}
});
int updatedRows = 0;
boolean unknownResults = false;
for (int i = 0; i < updateStatus.length; ++i) {
@@ -276,8 +278,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
updateWorkflowInstance(instance);
int parentActionId = insertWorkflowInstanceAction(instance, action);
for (WorkflowInstance childTemplate : childWorkflows) {
- WorkflowInstance childWorkflow = new WorkflowInstance.Builder(childTemplate).setParentWorkflowId(instance.id)
- .setParentActionId(parentActionId).build();
+ Integer rootWorkflowId = instance.rootWorkflowId == null ? instance.id : instance.rootWorkflowId;
+ WorkflowInstance childWorkflow = new WorkflowInstance.Builder(childTemplate).setRootWorkflowId(rootWorkflowId)
+ .setParentWorkflowId(instance.id).setParentActionId(parentActionId).build();
insertWorkflowInstance(childWorkflow);
}
}
@@ -639,6 +642,7 @@ public WorkflowInstance mapRow(ResultSet rs, int rowNum) throws SQLException {
return new WorkflowInstance.Builder()
.setId(rs.getInt("id"))
.setExecutorId(getInt(rs, "executor_id"))
.setRootWorkflowId(getInt(rs, "root_workflow_id"))
.setParentWorkflowId(getInt(rs, "parent_workflow_id"))
.setParentActionId(getInt(rs, "parent_action_id"))
.setStatus(WorkflowInstanceStatus.valueOf(rs.getString("status")))
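The builder change above pins every child to the hierarchy's top-level ancestor: a first-level child stores its parent's id as the root, and deeper descendants inherit the parent's root unchanged. This is what lets listArchivableWorkflows in ArchiveDao use root_workflow_id as a grouping key for whole hierarchies. The rule in isolation (the method name below is illustrative):

// Illustrative only: the root id resolution applied when inserting children.
static Integer resolveRootWorkflowId(Integer parentId, Integer parentRootId) {
  // first-level child: the parent is the root; deeper levels keep the parent's root
  return parentRootId == null ? parentId : parentRootId;
}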
DatabaseConfiguration.java
@@ -22,7 +22,7 @@
import com.zaxxer.hikari.HikariDataSource;

public abstract class DatabaseConfiguration {
- private static final String NFLOW_DATABASE_INITIALIZER = "nflowDatabaseInitializer";
+ public static final String NFLOW_DATABASE_INITIALIZER = "nflowDatabaseInitializer";
private static final Logger logger = getLogger(DatabaseConfiguration.class);
private final String dbType;

H2ModifiedColumnTrigger.java
@@ -6,12 +6,17 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Objects;

import org.h2.tools.TriggerAdapter;

public class H2ModifiedColumnTrigger extends TriggerAdapter {
@Override
public void fire(Connection conn, ResultSet oldRow, ResultSet newRow) throws SQLException {
newRow.updateTimestamp("modified", new Timestamp(currentTimeMillis()));
Timestamp oldModified = oldRow.getTimestamp("modified");
Timestamp newModified = newRow.getTimestamp("modified");
if (Objects.equals(oldModified, newModified)) {
newRow.updateTimestamp("modified", new Timestamp(currentTimeMillis()));
}
}
}
}
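The new guard means the trigger only stamps modified when an update left it unchanged; an update that explicitly sets modified keeps its value (tests that backdate workflows for archiving, for instance, may rely on this). In H2, a trigger class like this is attached with DDL naming the class. A sketch, assuming the trigger name below and that this example sits in the same package as H2ModifiedColumnTrigger:

import org.springframework.jdbc.core.JdbcTemplate;

class H2TriggerSetupExample {
  // Illustrative only: register the trigger for nflow_workflow updates.
  // H2 instantiates the named class and calls fire() for every updated row.
  static void register(JdbcTemplate jdbc) {
    jdbc.execute("create trigger if not exists nflow_workflow_modified "
        + "before update on nflow_workflow for each row call \""
        + H2ModifiedColumnTrigger.class.getName() + "\"");
  }
}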