diff --git a/etl/database/nifi-registry-primary.mv.db b/etl/database/nifi-registry-primary.mv.db index c02fd000c..7d5887b3c 100644 Binary files a/etl/database/nifi-registry-primary.mv.db and b/etl/database/nifi-registry-primary.mv.db differ diff --git a/etl/nifi/conf/flow.json.gz b/etl/nifi/conf/flow.json.gz index 8f1178300..f27e12b1d 100644 Binary files a/etl/nifi/conf/flow.json.gz and b/etl/nifi/conf/flow.json.gz differ diff --git a/etl/nifi/conf/flow.xml.gz b/etl/nifi/conf/flow.xml.gz index 021f4e383..03c72fb35 100644 Binary files a/etl/nifi/conf/flow.xml.gz and b/etl/nifi/conf/flow.xml.gz differ diff --git a/etl/nifi_scripts/compliance_report.groovy b/etl/nifi_scripts/compliance_report.groovy index f4572ac18..6bef3f29c 100644 --- a/etl/nifi_scripts/compliance_report.groovy +++ b/etl/nifi_scripts/compliance_report.groovy @@ -20,6 +20,8 @@ Key Features: 6. Ensures sequence integrity for compliance_report_id. 7. Facilitates efficient testing by allowing deletion of inserted records based on display_name. 8. Adds handling for 'in reserve' transactions based on snapshot data. +9. After all existing compliance reports are processed, queries for unassociated credit trades of type 3 or 4 + and creates new transactions for them. */ // ========================================= @@ -417,10 +419,87 @@ try { rs.close() consolidatedStmt.close() + // Commit all changes from the original compliance report processing + destinationConn.commit() + + // ========================================= + // Process Unassociated Credit Trades + // ========================================= + // This section identifies credit trades of type 3 or 4 (Credit Validation or Credit Reduction) + // that are not currently associated with any compliance_report. Instead of creating new + // compliance reports, it only creates corresponding transactions. (to be reviewed at a later date) + // + // For each unassociated credit trade: + // 1. Determines if it's a credit validation or a credit reduction. + // 2. Creates a transaction (positive units for validations, negative units for reductions). + // 3. Does not associate the transaction with any compliance report, leaving them as standalone transactions. + // + // This ensures that all known credit trades have corresponding transactions, + // even if they do not currently relate to a compliance report. + + log.warn("Processing Unassociated Credit Trades of Type 3 or 4 to Create Transactions Only") + + // Query for unassociated credit trades of type 3 or 4 and approved status + // These trades exist in the credit_trade table but are not linked to a compliance_report. + def UNASSOCIATED_CREDIT_TRADES_QUERY = """ + SELECT ct.id AS credit_trade_id, + ct.respondent_id AS organization_id, + ct.compliance_period_id AS period_id, + ct.type_id AS credit_trade_type_id, + ct.trade_effective_date, + ct.number_of_credits, + ct.status_id, + ct.create_user_id AS ct_create_user_id, + ct.create_timestamp AS ct_create_timestamp + FROM credit_trade ct + LEFT JOIN compliance_report cr ON cr.credit_transaction_id = ct.id + WHERE ct.type_id IN (3,4) + AND ct.status_id = 7 + AND cr.id IS NULL; + """ + + PreparedStatement unassociatedTradesStmt = sourceConn.prepareStatement(UNASSOCIATED_CREDIT_TRADES_QUERY) + ResultSet unassocRs = unassociatedTradesStmt.executeQuery() + + while (unassocRs.next()) { + def credit_trade_id = unassocRs.getInt("credit_trade_id") + def organization_id = unassocRs.getInt("organization_id") + def credit_trade_type_id = unassocRs.getInt("credit_trade_type_id") + def trade_effective_date = unassocRs.getTimestamp("trade_effective_date") + def number_of_credits = unassocRs.getInt("number_of_credits") + def ct_create_user_id = unassocRs.getInt("ct_create_user_id") + def ct_create_timestamp = unassocRs.getTimestamp("ct_create_timestamp") + + def createUser = userProfiles[ct_create_user_id] ?: "imported_user" - // Commit all changes + // Determine transaction direction based on trade type + // type_id = 3 (Credit Validation) => positive credits + // type_id = 4 (Credit Reduction) => negative credits + def adjustedCredits = (credit_trade_type_id == 4) ? -number_of_credits : number_of_credits + + // Create a standalone transaction for this credit trade + def transactionId = insertTransactionForReport( + statements.insertTransactionStmt, + organization_id, + adjustedCredits, + "Adjustment", + createUser, + ct_create_timestamp, + trade_effective_date + ) + totalTransactionsInserted++ + + log.warn("Created standalone transaction ${transactionId} for credit_trade_id ${credit_trade_id} (no associated compliance report)") + } + + unassocRs.close() + unassociatedTradesStmt.close() + + // Commit changes for the newly created transactions destinationConn.commit() + log.warn("Inserted ${totalInserted} compliance reports (from earlier processing), ${totalHistoryInserted} history records, and ${totalTransactionsInserted} transactions into LCFS, including standalone transactions for unassociated credit trades.") + // Re-enable and refresh the materialized views stmt = destinationConn.createStatement() stmt.execute(""" diff --git a/etl/nifi_scripts/credit_validation.groovy b/etl/nifi_scripts/credit_validation.groovy new file mode 100644 index 000000000..31289e22f --- /dev/null +++ b/etl/nifi_scripts/credit_validation.groovy @@ -0,0 +1,143 @@ +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet + +/* +This NiFi Groovy script: +1. Fetches all credit trades from the TFRS database with status_id = 7. +2. Sums the credit balances for each organization based on credit_trade_type logic: + - Sell (type_id = 1): initiator loses credits, respondent gains credits + - Buy (type_id = 2): initiator gains credits, respondent loses credits + - Credit Validation (type_id = 3): respondent_id gains credits + - Credit Reduction (type_id = 4): respondent_id loses credits + - Part 3 Award (type_id = 5): respondent_id gains credits + (Ignore type_id = 6 for now as instructed) + +3. Fetches LCFS transactions and sums them per organization. +4. Prints out organizations where TFRS balance differs from LCFS balance, along with the discrepancy. +5. Logs how many organizations have discrepancies and how many have matching balances. +*/ + +log.warn("******* STARTING CREDIT VALIDATION *******") + +// NiFi DBCP Controller Services +def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') +def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') + +Connection sourceConn = null +Connection destinationConn = null + +try { + + sourceConn = sourceDbcpService.getConnection() + destinationConn = destinationDbcpService.getConnection() + + // Query TFRS for all credit trades with status_id = 7 + def TFRS_QUERY = """ + SELECT ct.id AS credit_trade_id, + ct.type_id, + ct.initiator_id, + ct.respondent_id, + ct.number_of_credits + FROM credit_trade ct + WHERE ct.status_id = 7 + """ + + PreparedStatement tfrsStmt = sourceConn.prepareStatement(TFRS_QUERY) + ResultSet tfrsRs = tfrsStmt.executeQuery() + + // Map for TFRS balances: org_id -> credits + def tfrsBalances = [:].withDefault {0} + + while (tfrsRs.next()) { + def typeId = tfrsRs.getInt("type_id") + def initiator = tfrsRs.getInt("initiator_id") + def respondent = tfrsRs.getInt("respondent_id") + def credits = tfrsRs.getInt("number_of_credits") + + // Apply logic based on type_id + switch(typeId) { + case 1: // Sell + tfrsBalances[initiator] = tfrsBalances[initiator] - credits + tfrsBalances[respondent] = tfrsBalances[respondent] + credits + break + case 2: // Buy + tfrsBalances[initiator] = tfrsBalances[initiator] + credits + tfrsBalances[respondent] = tfrsBalances[respondent] - credits + break + case 3: // Credit Validation + tfrsBalances[respondent] = tfrsBalances[respondent] + credits + break + case 4: // Credit Reduction + tfrsBalances[respondent] = tfrsBalances[respondent] - credits + break + case 5: // Part 3 Award + tfrsBalances[respondent] = tfrsBalances[respondent] + credits + break + // type_id = 6 (Admin Adjustment) not considered per requirement + } + } + + tfrsRs.close() + tfrsStmt.close() + + // Query LCFS for all transactions and sum by org + def LCFS_QUERY = """ + SELECT organization_id, SUM(compliance_units) AS total_units + FROM transaction + WHERE transaction_action = 'Adjustment' + GROUP BY organization_id + """ + + PreparedStatement lcfsStmt = destinationConn.prepareStatement(LCFS_QUERY) + ResultSet lcfsRs = lcfsStmt.executeQuery() + + def lcfsBalances = [:].withDefault {0} + + while (lcfsRs.next()) { + def orgId = lcfsRs.getInt("organization_id") + def totalUnits = lcfsRs.getInt("total_units") + lcfsBalances[orgId] = totalUnits + } + + lcfsRs.close() + lcfsStmt.close() + + // Compare balances + def allOrgs = (tfrsBalances.keySet() + lcfsBalances.keySet()).unique() + def discrepancies = [] + allOrgs.each { orgId -> + def tfrsVal = tfrsBalances[orgId] + def lcfsVal = lcfsBalances[orgId] + if (tfrsVal != lcfsVal) { + def diff = tfrsVal - lcfsVal + discrepancies << [orgId: orgId, tfrs: tfrsVal, lcfs: lcfsVal, difference: diff] + } + } + + // Print out discrepancies + if (discrepancies) { + log.warn("******** Organizations with balance discrepancies between TFRS and LCFS: ********") + discrepancies.each { d -> + log.warn("OrgID: ${d.orgId}, TFRS: ${d.tfrs}, LCFS: ${d.lcfs}, Difference (TFRS-LCFS): ${d.difference}") + } + } else { + log.warn("No discrepancies found. Balances match for all organizations.") + } + + // Log counts + def discrepancyCount = discrepancies.size() + def totalOrgs = allOrgs.size() + def matchingCount = totalOrgs - discrepancyCount + + log.warn("Number of organizations with discrepancies: ${discrepancyCount}") + log.warn("Number of organizations with matching balances: ${matchingCount}") + log.warn("Total organizations considered: ${totalOrgs}") + +} catch (Exception e) { + log.error("Error while validating organization balances", e) +} finally { + log.warn("**** FINISHED CREDIT VALIDATION ****") + if (sourceConn != null) sourceConn.close() + if (destinationConn != null) destinationConn.close() +}