Skip to content

Commit

Permalink
feat: etl logic updates for compliance period and transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Zorkin committed Dec 17, 2024
1 parent 9744d96 commit 6e2f6b4
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 32 deletions.
165 changes: 139 additions & 26 deletions etl/nifi_scripts/compliance_report.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ try {
credit_trade_type : rs.getString('credit_trade_type'),
fair_market_value_per_credit : rs.getBigDecimal('fair_market_value_per_credit')
]


// Map TFRS compliance period ID to LCFS compliance period ID
record.compliance_period_id = mapCompliancePeriodId(record.compliance_period_id)

// Insert Compliance Report History from JSON
def historyJson = record.history_json
def historyRecords = historyJson ? new JsonSlurper().parseText(historyJson).sort { a, b ->
Expand Down Expand Up @@ -312,33 +315,101 @@ try {
updateComplianceReportWithTransaction(destinationConn, lcfsReportId, associatedTransactionId)
}
} else {
// **Non-Accepted Report:** Determine if an In Reserve transaction is needed
// After determining currentStatusName and having processed "In Reserve" transactions:
def currentStatusName = referenceData.statusIdToName[currentStatusId]?.toLowerCase()

if (["submitted", "recommended by analyst", "recommended by manager", "not recommended"].contains(currentStatusName)) {
// Create an In Reserve transaction
if (inReserveQuantity != null) {
def inReserveTransactionId = insertTransactionForReport(
statements.insertTransactionStmt,
orgId,
inReserveQuantity,
"Reserved",
reportCreateUser,
record.create_timestamp,
null
)
totalTransactionsInserted++

if (inReserveTransactionId != null) {
updateComplianceReportWithTransaction(destinationConn, lcfsReportId, inReserveTransactionId)
// Check if not the latest report in the chain or status is rejected
boolean isLatestReport = (record.compliance_report_id == record.latest_report_id)

// Determine if we should consider a "Reserved" transaction
def shouldReserve = ["submitted", "recommended by analyst", "recommended by manager", "not recommended"].contains(currentStatusName)

if (shouldReserve && inReserveQuantity != null && inReserveQuantity < 0) {
boolean mustRelease = false

// If current report is rejected, release immediately
if (currentStatusName == "rejected") {
mustRelease = true
} else {
// If not the latest report, check the source DB for a future report
if (!isLatestReport) {
def futureQuery = """
SELECT c.id,
s.fuel_supplier_status_id,
s.analyst_status_id,
s.manager_status_id,
s.director_status_id
FROM compliance_report c
JOIN compliance_report_workflow_state s ON c.status_id = s.id
WHERE c.root_report_id = ?
AND c.id > ?
ORDER BY c.id ASC LIMIT 1
"""

PreparedStatement futureStmt = sourceConn.prepareStatement(futureQuery)
futureStmt.setObject(1, record.root_report_id)
futureStmt.setInt(2, record.compliance_report_id)
ResultSet futureRs = futureStmt.executeQuery()

if (futureRs.next()) {
def futureFuelStatus = futureRs.getString("fuel_supplier_status_id")
def futureAnalystStatus = futureRs.getString("analyst_status_id")
def futureManagerStatus = futureRs.getString("manager_status_id")
def futureDirectorStatus = futureRs.getString("director_status_id")

// Map these future statuses to a final mapped status ID
def futureStatusId = mapCurrentStatus(
futureFuelStatus,
futureAnalystStatus,
futureManagerStatus,
futureDirectorStatus,
referenceData,
[]
)

if (futureStatusId != null) {
def futureStatusName = referenceData.statusIdToName[futureStatusId]?.toLowerCase()
log.warn("Future report found with mapped status: ${futureStatusName}")

// If the future report progresses beyond current (e.g., submitted or recommended), we must release immediately
if (["submitted", "recommended by analyst", "recommended by manager", "assessed"].contains(futureStatusName)) {
mustRelease = true
}
} else {
log.warn("Future report found but no valid mapped status. Not releasing at this time.")
}
}

futureRs.close()
futureStmt.close()
}
}

// Determine final action: if mustRelease is true, directly release; otherwise reserve
def finalAction = mustRelease ? "Released" : "Reserved"

log.warn("Inserting a single ${finalAction} transaction for compliance_report_id: ${record.compliance_report_id} with quantity: ${inReserveQuantity}")

def transactionId = insertTransactionForReport(
statements.insertTransactionStmt,
orgId,
inReserveQuantity,
finalAction,
reportCreateUser,
record.create_timestamp,
null
)
totalTransactionsInserted++
if (transactionId != null) {
updateComplianceReportWithTransaction(destinationConn, lcfsReportId, transactionId)
}

} else if (currentStatusName == "rejected") {
// **Rejected Report:** Do nothing
log.debug("Compliance report_id: ${record.compliance_report_id} is rejected. No transaction created.")
// Report is rejected but no inReserveQuantity or not in the reservable statuses, no transaction
log.debug("Rejected report_id: ${record.compliance_report_id}. No transaction created.")
} else {
// **Unhandled Status:** Log or handle as needed
log.warn("Compliance report_id: ${record.compliance_report_id} has an unhandled status: ${currentStatusName}. No transaction created.")
// Unhandled status or inReserveQuantity is null
log.warn("No action for compliance_report_id: ${record.compliance_report_id}, status: ${currentStatusName}, inReserveQuantity: ${inReserveQuantity}.")
}
}
}
Expand Down Expand Up @@ -547,23 +618,20 @@ def mapCurrentStatus(fuelStatusName, analystStatusName, managerStatusName, direc
} else if (directorStatusName == "rejected") {
return referenceData.statusMap["rejected"]
}
// Handle other director statuses if necessary
}

// Check Manager Status
if (managerStatusName) {
if (managerStatusName == "recommended") {
return referenceData.statusMap["recommended by manager"]
}
// Handle other manager statuses if necessary
}

// Check Analyst Status
if (analystStatusName) {
if (analystStatusName == "recommended") {
return referenceData.statusMap["recommended by analyst"]
}
// Handle other analyst statuses if necessary
}

// Check Fuel Supplier Status
Expand All @@ -576,13 +644,58 @@ def mapCurrentStatus(fuelStatusName, analystStatusName, managerStatusName, direc
// Exclude deleted reports
return null
}
// Handle other fuel supplier statuses if necessary
}

