Skip to content

Commit

Permalink
feat: unassociated credit trade calculations, credit validation script
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Zorkin committed Dec 17, 2024
1 parent 6e2f6b4 commit e5aaec7
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 1 deletion.
Binary file modified etl/database/nifi-registry-primary.mv.db
Binary file not shown.
Binary file modified etl/nifi/conf/flow.json.gz
Binary file not shown.
Binary file modified etl/nifi/conf/flow.xml.gz
Binary file not shown.
81 changes: 80 additions & 1 deletion etl/nifi_scripts/compliance_report.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Key Features:
6. Ensures sequence integrity for compliance_report_id.
7. Facilitates efficient testing by allowing deletion of inserted records based on display_name.
8. Adds handling for 'in reserve' transactions based on snapshot data.
9. After all existing compliance reports are processed, queries for unassociated credit trades of type 3 or 4
and creates new transactions for them.
*/

// =========================================
Expand Down Expand Up @@ -417,10 +419,87 @@ try {

rs.close()
consolidatedStmt.close()
// Commit all changes from the original compliance report processing
destinationConn.commit()

// =========================================
// Process Unassociated Credit Trades
// =========================================
// This section identifies credit trades of type 3 or 4 (Credit Validation or Credit Reduction)
// that are not currently associated with any compliance_report. Instead of creating new
// compliance reports, it only creates corresponding transactions. (to be reviewed at a later date)
//
// For each unassociated credit trade:
// 1. Determines if it's a credit validation or a credit reduction.
// 2. Creates a transaction (positive units for validations, negative units for reductions).
// 3. Does not associate the transaction with any compliance report, leaving them as standalone transactions.
//
// This ensures that all known credit trades have corresponding transactions,
// even if they do not currently relate to a compliance report.

log.warn("Processing Unassociated Credit Trades of Type 3 or 4 to Create Transactions Only")

// Query for unassociated credit trades of type 3 or 4 and approved status
// These trades exist in the credit_trade table but are not linked to a compliance_report.
def UNASSOCIATED_CREDIT_TRADES_QUERY = """
SELECT ct.id AS credit_trade_id,
ct.respondent_id AS organization_id,
ct.compliance_period_id AS period_id,
ct.type_id AS credit_trade_type_id,
ct.trade_effective_date,
ct.number_of_credits,
ct.status_id,
ct.create_user_id AS ct_create_user_id,
ct.create_timestamp AS ct_create_timestamp
FROM credit_trade ct
LEFT JOIN compliance_report cr ON cr.credit_transaction_id = ct.id
WHERE ct.type_id IN (3,4)
AND ct.status_id = 7
AND cr.id IS NULL;
"""

PreparedStatement unassociatedTradesStmt = sourceConn.prepareStatement(UNASSOCIATED_CREDIT_TRADES_QUERY)
ResultSet unassocRs = unassociatedTradesStmt.executeQuery()

while (unassocRs.next()) {
def credit_trade_id = unassocRs.getInt("credit_trade_id")
def organization_id = unassocRs.getInt("organization_id")
def credit_trade_type_id = unassocRs.getInt("credit_trade_type_id")
def trade_effective_date = unassocRs.getTimestamp("trade_effective_date")
def number_of_credits = unassocRs.getInt("number_of_credits")
def ct_create_user_id = unassocRs.getInt("ct_create_user_id")
def ct_create_timestamp = unassocRs.getTimestamp("ct_create_timestamp")

def createUser = userProfiles[ct_create_user_id] ?: "imported_user"

// Commit all changes
// Determine transaction direction based on trade type
// type_id = 3 (Credit Validation) => positive credits
// type_id = 4 (Credit Reduction) => negative credits
def adjustedCredits = (credit_trade_type_id == 4) ? -number_of_credits : number_of_credits

// Create a standalone transaction for this credit trade
def transactionId = insertTransactionForReport(
statements.insertTransactionStmt,
organization_id,
adjustedCredits,
"Adjustment",
createUser,
ct_create_timestamp,
trade_effective_date
)
totalTransactionsInserted++

log.warn("Created standalone transaction ${transactionId} for credit_trade_id ${credit_trade_id} (no associated compliance report)")
}

unassocRs.close()
unassociatedTradesStmt.close()

// Commit changes for the newly created transactions
destinationConn.commit()

log.warn("Inserted ${totalInserted} compliance reports (from earlier processing), ${totalHistoryInserted} history records, and ${totalTransactionsInserted} transactions into LCFS, including standalone transactions for unassociated credit trades.")

// Re-enable and refresh the materialized views
stmt = destinationConn.createStatement()
stmt.execute("""
Expand Down
143 changes: 143 additions & 0 deletions etl/nifi_scripts/credit_validation.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet

/*
This NiFi Groovy script:
1. Fetches all credit trades from the TFRS database with status_id = 7.
2. Sums the credit balances for each organization based on credit_trade_type logic:
- Sell (type_id = 1): initiator loses credits, respondent gains credits
- Buy (type_id = 2): initiator gains credits, respondent loses credits
- Credit Validation (type_id = 3): respondent_id gains credits
- Credit Reduction (type_id = 4): respondent_id loses credits
- Part 3 Award (type_id = 5): respondent_id gains credits
(Ignore type_id = 6 for now as instructed)
3. Fetches LCFS transactions and sums them per organization.
4. Prints out organizations where TFRS balance differs from LCFS balance, along with the discrepancy.
5. Logs how many organizations have discrepancies and how many have matching balances.
*/

log.warn("******* STARTING CREDIT VALIDATION *******")

// NiFi DBCP Controller Services
def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb')
def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93')

Connection sourceConn = null
Connection destinationConn = null

try {

sourceConn = sourceDbcpService.getConnection()
destinationConn = destinationDbcpService.getConnection()

// Query TFRS for all credit trades with status_id = 7
def TFRS_QUERY = """
SELECT ct.id AS credit_trade_id,
ct.type_id,
ct.initiator_id,
ct.respondent_id,
ct.number_of_credits
FROM credit_trade ct
WHERE ct.status_id = 7
"""

PreparedStatement tfrsStmt = sourceConn.prepareStatement(TFRS_QUERY)
ResultSet tfrsRs = tfrsStmt.executeQuery()

// Map for TFRS balances: org_id -> credits
def tfrsBalances = [:].withDefault {0}

while (tfrsRs.next()) {
def typeId = tfrsRs.getInt("type_id")
def initiator = tfrsRs.getInt("initiator_id")
def respondent = tfrsRs.getInt("respondent_id")
def credits = tfrsRs.getInt("number_of_credits")

// Apply logic based on type_id
switch(typeId) {
case 1: // Sell
tfrsBalances[initiator] = tfrsBalances[initiator] - credits
tfrsBalances[respondent] = tfrsBalances[respondent] + credits
break
case 2: // Buy
tfrsBalances[initiator] = tfrsBalances[initiator] + credits
tfrsBalances[respondent] = tfrsBalances[respondent] - credits
break
case 3: // Credit Validation
tfrsBalances[respondent] = tfrsBalances[respondent] + credits
break
case 4: // Credit Reduction
tfrsBalances[respondent] = tfrsBalances[respondent] - credits
break
case 5: // Part 3 Award
tfrsBalances[respondent] = tfrsBalances[respondent] + credits
break
// type_id = 6 (Admin Adjustment) not considered per requirement
}
}

tfrsRs.close()
tfrsStmt.close()

// Query LCFS for all transactions and sum by org
def LCFS_QUERY = """
SELECT organization_id, SUM(compliance_units) AS total_units
FROM transaction
WHERE transaction_action = 'Adjustment'
GROUP BY organization_id
"""

PreparedStatement lcfsStmt = destinationConn.prepareStatement(LCFS_QUERY)
ResultSet lcfsRs = lcfsStmt.executeQuery()

def lcfsBalances = [:].withDefault {0}

while (lcfsRs.next()) {
def orgId = lcfsRs.getInt("organization_id")
def totalUnits = lcfsRs.getInt("total_units")
lcfsBalances[orgId] = totalUnits
}

lcfsRs.close()
lcfsStmt.close()

// Compare balances
def allOrgs = (tfrsBalances.keySet() + lcfsBalances.keySet()).unique()
def discrepancies = []
allOrgs.each { orgId ->
def tfrsVal = tfrsBalances[orgId]
def lcfsVal = lcfsBalances[orgId]
if (tfrsVal != lcfsVal) {
def diff = tfrsVal - lcfsVal
discrepancies << [orgId: orgId, tfrs: tfrsVal, lcfs: lcfsVal, difference: diff]
}
}

// Print out discrepancies
if (discrepancies) {
log.warn("******** Organizations with balance discrepancies between TFRS and LCFS: ********")
discrepancies.each { d ->
log.warn("OrgID: ${d.orgId}, TFRS: ${d.tfrs}, LCFS: ${d.lcfs}, Difference (TFRS-LCFS): ${d.difference}")
}
} else {
log.warn("No discrepancies found. Balances match for all organizations.")
}

// Log counts
def discrepancyCount = discrepancies.size()
def totalOrgs = allOrgs.size()
def matchingCount = totalOrgs - discrepancyCount

log.warn("Number of organizations with discrepancies: ${discrepancyCount}")
log.warn("Number of organizations with matching balances: ${matchingCount}")
log.warn("Total organizations considered: ${totalOrgs}")

} catch (Exception e) {
log.error("Error while validating organization balances", e)
} finally {
log.warn("**** FINISHED CREDIT VALIDATION ****")
if (sourceConn != null) sourceConn.close()
if (destinationConn != null) destinationConn.close()
}

0 comments on commit e5aaec7

Please sign in to comment.