From a6cb6fa5c6be2cbff3963f1c8bd9546658e57463 Mon Sep 17 00:00:00 2001 From: Alex Zorkin Date: Tue, 17 Dec 2024 17:31:21 -0800 Subject: [PATCH] feat: add legacy_id --- etl/nifi_scripts/compliance_report.groovy | 6 +- etl/nifi_scripts/transactions.groovy | 99 ----------------------- 2 files changed, 4 insertions(+), 101 deletions(-) delete mode 100644 etl/nifi_scripts/transactions.groovy diff --git a/etl/nifi_scripts/compliance_report.groovy b/etl/nifi_scripts/compliance_report.groovy index 6bef3f29c..f9345daa3 100644 --- a/etl/nifi_scripts/compliance_report.groovy +++ b/etl/nifi_scripts/compliance_report.groovy @@ -796,8 +796,9 @@ def prepareStatements(Connection conn) { create_user, create_date, update_user, - update_date - ) VALUES (?, ?, ?, NULL, ?, ?, ?::SupplementalInitiatorType, ?::ReportingFrequency, ?, ?, ?, ?, ?, ?) + update_date, + legacy_id + ) VALUES (?, ?, ?, NULL, ?, ?, ?::SupplementalInitiatorType, ?::ReportingFrequency, ?, ?, ?, ?, ?, ?, ?) RETURNING compliance_report_id """ @@ -867,6 +868,7 @@ def insertComplianceReport(PreparedStatement stmt, Map report, String groupUuid, stmt.setTimestamp(11, report.create_timestamp ?: Timestamp.valueOf("1970-01-01 00:00:00")) stmt.setString(12, updateUser) stmt.setTimestamp(13, report.update_timestamp ?: report.create_timestamp ?: Timestamp.valueOf("1970-01-01 00:00:00")) + stmt.setInt(14, report.compliance_report_id) def rs = null try { diff --git a/etl/nifi_scripts/transactions.groovy b/etl/nifi_scripts/transactions.groovy deleted file mode 100644 index 9cb4c40f4..000000000 --- a/etl/nifi_scripts/transactions.groovy +++ /dev/null @@ -1,99 +0,0 @@ -import groovy.json.JsonSlurper -import java.sql.Connection -import java.sql.PreparedStatement -import java.sql.ResultSet - -def SOURCE_QUERY = """ - SELECT - ct.id AS transaction_id, - ct.initiator_id AS initiator_id, - ct.respondent_id AS respondent_id, - ct.date_of_written_agreement AS agreement_date, - ct.trade_effective_date AS transaction_effective_date, - ct.number_of_credits AS quantity, - ct.create_user_id AS create_user, - ct.create_timestamp AS create_date, - ct.update_user_id AS update_user, - ct.update_timestamp AS update_date, - ctt.the_type AS transaction_type, - ct.fair_market_value_per_credit AS price_per_unit - FROM - credit_trade ct - JOIN credit_trade_type ctt ON ct.type_id = ctt.id - JOIN credit_trade_status cts ON ct.status_id = cts.id - WHERE - ctt.the_type IN ('Credit Validation', 'Part 3 Award', 'Credit Reduction', 'Administrative Adjustment') - AND cts.status = 'Approved'; -""" - -// Fetch connections to both the source and destination databases -def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') -def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') - -Connection sourceConn = null -Connection destinationConn = null - -try { - sourceConn = sourceDbcpService.getConnection() - destinationConn = destinationDbcpService.getConnection() - destinationConn.setAutoCommit(false) - - def transactionStmt = destinationConn.prepareStatement(''' - INSERT INTO transaction ( - compliance_units, organization_id, transaction_action, effective_status - ) VALUES (?, ?, ?::transaction_action_enum, TRUE) - RETURNING transaction_id - ''') - - PreparedStatement sourceStmt = sourceConn.prepareStatement(SOURCE_QUERY) - ResultSet resultSet = sourceStmt.executeQuery() - - int recordCount = 0 - - while (resultSet.next()) { - recordCount++ - def transactionType = resultSet.getString('transaction_type') - def organizationId = resultSet.getInt('respondent_id') - def quantity = resultSet.getInt('quantity') - def action = 'Adjustment' - - // Adjust quantity for 'Credit Reduction' transactions - if (transactionType == 'Credit Reduction') { - quantity = -quantity - } - - if (organizationId > 0 && quantity != null) { - insertTransaction(transactionStmt, organizationId, quantity, action) - } else { - log.warn("Skipping transaction_id ${resultSet.getInt('transaction_id')}: Missing required data.") - } - } - - resultSet.close() - destinationConn.commit() - log.debug("Processed ${recordCount} records successfully.") -} catch (Exception e) { - log.error('Error occurred while processing data', e) - destinationConn?.rollback() - throw e // Rethrow the exception to allow NiFi to handle retries or failure routing -} finally { - // Close resources in reverse order of their creation - if (transactionStmt != null) transactionStmt.close() - if (resultSet != null) resultSet.close() - if (sourceStmt != null) sourceStmt.close() - if (sourceConn != null) sourceConn.close() - if (destinationConn != null) destinationConn.close() -} - -def insertTransaction(PreparedStatement stmt, int orgId, int quantity, String action) { - stmt.setInt(1, quantity) - stmt.setInt(2, orgId) - stmt.setString(3, action) - - def result = stmt.executeQuery() - if (result.next()) { - def transactionId = result.getInt('transaction_id') - log.debug("Inserted transaction_id ${transactionId} for organization_id ${orgId}") - } - result.close() -}