Skip to content

Commit

Permalink
part3 award transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
prv-proton committed Dec 10, 2024
1 parent eb586fc commit 8bfa775
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 48 deletions.
Binary file modified etl/nifi/conf/flow.json.gz
Binary file not shown.
Binary file modified etl/nifi/conf/flow.xml.gz
Binary file not shown.
80 changes: 32 additions & 48 deletions etl/nifi_scripts/part3award.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ WITH
-- JSON aggregation for credit trade history
json_agg (
json_build_object (
'transfer_id',
'initiative_agreement_id',
cth.credit_trade_id,
'transfer_status',
'initiative_agreement_status',
case
WHEN cts_history.status IN ('Cancelled', 'Not Recommended', 'Declined', 'Refused') or ct.is_rescinded = true THEN 'Deleted'
WHEN cts_history.status IN ('Accepted', 'Submitted', 'Recommended') THEN 'Recommended'
Expand Down Expand Up @@ -176,9 +176,9 @@ try {
if (initiativeAgreementId) {
processHistory(initiativeAgreementId, creditTradeHistoryJson, statements.historyStmt, preparedData)
processInternalComments(initiativeAgreementId, internalCommentsJson, statements.internalCommentStmt,
statements.transferInternalCommentStmt)
statements.initiativeAgreementInternalCommentStmt)
} else {
log.warn("Transfer not inserted for record: ${resultSet.getInt('transfer_id')}")
log.warn("initiative-agreement not inserted for record: ${resultSet.getInt('initiative_agreement_id')}")
}
}
resultSet.close()
Expand Down Expand Up @@ -236,26 +236,16 @@ def loadTableData(Connection conn, String query, String keyColumn, String valueC
}

def prepareData(Connection conn) {
def categoryMap = loadTableData(conn,
'SELECT DISTINCT category, MIN(transfer_category_id) AS transfer_category_id FROM transfer_category GROUP BY category',
'category',
'transfer_category_id'
)
def statusMap = loadTableData(conn,
'SELECT DISTINCT status, MIN(transfer_status_id) AS transfer_status_id FROM transfer_status GROUP BY status',
'SELECT DISTINCT status, MIN(initiative_agreement_status_id) AS initiative_agreement_status_id FROM initiative_agreement_status GROUP BY status',
'status',
'transfer_status_id'
'initiative_agreement_status_id'
)
return [
categoryMap: categoryMap,
statusMap : statusMap
]
}

def getTransferCategoryId(String category, Map preparedData) {
return preparedData.categoryMap[category]
}

def getStatusId(String status, Map preparedData) {
return preparedData.statusMap[status]
}
Expand All @@ -266,12 +256,12 @@ def prepareStatements(Connection conn) {
to_organization_id, transaction_id, transaction_effective_date, compliance_units, gov_comment,
current_status_id, create_date, update_date, create_user, update_user, effective_status,
initiative_agreement_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, true, ?)
RETURNING transfer_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, true, ?)
RETURNING initiative_agreement_id
'''
def INSERT_TRANSFER_HISTORY_SQL = '''
def INSERT_INITIATIVE_AGREEMENT_HISTORY_SQL = '''
INSERT INTO initiative_agreement_history (
transfer_history_id, transfer_id, transfer_status_id, user_profile_id, create_date, effective_status
initiative_agreement_history_id, initiative_agreement_id, initiative_agreement_status_id, user_profile_id, create_date, effective_status
) VALUES (DEFAULT, ?, ?, ?, ?, true)
'''
def INSERT_INTERNAL_COMMENT_SQL = '''
Expand All @@ -280,9 +270,9 @@ def prepareStatements(Connection conn) {
) VALUES (DEFAULT, ?, ?::audience_scope, ?, ?)
RETURNING internal_comment_id
'''
def INSERT_TRANSFER_INTERNAL_COMMENT_SQL = '''
INSERT INTO transfer_internal_comment (
transfer_id, internal_comment_id
def INSERT_INITIATIVE_AGREEMENT_INTERNAL_COMMENT_SQL = '''
INSERT INTO initiative_agreement_internal_comment (
initiative_agreement_id, internal_comment_id
) VALUES (?, ?)
'''
def INSERT_TRANSACTION_SQL = '''
Expand All @@ -291,10 +281,11 @@ def prepareStatements(Connection conn) {
) VALUES (DEFAULT, ?, ?, ?::transaction_action_enum, ?, ?, ?, true)
RETURNING transaction_id
'''
return [initiativeAgreementStmt : conn.prepareStatement(INSERT_INITIATIVE_AGREEMENT_SQL),
historyStmt : conn.prepareStatement(INSERT_TRANSFER_HISTORY_SQL),
return [initiativeAgreementStmt : conn.prepareStatement(INSERT_INITIATIVE_AGREEMENT_SQL),
historyStmt : conn.prepareStatement(INSERT_INITIATIVE_AGREEMENT_HISTORY_SQL),
internalCommentStmt : conn.prepareStatement(INSERT_INTERNAL_COMMENT_SQL),
transferInternalCommentStmt: conn.prepareStatement(INSERT_TRANSFER_INTERNAL_COMMENT_SQL),
initiativeAgreementInternalCommentStmt
: conn.prepareStatement(INSERT_INITIATIVE_AGREEMENT_INTERNAL_COMMENT_SQL),
transactionStmt : conn.prepareStatement(INSERT_TRANSACTION_SQL)]
}

Expand All @@ -312,19 +303,15 @@ def toSqlTimestamp(String timestampString) {
def processTransactions(String currentStatus, ResultSet rs, PreparedStatement stmt) {
def toTransactionId = null

switch (currentStatus) {
case ['Draft', 'Deleted', 'Recommended']:
break
case 'Approved':
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'))
break
if (currentStatus == 'Approved') {
toTransactionId = insertTransaction(stmt, rs, 'Adjustment', rs.getInt('to_organization_id'))
}

return toTransactionId
}

def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int orgId) {
stmt.setInt(1, rs.getInt('quantity'))
stmt.setInt(1, rs.getInt('compliance_units'))
stmt.setInt(2, orgId)
stmt.setString(3, action)
stmt.setDate(4, rs.getDate('transaction_effective_date') ?: rs.getDate('agreement_date'))
Expand All @@ -338,18 +325,18 @@ def insertTransaction(PreparedStatement stmt, ResultSet rs, String action, int o
def processHistory(Integer initiativeAgreementId, List creditTradeHistory, PreparedStatement historyStmt, Map preparedData) {
if (!creditTradeHistory) return

// Use a Set to track unique combinations of transfer_id and transfer_status
// Use a Set to track unique combinations of initiative_agreement_id and initiative_agreement_status
def processedEntries = new HashSet<String>()

creditTradeHistory.each { historyItem ->
try {
def statusId = getStatusId(historyItem.transfer_status, preparedData)
def uniqueKey = "${transferId}_${statusId}"
def statusId = getStatusId(historyItem.initiative_agreement_status, preparedData)
def uniqueKey = "${initiativeAgreementId}_${statusId}"

// Check if this combination has already been processed
if (!processedEntries.contains(uniqueKey)) {
// If not processed, add to batch and mark as processed
historyStmt.setInt(1, transferId)
historyStmt.setInt(1, initiativeAgreementId)
historyStmt.setInt(2, statusId)
historyStmt.setInt(3, historyItem.user_profile_id)
historyStmt.setTimestamp(4, toSqlTimestamp(historyItem.create_timestamp ?: '2013-01-01T00:00:00Z'))
Expand All @@ -358,7 +345,7 @@ def processHistory(Integer initiativeAgreementId, List creditTradeHistory, Prepa
processedEntries.add(uniqueKey)
}
} catch (Exception e) {
log.error("Error processing history record for transfer_id: ${transferId}", e)
log.error("Error processing history record for initiative_agreement_id: ${initiativeAgreementId}", e)
}
}

Expand All @@ -367,14 +354,11 @@ def processHistory(Integer initiativeAgreementId, List creditTradeHistory, Prepa
}


def processInternalComments(Integer transferId, List internalComments,
def processInternalComments(Integer initiativeAgreementId, List internalComments,
PreparedStatement internalCommentStmt,
PreparedStatement transferInternalCommentStmt) {
PreparedStatement initiativeAgreementInternalCommentStmt) {
if (!internalComments) return

// Use Set to track processed IDs and avoid duplicates
def processedIds = new HashSet<Integer>()

internalComments.each { comment ->
if (!comment) return // Skip null comments

Expand All @@ -390,15 +374,15 @@ def processInternalComments(Integer transferId, List internalComments,
if (commentResult.next()) {
internalCommentId = commentResult.getInt('internal_comment_id')

// Insert the transfer-comment relationship
transferInternalCommentStmt.setInt(1, transferId)
transferInternalCommentStmt.setInt(2, internalCommentId)
transferInternalCommentStmt.executeUpdate()
// Insert the initiative-agreement-comment relationship
initiativeAgreementInternalCommentStmt.setInt(1, initiativeAgreementId)
initiativeAgreementInternalCommentStmt.setInt(2, internalCommentId)
initiativeAgreementInternalCommentStmt.executeUpdate()
}

commentResult.close()
} catch (Exception e) {
log.error("Error processing internal comment for transfer ${transferId}: ${e.getMessage()}", e)
log.error("Error processing internal comment for initiative-agreement ${initiativeAgreementId}: ${e.getMessage()}", e)
}
}
}
Expand Down

0 comments on commit 8bfa775

Please sign in to comment.