Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Compliance Report ETL #1428

Merged
merged 10 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion backend/lcfs/services/tfrs/redis_balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ async def init_org_balance_cache(app: FastAPI):
transaction_repo = TransactionRepository(db=session)

# Get the oldest transaction year
oldest_year = await transaction_repo.get_transaction_start_year()
oldest_year = await transaction_repo.get_transaction_start_year() or int(
2019
)

# Get the current year
current_year = datetime.now().year
Expand Down
Binary file modified etl/database/nifi-registry-primary.mv.db
Binary file not shown.
30 changes: 15 additions & 15 deletions etl/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ services:
networks:
- shared_network
# TFRS database loaded with TFRS data
# tfrs:
# image: postgres:14.2
# container_name: tfrs
# environment:
# POSTGRES_USER: tfrs
# POSTGRES_PASSWORD: development_only
# POSTGRES_DB: tfrs
# ports:
# - "5435:5432"
# volumes:
# - tfrs_data:/var/lib/postgresql/data
# restart: unless-stopped
# networks:
# - shared_network
tfrs:
image: postgres:14.2
container_name: tfrs
environment:
POSTGRES_USER: tfrs
POSTGRES_PASSWORD: development_only
POSTGRES_DB: tfrs
ports:
- "5435:5432"
volumes:
- tfrs_data:/var/lib/postgresql/data
restart: unless-stopped
networks:
- shared_network

volumes:
# tfrs_data:
tfrs_data:
nifi_output:
nifi_scripts:

Expand Down
300 changes: 300 additions & 0 deletions etl/local_nifi_manager.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
#!/usr/bin/env bash
# A Simplified NiFi Processor Management Script for Local Databases
#
# This script manages NiFi processors and updates database connection configurations
# for a local setup, without OpenShift or port-forwarding. It assumes:
# - NiFi is running locally and accessible at http://localhost:8091
# - TFRS and LCFS databases are running locally on specified ports.
#
# Usage:
# ./local_nifi_manager.sh [--debug|--verbose]
#
# Features:
# - No OpenShift or port-forwarding; everything runs locally.
# - Updates NiFi database controller connections to point to local DBs.
# - Triggers NiFi processors to run once and waits for them to finish.
#
# Requirements:
# - `jq` for JSON parsing.
# - NiFi running locally and accessible via the NiFi API.
#
# Notes:
# Update the processor and controller service IDs, as well as the database credentials,
# according to your local environment.

set -euo pipefail

# Debug Levels
readonly DEBUG_NONE=0
readonly DEBUG_ERROR=1
readonly DEBUG_WARN=2
readonly DEBUG_INFO=3
readonly DEBUG_VERBOSE=4
readonly DEBUG_DEBUG=5

# Default Debug Level
DEBUG_LEVEL=${DEBUG_LEVEL:-$DEBUG_INFO}

# Processor and Controller Service IDs (Update these as needed)
ORGANIZATION_PROCESSOR="328e2539-0192-1000-0000-00007a4304c1"
USER_PROCESSOR="e6c63130-3eac-1b13-a947-ee0103275138"
TRANSFER_PROCESSOR="b9d73248-1438-1418-a736-cc94c8c21e70"
TRANSACTIONS_PROCESSOR="7a010ef5-0193-1000-ffff-ffff8c22e67e"

# Controller Service IDs for local DB connections (Update these)
LCFS_CONTROLLER_SERVICE_ID="32417e8c-0192-1000-ffff-ffff8ccb5dfa"
TFRS_CONTROLLER_SERVICE_ID="3245b078-0192-1000-ffff-ffffba20c1eb"

# Local Database Configuration (Update as needed)
# Example: TFRS DB at localhost:5435, LCFS DB at localhost:5432
TFRS_DB_PORT=5435
TFRS_DB_USER="tfrs"
TFRS_DB_PASS="development_only"
TFRS_DB_NAME="tfrs"

LCFS_DB_PORT=5432
LCFS_DB_USER="lcfs"
LCFS_DB_PASS="development_only"
LCFS_DB_NAME="lcfs"