// Default case
return null
}

// =========================================
// Add a mapping function for compliance_period_id
// =========================================
def mapCompliancePeriodId(Integer tfrsId) {
// This mapping adjusts for the doubled up years in TFRS (2012-13, 2013-14)
// and ensures correct alignment with LCFS single-year periods.
//
// TFRS IDs vs. Descriptions:
// 3 -> "2012-13"
// 4 -> "2013-14"
// LCFS has each year individually:
// 3 -> "2012"
// 4 -> "2013"
// 5 -> "2014"
//
// Because TFRS combines two-year periods, align them as follows:
// TFRS:3 (2012-13) -> LCFS:3 (2012)
// TFRS:4 (2013-14) -> LCFS:5 (2014), skipping LCFS:4 (2013)
// After handling these two combined periods, subsequent years shift by 1.

def compliancePeriodMapping = [
1: 1,
2: 2,
3: 3, // 2012-13 mapped to 2012
4: 5, // 2013-14 mapped to 2014, skipping 2013
5: 6,
6: 7,
7: 8,
8: 9,
9: 10,
10: 11,
11: 12,
12: 13,
13: 14,
14: 15,
15: 16,
16: 17,
17: 18,
18: 19,
19: 20,
20: 21
]

return compliancePeriodMapping[tfrsId] ?: tfrsId
}

/**
* Prepares SQL statements for insertion and updates.
* @param conn Destination database connection.
Expand Down
38 changes: 32 additions & 6 deletions etl/nifi_scripts/transfer.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ try {
def internalCommentsJson = internalComments ? jsonSlurper.parseText(internalComments) : []
def creditTradeHistoryJson = creditTradeHistory ? jsonSlurper.parseText(creditTradeHistory) : []

// First, determine if the transfer already exists
def transferIdFromSource = resultSet.getInt('transfer_id')
if (transferExists(destinationConn, transferIdFromSource)) {
log.warn("Duplicate transfer detected with transfer_id: ${transferIdFromSource}, skipping insertion.")
// Since this transfer already exists, do not insert transactions or history again.
continue
}

// Only if transfer does not exist, proceed to create transactions and then insert the transfer.
def (fromTransactionId, toTransactionId) = processTransactions(resultSet.getString('current_status'),
resultSet,
statements.transactionStmt)
Expand All @@ -205,7 +214,7 @@ try {
processInternalComments(transferId, internalCommentsJson, statements.internalCommentStmt,
statements.transferInternalCommentStmt)
} else {
log.warn("Transfer not inserted for record: ${resultSet.getInt('transfer_id')}")
log.warn("Transfer not inserted for record: ${transferIdFromSource}")
}
}
resultSet.close()
Expand Down Expand Up @@ -344,19 +353,24 @@ def processTransactions(String currentStatus, ResultSet rs, PreparedStatement st
case ['Draft', 'Deleted', 'Refused', 'Declined', 'Rescinded']:
break
case ['Sent', 'Submitted', 'Recommended']:
fromTransactionId = insertTransaction(stmt, rs, 'Reserved', rs.getInt('from_organization_id'))
fromTransactionId = insertTransaction(stmt, rs, 'Reserved', rs.getInt('from_organization_id'), true)
break
case 'Recorded':
fromTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('from_organization_id'))
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'))
fromTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('from_organization_id'), true)
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'), false)
break
}

return [fromTransactionId, toTransactionId]
}

def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId) {
stmt.setInt(1, rs.getInt('quantity'))
def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId, boolean isDebit) {
def quantity = rs.getInt('quantity')
if (isDebit) {
quantity *= -1 // Make the transaction negative for the sender
}

stmt.setInt(1, quantity)
stmt.setInt(2, orgId)
stmt.setString(3, action)
stmt.setDate(4, rs.getDate('transaction_effective_date') ?: rs.getDate('agreement_date'))
Expand All @@ -367,6 +381,18 @@ def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int o
return result.next() ? result.getInt('transaction_id') : null
}

def transferExists(Connection conn, int transferId) {
def duplicateCheckStmt = conn.prepareStatement('SELECT COUNT(*) FROM transfer WHERE transfer_id = ?')
duplicateCheckStmt.setInt(1, transferId)
def duplicateResult = duplicateCheckStmt.executeQuery()
duplicateResult.next()
def count = duplicateResult.getInt(1)
duplicateResult.close()
duplicateCheckStmt.close()

return count > 0
}

def processHistory(Integer transferId, List creditTradeHistory, PreparedStatement historyStmt, Map preparedData) {
if (!creditTradeHistory) return

Expand Down

0 comments on commit 6e2f6b4

Please sign in to comment.