diff --git a/etl/nifi_scripts/user.groovy b/etl/nifi_scripts/user.groovy index 77fb9bdf7..4c6ce85c2 100644 --- a/etl/nifi_scripts/user.groovy +++ b/etl/nifi_scripts/user.groovy @@ -1,8 +1,9 @@ import java.sql.Connection import java.sql.PreparedStatement import java.sql.ResultSet +import groovy.json.JsonSlurper -log.warn("**** STARTING USER ETL ****") +log.warn('**** STARTING USER ETL ****') // SQL query to extract user profiles def userProfileQuery = """ @@ -17,33 +18,98 @@ def userProfileQuery = """ first_name, last_name, is_active, - CASE WHEN organization_id = 1 THEN null ELSE organization_id END as organization_id + CASE WHEN organization_id = 1 THEN NULL ELSE organization_id END as organization_id FROM public.user; """ // SQL query to extract user roles def userRoleQuery = """ - SELECT ur.user_id as user_profile_id, - CASE - WHEN r.name = 'Admin' THEN 'ADMINISTRATOR' - WHEN r.name = 'GovUser' THEN 'ANALYST' - WHEN r.name IN ('GovDirector', 'GovDeputyDirector') THEN 'DIRECTOR' - WHEN r.name = 'GovComplianceManager' THEN 'COMPLIANCE_MANAGER' - WHEN r.name = 'FSAdmin' THEN 'MANAGE_USERS' - WHEN r.name = 'FSUser' THEN 'TRANSFER' - WHEN r.name = 'FSManager' THEN 'SIGNING_AUTHORITY' - WHEN r.name = 'FSNoAccess' THEN 'READ_ONLY' - WHEN r.name = 'ComplianceReporting' THEN 'COMPLIANCE_REPORTING' - ELSE NULL - END AS role_name - FROM public.user u - INNER JOIN user_role ur ON ur.user_id = u.id - INNER JOIN role r ON r.id = ur.role_id - WHERE r.name NOT IN ('FSDocSubmit', 'GovDoc'); + WITH RoleData AS ( + SELECT ur.user_id AS user_profile_id, + CASE + -- Government Roles + WHEN u.organization_id = 1 THEN + CASE + WHEN r.name = 'Admin' THEN 'ADMINISTRATOR' + WHEN r.name = 'GovUser' THEN 'ANALYST' + WHEN r.name IN ('GovDirector', 'GovDeputyDirector') THEN 'DIRECTOR' + WHEN r.name = 'GovComplianceManager' THEN 'COMPLIANCE_MANAGER' + END + -- Supplier Roles + WHEN u.organization_id > 1 THEN + CASE + WHEN r.name = 'FSAdmin' THEN 'MANAGE_USERS' + WHEN r.name = 'FSUser' THEN 'TRANSFER' + WHEN r.name = 'FSManager' THEN 'SIGNING_AUTHORITY' + WHEN r.name = 'FSNoAccess' THEN 'READ_ONLY' + WHEN r.name = 'ComplianceReporting' THEN 'COMPLIANCE_REPORTING' + END + END AS role_name, + u.organization_id + FROM public.user u + INNER JOIN user_role ur ON ur.user_id = u.id + INNER JOIN role r ON r.id = ur.role_id + WHERE r.name NOT IN ('FSDocSubmit', 'GovDoc') + ), + FilteredRoles AS ( + SELECT user_profile_id, + organization_id, + ARRAY_AGG(role_name) AS roles + FROM RoleData + WHERE role_name IS NOT NULL + GROUP BY user_profile_id, organization_id + ), + ProcessedRoles AS ( + SELECT + user_profile_id, + CASE + -- Rule 1: Government Users + WHEN organization_id = 1 THEN + CASE + -- Retain Administrator and one prioritized gov role + WHEN 'ADMINISTRATOR' = ANY(roles) THEN + ARRAY_REMOVE(ARRAY[ + 'ADMINISTRATOR', + CASE + WHEN 'DIRECTOR' = ANY(roles) THEN 'DIRECTOR' + WHEN 'COMPLIANCE_MANAGER' = ANY(roles) THEN 'COMPLIANCE_MANAGER' + WHEN 'ANALYST' = ANY(roles) THEN 'ANALYST' + END + ], NULL) + -- Priority among gov roles (no Administrator) + ELSE ARRAY_REMOVE(ARRAY[ + CASE + WHEN 'DIRECTOR' = ANY(roles) THEN 'DIRECTOR' + WHEN 'COMPLIANCE_MANAGER' = ANY(roles) THEN 'COMPLIANCE_MANAGER' + WHEN 'ANALYST' = ANY(roles) THEN 'ANALYST' + END + ], NULL) + END + -- Rule 2: Supplier Users + WHEN organization_id > 1 THEN + CASE + -- Return empty array if READ_ONLY exists + WHEN 'READ_ONLY' = ANY(roles) THEN + ARRAY[]::text[] + ELSE ARRAY( + SELECT UNNEST(roles) + EXCEPT + SELECT UNNEST(ARRAY['ADMINISTRATOR', 'ANALYST', 'DIRECTOR', 'COMPLIANCE_MANAGER']) + ) + END + END AS filtered_roles, + organization_id + FROM FilteredRoles + ) + SELECT + user_profile_id, + organization_id, + array_to_string(filtered_roles, ',') as roles_string + FROM ProcessedRoles; """ // SQL queries to insert user profiles and roles into destination tables with ON CONFLICT handling -def insertUserProfileSQL = """ +def insertUserProfileSQL = ''' INSERT INTO user_profile (user_profile_id, keycloak_user_id, keycloak_email, keycloak_username, email, title, phone, mobile_phone, first_name, last_name, is_active, organization_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (user_profile_id) DO UPDATE @@ -58,17 +124,17 @@ def insertUserProfileSQL = """ last_name = EXCLUDED.last_name, is_active = EXCLUDED.is_active, organization_id = EXCLUDED.organization_id; -""" +''' -def insertUserRoleSQL = """ +def insertUserRoleSQL = ''' INSERT INTO user_role (user_profile_id, role_id) VALUES (?, (SELECT role_id FROM role WHERE name = ?::role_enum)) ON CONFLICT (user_profile_id, role_id) DO NOTHING; -""" +''' // Fetch connections to both source and destination databases -def sourceDbcpService = context.controllerServiceLookup.getControllerService("3245b078-0192-1000-ffff-ffffba20c1eb") -def destinationDbcpService = context.controllerServiceLookup.getControllerService("3244bf63-0192-1000-ffff-ffffc8ec6d93") +def sourceDbcpService = context.controllerServiceLookup.getControllerService('3245b078-0192-1000-ffff-ffffba20c1eb') +def destinationDbcpService = context.controllerServiceLookup.getControllerService('3244bf63-0192-1000-ffff-ffffc8ec6d93') Connection sourceConn = null Connection destinationConn = null @@ -88,18 +154,18 @@ try { // Process the result set for user profiles while (userProfileResultSet.next()) { - def userProfileId = userProfileResultSet.getInt("user_profile_id") - def keycloakUserId = userProfileResultSet.getString("keycloak_user_id") - def keycloakEmail = userProfileResultSet.getString("keycloak_email") - def keycloakUsername = userProfileResultSet.getString("keycloak_username") - def email = userProfileResultSet.getString("email") - def title = userProfileResultSet.getString("title") - def phone = userProfileResultSet.getString("phone") - def mobilePhone = userProfileResultSet.getString("mobile_phone") - def firstName = userProfileResultSet.getString("first_name") - def lastName = userProfileResultSet.getString("last_name") - def isActive = userProfileResultSet.getBoolean("is_active") - def organizationId = userProfileResultSet.getObject("organization_id") // Nullable + def userProfileId = userProfileResultSet.getInt('user_profile_id') + def keycloakUserId = userProfileResultSet.getString('keycloak_user_id') + def keycloakEmail = userProfileResultSet.getString('keycloak_email') + def keycloakUsername = userProfileResultSet.getString('keycloak_username') + def email = userProfileResultSet.getString('email') + def title = userProfileResultSet.getString('title') + def phone = userProfileResultSet.getString('phone') + def mobilePhone = userProfileResultSet.getString('mobile_phone') + def firstName = userProfileResultSet.getString('first_name') + def lastName = userProfileResultSet.getString('last_name') + def isActive = userProfileResultSet.getBoolean('is_active') + def organizationId = userProfileResultSet.getObject('organization_id') // Nullable // Bind values to the prepared statement insertUserProfileStmt.setInt(1, userProfileId) @@ -136,25 +202,32 @@ try { PreparedStatement sourceRoleStmt = sourceConn.prepareStatement(userRoleQuery) ResultSet userRoleResultSet = sourceRoleStmt.executeQuery() - // Process the result set for user roles while (userRoleResultSet.next()) { - def userProfileId = userRoleResultSet.getInt("user_profile_id") - def roleName = userRoleResultSet.getString("role_name") - - // Bind values to the prepared statement - insertUserRoleStmt.setInt(1, userProfileId) - insertUserRoleStmt.setString(2, roleName) - - // Execute the insert/update for user roles - insertUserRoleStmt.executeUpdate() + def userProfileId = userRoleResultSet.getInt('user_profile_id') + def rolesString = userRoleResultSet.getString('roles_string') + + if (rolesString) { + def roles = rolesString.split(',') + + roles.each { role -> + try { + insertUserRoleStmt.setInt(1, userProfileId) + insertUserRoleStmt.setString(2, role) + insertUserRoleStmt.executeUpdate() + log.info("Successfully inserted role ${role} for user ${userProfileId}") + } catch (Exception e) { + log.error("Failed to insert role ${role} for user ${userProfileId}: ${e.message}") + } + } + } else { + log.warn("No roles found for user ${userProfileId}") + } } - } catch (Exception e) { - log.error("Error occurred while processing data", e) + log.error('Error occurred during ETL process', e) } finally { - // Close the connections - if (sourceConn != null) sourceConn.close() - if (destinationConn != null) destinationConn.close() + if (sourceConn) sourceConn.close() + if (destinationConn) destinationConn.close() } -log.warn("**** COMPLETED USER ETL ****") +log.warn('**** COMPLETED USER ETL ****')