# NiFi API URL
NIFI_API_URL="http://localhost:8091/nifi-api"
MAX_RETRIES=5

# Logging function
_log() {
local level=$1
local message=$2
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')

if [ "$level" -le "$DEBUG_LEVEL" ]; then
case $level in
$DEBUG_ERROR) printf "\e[31m[ERROR][%s] %s\e[0m\n" "$timestamp" "$message" >&2 ;;
$DEBUG_WARN) printf "\e[33m[WARN][%s] %s\e[0m\n" "$timestamp" "$message" >&2 ;;
$DEBUG_INFO) printf "\e[32m[INFO][%s] %s\e[0m\n" "$timestamp" "$message" >&2 ;;
$DEBUG_VERBOSE)printf "\e[34m[VERBOSE][%s] %s\e[0m\n" "$timestamp" "$message" >&2 ;;
$DEBUG_DEBUG) printf "\e[36m[DEBUG][%s] %s\e[0m\n" "$timestamp" "$message" >&2 ;;
esac
fi
}

error() { _log $DEBUG_ERROR "$1"; }
warn() { _log $DEBUG_WARN "$1"; }
info() { _log $DEBUG_INFO "$1"; }
verbose() { _log $DEBUG_VERBOSE "$1"; }
debug() { _log $DEBUG_DEBUG "$1"; }

error_exit() {
error "$1"
exit 1
}

curl_with_retry() {
local max_attempts=$1
shift
local url=$1
local method=${2:-GET}
local data=${3:-}
local attempt=1

info "Curl Request: $method $url"
verbose "Max Attempts: $max_attempts"

while [ $attempt -le "$max_attempts" ]; do
debug "Attempt $attempt for $url"
local response
local http_code

if [ -z "$data" ]; then
response=$(curl -sS -w "%{http_code}" -X "$method" "$url")
else
response=$(curl -sS -w "%{http_code}" -X "$method" -H "Content-Type: application/json" -d "$data" "$url")
fi

http_code="${response: -3}"
response_body="${response:0:${#response}-3}"

if [ "$http_code" -ge 200 ] && [ "$http_code" -lt 300 ]; then
verbose "Curl successful. HTTP Code: $http_code"
echo "$response_body"
return 0
fi

warn "Curl attempt $attempt failed (HTTP $http_code). Retrying..."
((attempt++))
sleep $((attempt * 2))
done

error_exit "Curl failed after $max_attempts attempts: $url"
}

enable_controller_service() {
local service_id=$1
info "Enabling controller service $service_id"
local current_config revision_version

current_config=$(curl_with_retry 3 "$NIFI_API_URL/controller-services/$service_id")
revision_version=$(echo "$current_config" | jq -r '.revision.version')

curl_with_retry 3 "$NIFI_API_URL/controller-services/$service_id/run-status" PUT "{\"state\": \"ENABLED\", \"revision\": { \"version\": $revision_version }}"
info "Controller service $service_id enabled."
}

execute_processor() {
local processor_id=$1

info "Triggering single execution for processor $processor_id"
local current_config revision_version

current_config=$(curl_with_retry 3 "$NIFI_API_URL/processors/$processor_id")
revision_version=$(echo "$current_config" | jq -r '.revision.version')

local run_once_payload=$(jq -n \
--argjson revision_version "$revision_version" \
'{
"revision": { "version": $revision_version },
"state": "RUN_ONCE",
"disconnectedNodeAcknowledged": false
}')

local response
response=$(curl -sS -w "%{http_code}" -X PUT \
-H "Content-Type: application/json" \
-d "$run_once_payload" \
"$NIFI_API_URL/processors/$processor_id/run-status")

local http_code="${response: -3}"
local response_body="${response:0:${#response}-3}"

if [ "$http_code" -ge 200 ] && [ "$http_code" -lt 300 ]; then
info "Processor $processor_id triggered for single run successfully"
sleep 10
else
error "Failed to trigger single run for processor $processor_id (HTTP $http_code)"
error "Response: $response_body"
return 1
fi

# Wait for completion
local max_wait_time=300
local start_time=$(date +%s)
local timeout=$((start_time + max_wait_time))

while true; do
local current_time=$(date +%s)
if [ "$current_time" -gt "$timeout" ]; then
error "Processor $processor_id execution timed out after $max_wait_time seconds"
return 1
fi

local processor_status
processor_status=$(curl_with_retry 3 "$NIFI_API_URL/processors/$processor_id" | jq '.status.aggregateSnapshot')

local active_threads=$(echo "$processor_status" | jq '.activeThreadCount')
local processed_count=$(echo "$processor_status" | jq '.flowFilesProcessed')

verbose "Processor $processor_id - Active Threads: $active_threads, Processed: $processed_count"

if [ "$active_threads" -eq 0 ]; then
info "Processor $processor_id completed execution"
break
fi
sleep 2
done
}

