Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: LCFS - Fix ETL Role Mapping Between TFRS and LCFS #1594 #1600

Merged
merged 3 commits into from
Jan 4, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 126 additions & 53 deletions etl/nifi_scripts/user.groovy
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import groovy.json.JsonSlurper

log.warn("**** STARTING USER ETL ****")
log.warn('**** STARTING USER ETL ****')

// SQL query to extract user profiles
def userProfileQuery = """
Expand All @@ -17,33 +18,98 @@ def userProfileQuery = """
first_name,
last_name,
is_active,
CASE WHEN organization_id = 1 THEN null ELSE organization_id END as organization_id
CASE WHEN organization_id = 1 THEN NULL ELSE organization_id END as organization_id
FROM public.user;
"""

// SQL query to extract user roles
def userRoleQuery = """
SELECT ur.user_id as user_profile_id,
CASE
WHEN r.name = 'Admin' THEN 'ADMINISTRATOR'
WHEN r.name = 'GovUser' THEN 'ANALYST'
WHEN r.name IN ('GovDirector', 'GovDeputyDirector') THEN 'DIRECTOR'
WHEN r.name = 'GovComplianceManager' THEN 'COMPLIANCE_MANAGER'
WHEN r.name = 'FSAdmin' THEN 'MANAGE_USERS'
WHEN r.name = 'FSUser' THEN 'TRANSFER'
WHEN r.name = 'FSManager' THEN 'SIGNING_AUTHORITY'
WHEN r.name = 'FSNoAccess' THEN 'READ_ONLY'
WHEN r.name = 'ComplianceReporting' THEN 'COMPLIANCE_REPORTING'
ELSE NULL
END AS role_name
FROM public.user u
INNER JOIN user_role ur ON ur.user_id = u.id
INNER JOIN role r ON r.id = ur.role_id
WHERE r.name NOT IN ('FSDocSubmit', 'GovDoc');
WITH RoleData AS (
SELECT ur.user_id AS user_profile_id,
CASE
-- Government Roles
WHEN u.organization_id = 1 THEN
CASE
WHEN r.name = 'Admin' THEN 'ADMINISTRATOR'
WHEN r.name = 'GovUser' THEN 'ANALYST'
WHEN r.name IN ('GovDirector', 'GovDeputyDirector') THEN 'DIRECTOR'
WHEN r.name = 'GovComplianceManager' THEN 'COMPLIANCE_MANAGER'
END
-- Supplier Roles
WHEN u.organization_id > 1 THEN
CASE
WHEN r.name = 'FSAdmin' THEN 'MANAGE_USERS'
WHEN r.name = 'FSUser' THEN 'TRANSFER'
WHEN r.name = 'FSManager' THEN 'SIGNING_AUTHORITY'
WHEN r.name = 'FSNoAccess' THEN 'READ_ONLY'
WHEN r.name = 'ComplianceReporting' THEN 'COMPLIANCE_REPORTING'
END
END AS role_name,
u.organization_id
FROM public.user u
INNER JOIN user_role ur ON ur.user_id = u.id
INNER JOIN role r ON r.id = ur.role_id
WHERE r.name NOT IN ('FSDocSubmit', 'GovDoc')
),
FilteredRoles AS (
SELECT user_profile_id,
organization_id,
ARRAY_AGG(role_name) AS roles
FROM RoleData
WHERE role_name IS NOT NULL
GROUP BY user_profile_id, organization_id
),
ProcessedRoles AS (
SELECT
user_profile_id,
CASE
-- Rule 1: Government Users
WHEN organization_id = 1 THEN
CASE
-- Retain Administrator and one prioritized gov role
WHEN 'ADMINISTRATOR' = ANY(roles) THEN
ARRAY_REMOVE(ARRAY[
'ADMINISTRATOR',
CASE
WHEN 'DIRECTOR' = ANY(roles) THEN 'DIRECTOR'
WHEN 'COMPLIANCE_MANAGER' = ANY(roles) THEN 'COMPLIANCE_MANAGER'
WHEN 'ANALYST' = ANY(roles) THEN 'ANALYST'
END
], NULL)
-- Priority among gov roles (no Administrator)
ELSE ARRAY_REMOVE(ARRAY[
CASE
WHEN 'DIRECTOR' = ANY(roles) THEN 'DIRECTOR'
WHEN 'COMPLIANCE_MANAGER' = ANY(roles) THEN 'COMPLIANCE_MANAGER'
WHEN 'ANALYST' = ANY(roles) THEN 'ANALYST'
END
], NULL)
END
-- Rule 2: Supplier Users
WHEN organization_id > 1 THEN
CASE
-- Return empty array if READ_ONLY exists
WHEN 'READ_ONLY' = ANY(roles) THEN
ARRAY[]::text[]
ELSE ARRAY(
SELECT UNNEST(roles)
EXCEPT
SELECT UNNEST(ARRAY['ADMINISTRATOR', 'ANALYST', 'DIRECTOR', 'COMPLIANCE_MANAGER'])
)
END
END AS filtered_roles,
organization_id
FROM FilteredRoles
)
SELECT
user_profile_id,
organization_id,
array_to_string(filtered_roles, ',') as roles_string
FROM ProcessedRoles;
"""

// SQL queries to insert user profiles and roles into destination tables with ON CONFLICT handling
def insertUserProfileSQL = """
def insertUserProfileSQL = '''
INSERT INTO user_profile (user_profile_id, keycloak_user_id, keycloak_email, keycloak_username, email, title, phone, mobile_phone, first_name, last_name, is_active, organization_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (user_profile_id) DO UPDATE
Expand All @@ -58,17 +124,17 @@ def insertUserProfileSQL = """
last_name = EXCLUDED.last_name,
is_active = EXCLUDED.is_active,
organization_id = EXCLUDED.organization_id;
"""
'''

