Skip to content

Commit

Permalink
Merge pull request #1600 from bcgov/fix/prashanth-etl-role-map-1594
Browse files Browse the repository at this point in the history
Fix: LCFS - Fix ETL Role Mapping Between TFRS and LCFS #1594
  • Loading branch information
prv-proton authored Jan 4, 2025
2 parents 1d72287 + c34a898 commit 26b0bbb
Showing 1 changed file with 126 additions and 53 deletions.
179 changes: 126 additions & 53 deletions etl/nifi_scripts/user.groovy
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import groovy.json.JsonSlurper

log.warn("**** STARTING USER ETL ****")
log.warn('**** STARTING USER ETL ****')

// SQL query to extract user profiles
def userProfileQuery = """
Expand All @@ -17,33 +18,98 @@ def userProfileQuery = """
first_name,
last_name,
is_active,
CASE WHEN organization_id = 1 THEN null ELSE organization_id END as organization_id
CASE WHEN organization_id = 1 THEN NULL ELSE organization_id END as organization_id
FROM public.user;
"""

// SQL query to extract user roles
def userRoleQuery = """
SELECT ur.user_id as user_profile_id,
CASE
WHEN r.name = 'Admin' THEN 'ADMINISTRATOR'
WHEN r.name = 'GovUser' THEN 'ANALYST'
WHEN r.name IN ('GovDirector', 'GovDeputyDirector') THEN 'DIRECTOR'
WHEN r.name = 'GovComplianceManager' THEN 'COMPLIANCE_MANAGER'
WHEN r.name = 'FSAdmin' THEN 'MANAGE_USERS'
WHEN r.name = 'FSUser' THEN 'TRANSFER'
WHEN r.name = 'FSManager' THEN 'SIGNING_AUTHORITY'
WHEN r.name = 'FSNoAccess' THEN 'READ_ONLY'
WHEN r.name = 'ComplianceReporting' THEN 'COMPLIANCE_REPORTING'
ELSE NULL
END AS role_name
FROM public.user u
INNER JOIN user_role ur ON ur.user_id = u.id
INNER JOIN role r ON r.id = ur.role_id
WHERE r.name NOT IN ('FSDocSubmit', 'GovDoc');
WITH RoleData AS (
SELECT ur.user_id AS user_profile_id,
CASE
-- Government Roles
WHEN u.organization_id = 1 THEN
CASE
WHEN r.name = 'Admin' THEN 'ADMINISTRATOR'
WHEN r.name = 'GovUser' THEN 'ANALYST'
WHEN r.name IN ('GovDirector', 'GovDeputyDirector') THEN 'DIRECTOR'
WHEN r.name = 'GovComplianceManager' THEN 'COMPLIANCE_MANAGER'
END
-- Supplier Roles
WHEN u.organization_id > 1 THEN
CASE
WHEN r.name = 'FSAdmin' THEN 'MANAGE_USERS'
WHEN r.name = 'FSUser' THEN 'TRANSFER'
WHEN r.name = 'FSManager' THEN 'SIGNING_AUTHORITY'
WHEN r.name = 'FSNoAccess' THEN 'READ_ONLY'
WHEN r.name = 'ComplianceReporting' THEN 'COMPLIANCE_REPORTING'
END
END AS role_name,
u.organization_id
FROM public.user u
INNER JOIN user_role ur ON ur.user_id = u.id
INNER JOIN role r ON r.id = ur.role_id
WHERE r.name NOT IN ('FSDocSubmit', 'GovDoc')
),
FilteredRoles AS (
SELECT user_profile_id,
organization_id,
ARRAY_AGG(role_name) AS roles
FROM RoleData
WHERE role_name IS NOT NULL
GROUP BY user_profile_id, organization_id
),
ProcessedRoles AS (
SELECT
user_profile_id,
CASE
-- Rule 1: Government Users
WHEN organization_id = 1 THEN
CASE
-- Retain Administrator and one prioritized gov role
WHEN 'ADMINISTRATOR' = ANY(roles) THEN
ARRAY_REMOVE(ARRAY[
'ADMINISTRATOR',
CASE
WHEN 'DIRECTOR' = ANY(roles) THEN 'DIRECTOR'
WHEN 'COMPLIANCE_MANAGER' = ANY(roles) THEN 'COMPLIANCE_MANAGER'
WHEN 'ANALYST' = ANY(roles) THEN 'ANALYST'
END
], NULL)
-- Priority among gov roles (no Administrator)
ELSE ARRAY_REMOVE(ARRAY[
CASE
WHEN 'DIRECTOR' = ANY(roles) THEN 'DIRECTOR'
WHEN 'COMPLIANCE_MANAGER' = ANY(roles) THEN 'COMPLIANCE_MANAGER'
WHEN 'ANALYST' = ANY(roles) THEN 'ANALYST'
END
], NULL)
END
-- Rule 2: Supplier Users
WHEN organization_id > 1 THEN
CASE
-- Return empty array if READ_ONLY exists
WHEN 'READ_ONLY' = ANY(roles) THEN
ARRAY[]::text[]
ELSE ARRAY(
SELECT UNNEST(roles)
EXCEPT
SELECT UNNEST(ARRAY['ADMINISTRATOR', 'ANALYST', 'DIRECTOR', 'COMPLIANCE_MANAGER'])
)
END
END AS filtered_roles,
organization_id
FROM FilteredRoles
)
SELECT
user_profile_id,
organization_id,
array_to_string(filtered_roles, ',') as roles_string
FROM ProcessedRoles;
"""

// SQL queries to insert user profiles and roles into destination tables with ON CONFLICT handling
def insertUserProfileSQL = """
def insertUserProfileSQL = '''
INSERT INTO user_profile (user_profile_id, keycloak_user_id, keycloak_email, keycloak_username, email, title, phone, mobile_phone, first_name, last_name, is_active, organization_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (user_profile_id) DO UPDATE
Expand All @@ -58,17 +124,17 @@ def insertUserProfileSQL = """
last_name = EXCLUDED.last_name,
is_active = EXCLUDED.is_active,
organization_id = EXCLUDED.organization_id;
"""
'''