update_nifi_connection() {
local controller_service_id=$1
local db_url=$2
local db_user=$3
local db_pass=$4

info "Updating NiFi controller service $controller_service_id"

local controller_config
controller_config=$(curl_with_retry 3 "$NIFI_API_URL/controller-services/$controller_service_id")
local revision_version=$(echo "$controller_config" | jq '.revision.version')
local current_name=$(echo "$controller_config" | jq -r '.component.name')

# Disable the controller service first
info "Disabling controller service $controller_service_id"
curl_with_retry 3 "$NIFI_API_URL/controller-services/$controller_service_id/run-status" PUT "{\"state\": \"DISABLED\", \"revision\": { \"version\": $revision_version }}"

# Wait until disabled
while true; do
controller_config=$(curl_with_retry 3 "$NIFI_API_URL/controller-services/$controller_service_id")
local current_state=$(echo "$controller_config" | jq -r '.component.state')
if [ "$current_state" == "DISABLED" ]; then
break
fi
warn "Controller service not yet disabled. Retrying..."
sleep 2
done

local updated_config
updated_config=$(jq -n \
--arg name "$current_name" \
--arg db_url "$db_url" \
--arg db_user "$db_user" \
--arg db_pass "$db_pass" \
--argjson version "$revision_version" \
'{
"revision": { "version": $version },
"component": {
"id": "'$controller_service_id'",
"name": $name,
"properties": {
"Database Connection URL": $db_url,
"Database User": $db_user,
"Password": $db_pass
}
}
}')

curl_with_retry 3 "$NIFI_API_URL/controller-services/$controller_service_id" PUT "$updated_config"
info "Controller service updated."

enable_controller_service "$controller_service_id"
}

main() {
while [[ $# -gt 0 ]]; do
case "$1" in
--debug)
DEBUG_LEVEL=$DEBUG_DEBUG
shift
;;
--verbose)
DEBUG_LEVEL=$DEBUG_VERBOSE
shift
;;
*)
break
;;
esac
done

info "Starting Local NiFi Manager"
info "Debug Level: $DEBUG_LEVEL"

# Prepare DB URLs for local connections
local TFRS_DB_URL="jdbc:postgresql://localhost:$TFRS_DB_PORT/$TFRS_DB_NAME"
local LCFS_DB_URL="jdbc:postgresql://localhost:$LCFS_DB_PORT/$LCFS_DB_NAME"

info "Updating TFRS DB connection..."
update_nifi_connection "$TFRS_CONTROLLER_SERVICE_ID" "$TFRS_DB_URL" "$TFRS_DB_USER" "$TFRS_DB_PASS"

info "Updating LCFS DB connection..."
update_nifi_connection "$LCFS_CONTROLLER_SERVICE_ID" "$LCFS_DB_URL" "$LCFS_DB_USER" "$LCFS_DB_PASS"

info "Executing processors..."
execute_processor "$ORGANIZATION_PROCESSOR"
execute_processor "$USER_PROCESSOR"
execute_processor "$TRANSFER_PROCESSOR"
execute_processor "$TRANSACTIONS_PROCESSOR"

info "All processors executed successfully."
info "Check your local databases for the expected data."
}

main "$@"
Binary file modified etl/nifi/conf/flow.json.gz
Binary file not shown.
Binary file modified etl/nifi/conf/flow.xml.gz
Binary file not shown.
Loading