def insertUserRoleSQL = """
def insertUserRoleSQL = '''
INSERT INTO user_role (user_profile_id, role_id)
VALUES (?, (SELECT role_id FROM role WHERE name = ?::role_enum))
ON CONFLICT (user_profile_id, role_id) DO NOTHING;
"""
'''

// Fetch connections to both source and destination databases
def sourceDbcpService = context.controllerServiceLookup.getControllerService("3245b078-0192-1000-ffff-ffffba20c1eb")
def destinationDbcpService = context.controllerServiceLookup.getControllerService("3244bf63-0192-1000-ffff-ffffc8ec6d93")
def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb')
def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93')

Connection sourceConn = null
Connection destinationConn = null
Expand All @@ -88,18 +154,18 @@ try {

// Process the result set for user profiles
while (userProfileResultSet.next()) {
def userProfileId = userProfileResultSet.getInt("user_profile_id")
def keycloakUserId = userProfileResultSet.getString("keycloak_user_id")
def keycloakEmail = userProfileResultSet.getString("keycloak_email")
def keycloakUsername = userProfileResultSet.getString("keycloak_username")
def email = userProfileResultSet.getString("email")
def title = userProfileResultSet.getString("title")
def phone = userProfileResultSet.getString("phone")
def mobilePhone = userProfileResultSet.getString("mobile_phone")
def firstName = userProfileResultSet.getString("first_name")
def lastName = userProfileResultSet.getString("last_name")
def isActive = userProfileResultSet.getBoolean("is_active")
def organizationId = userProfileResultSet.getObject("organization_id") // Nullable
def userProfileId = userProfileResultSet.getInt('user_profile_id')
def keycloakUserId = userProfileResultSet.getString('keycloak_user_id')
def keycloakEmail = userProfileResultSet.getString('keycloak_email')
def keycloakUsername = userProfileResultSet.getString('keycloak_username')
def email = userProfileResultSet.getString('email')
def title = userProfileResultSet.getString('title')
def phone = userProfileResultSet.getString('phone')
def mobilePhone = userProfileResultSet.getString('mobile_phone')
def firstName = userProfileResultSet.getString('first_name')
def lastName = userProfileResultSet.getString('last_name')
def isActive = userProfileResultSet.getBoolean('is_active')
def organizationId = userProfileResultSet.getObject('organization_id') // Nullable

// Bind values to the prepared statement
insertUserProfileStmt.setInt(1, userProfileId)
Expand Down Expand Up @@ -136,25 +202,32 @@ try {
PreparedStatement sourceRoleStmt = sourceConn.prepareStatement(userRoleQuery)
ResultSet userRoleResultSet = sourceRoleStmt.executeQuery()

// Process the result set for user roles
while (userRoleResultSet.next()) {
def userProfileId = userRoleResultSet.getInt("user_profile_id")
def roleName = userRoleResultSet.getString("role_name")

// Bind values to the prepared statement
insertUserRoleStmt.setInt(1, userProfileId)
insertUserRoleStmt.setString(2, roleName)

// Execute the insert/update for user roles
insertUserRoleStmt.executeUpdate()
def userProfileId = userRoleResultSet.getInt('user_profile_id')
def rolesString = userRoleResultSet.getString('roles_string')

if (rolesString) {
def roles = rolesString.split(',')

roles.each { role ->
try {
insertUserRoleStmt.setInt(1, userProfileId)
insertUserRoleStmt.setString(2, role)
insertUserRoleStmt.executeUpdate()
log.info("Successfully inserted role ${role} for user ${userProfileId}")
} catch (Exception e) {
log.error("Failed to insert role ${role} for user ${userProfileId}: ${e.message}")
}
}
} else {
log.warn("No roles found for user ${userProfileId}")
}
}

} catch (Exception e) {
log.error("Error occurred while processing data", e)
log.error('Error occurred during ETL process', e)
} finally {
// Close the connections
if (sourceConn != null) sourceConn.close()
if (destinationConn != null) destinationConn.close()
if (sourceConn) sourceConn.close()
if (destinationConn) destinationConn.close()
}

log.warn("**** COMPLETED USER ETL ****")
log.warn('**** COMPLETED USER ETL ****')