def insertUserRoleSQL = """
def insertUserRoleSQL = '''
INSERT INTO user_role (user_profile_id, role_id)
VALUES (?, (SELECT role_id FROM role WHERE name = ?::role_enum))
ON CONFLICT (user_profile_id, role_id) DO NOTHING;
"""
'''

// Fetch connections to both source and destination databases
def sourceDbcpService = context.controllerServiceLookup.getControllerService("3245b078-0192-1000-ffff-ffffba20c1eb")
def destinationDbcpService = context.controllerServiceLookup.getControllerService("3244bf63-0192-1000-ffff-ffffc8ec6d93")
def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb')
def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93')

Connection sourceConn = null
Connection destinationConn = null
Expand All @@ -88,18 +154,18 @@ try {

// Process the result set for user profiles
while (userProfileResultSet.next()) {
def userProfileId = userProfileResultSet.getInt("user_profile_id")
def keycloakUserId = userProfileResultSet.getString("keycloak_user_id")
def keycloakEmail = userProfileResultSet.getString("keycloak_email")
def keycloakUsername = userProfileResultSet.getString("keycloak_username")
def email = userProfileResultSet.getString("email")
def title = userProfileResultSet.getString("title")
def phone = userProfileResultSet.getString("phone")
def mobilePhone = userProfileResultSet.getString("mobile_phone")
def firstName = userProfileResultSet.getString("first_name")
def lastName = userProfileResultSet.getString("last_name")
def isActive = userProfileResultSet.getBoolean("is_active")
def organizationId = userProfileResultSet.getObject("organization_id") // Nullable
def userProfileId = userProfileResultSet.getInt('user_profile_id')
def keycloakUserId = userProfileResultSet.getString('keycloak_user_id')
def keycloakEmail = userProfileResultSet.getString('keycloak_email')
def keycloakUsername = userProfileResultSet.getString('keycloak_username')
def email = userProfileResultSet.getString('email')
def title = userProfileResultSet.getString('title')
def phone = userProfileResultSet.getString('phone')
def mobilePhone = userProfileResultSet.getString('mobile_phone')
def firstName = userProfileResultSet.getString('first_name')
def lastName = userProfileResultSet.getString('last_name')
def isActive = userProfileResultSet.getBoolean('is_active')
def organizationId = userProfileResultSet.getObject('organization_id') // Nullable

// Bind values to the prepared statement
insertUserProfileStmt.setInt(1, userProfileId)
Expand Down Expand Up @@ -136,25 +202,32 @@ try {
PreparedStatement sourceRoleStmt = sourceConn.prepareStatement(userRoleQuery)
ResultSet userRoleResultSet = sourceRoleStmt.executeQuery()

// Process the result set for user roles
while (userRoleResultSet.next()) {
def userProfileId = userRoleResultSet.getInt("user_profile_id")
def roleName = userRoleResultSet.getString("role_name")

// Bind values to the prepared statement
insertUserRoleStmt.setInt(1, userProfileId)
insertUserRoleStmt.setString(2, roleName)

// Execute the insert/update for user roles
insertUserRoleStmt.executeUpdate()
def userProfileId = userRoleResultSet.getInt('user_profile_id')
def rolesString = userRoleResultSet.getString('roles_string')

if (rolesString) {
def roles = rolesString.split(',')

roles.each { role ->
try {
insertUserRoleStmt.setInt(1, userProfileId)
insertUserRoleStmt.setString(2, role)
insertUserRoleStmt.executeUpdate()
log.info("Successfully inserted role ${role} for user ${userProfileId}")
} catch (Exception e) {
log.error("Failed to insert role ${role} for user ${userProfileId}: ${e.message}")
}
}
} else {
log.warn("No roles found for user ${userProfileId}")
}
}

} catch (Exception e) {
log.error("Error occurred while processing data", e)
log.error('Error occurred during ETL process', e)
} finally {
// Close the connections
if (sourceConn != null) sourceConn.close()
if (destinationConn != null) destinationConn.close()
if (sourceConn) sourceConn.close()
if (destinationConn) destinationConn.close()
}

log.warn("**** COMPLETED USER ETL ****")
log.warn('**** COMPLETED USER ETL ****')

0 comments on commit 26b0bbb

Please sign in to comment.