From 303abfe3c36c3d1f7286b89a2917c6e9d895b491 Mon Sep 17 00:00:00 2001 From: Nickz22 Date: Sun, 6 Oct 2024 21:03:32 -0500 Subject: [PATCH] admin login --- client/src/App.js | 16 ++-- client/src/components/Api/Api.js | 29 +++++-- client/src/pages/AdminLogin.js | 77 ++++++++++++++++++ client/src/pages/Login.js | 27 +++++- server/app/data_models.py | 5 +- server/app/database/session_selector.py | 46 ++++++++++- server/app/routes.py | 48 ++++++++++- server/app/salesforce_api.py | 104 +++++++++++++++++++----- 8 files changed, 313 insertions(+), 39 deletions(-) create mode 100644 client/src/pages/AdminLogin.js diff --git a/client/src/App.js b/client/src/App.js index ca1c5ba..500e123 100644 --- a/client/src/App.js +++ b/client/src/App.js @@ -12,6 +12,7 @@ import MainContent from "./components/MainContent"; import Login from "./pages/Login"; import Onboard from "./pages/Onboard"; import "./App.css"; +import AdminLogin from "./pages/AdminLogin"; const AppRoutes = () => { const navigate = useNavigate(); @@ -35,6 +36,7 @@ const AppRoutes = () => { return ( } /> + } /> } /> { if (error.response) { - if (error.response.data.message.toLowerCase().includes("session")) { window.location.href = "/"; return Promise.reject(error.response.data); @@ -142,7 +141,7 @@ export const getRefreshToken = async () => { export const fetchProspectingActivities = async (period, filterIds = []) => { const response = await api.post("/get_prospecting_activities_by_ids", { period, - filterIds + filterIds, }); return { ...response.data, statusCode: response.status }; }; @@ -155,12 +154,17 @@ export const fetchProspectingActivities = async (period, filterIds = []) => { * @param {string} searchTerm - Search term * @returns {Promise} */ -export const getPaginatedProspectingActivities = async (filterIds = [], page = 0, rowsPerPage = 10, searchTerm = "") => { +export const getPaginatedProspectingActivities = async ( + filterIds = [], + page = 0, + rowsPerPage = 10, + searchTerm = "" +) => { const response = await api.post("/get_paginated_prospecting_activities", { filterIds, page, rowsPerPage, - searchTerm + searchTerm, }); return { ...response.data, statusCode: response.status }; }; @@ -414,4 +418,19 @@ export const pauseStripePaymentSchedule = async (userId, email) => { email, }); return { ...response.data, statusCode: response.status }; -}; \ No newline at end of file +}; + +/** + * Performs an admin login with the given user ID + * @param {string} userId - The ID of the user to login as + * @returns {Promise} + */ +export const adminLogin = async (userId) => { + try { + const response = await api.post("/admin_login", { userId }); + return { ...response.data, statusCode: response.status }; + } catch (error) { + console.error("Error during admin login:", error); + throw error; + } +}; diff --git a/client/src/pages/AdminLogin.js b/client/src/pages/AdminLogin.js new file mode 100644 index 0000000..951b144 --- /dev/null +++ b/client/src/pages/AdminLogin.js @@ -0,0 +1,77 @@ +import React, { useState } from "react"; +import { Button, Box, Container, Typography, TextField } from "@mui/material"; +import logo from "../assets/images/logo.jpeg"; +import { adminLogin } from "../components/Api/Api"; + +const AdminLogin = () => { + const [userId, setUserId] = useState(""); + const [error, setError] = useState(""); + + const handleAdminLogin = async (e) => { + e.preventDefault(); + + if (!userId.trim()) { + setError("Please enter a User ID"); + return; + } + + try { + const response = await adminLogin(userId.trim()); + + if (response.success) { + // Store the session token + localStorage.setItem("sessionToken", response.session_token); + // Redirect to the main application page or dashboard + window.location.href = "/app/prospecting"; + } else { + setError(response.error || "An error occurred during login"); + } + } catch (error) { + console.error("Error during admin login:", error); + setError("An unexpected error occurred. Please try again."); + } + }; + + return ( + + + InsideOutbound Logo + + Admin Login + + setUserId(e.target.value)} + sx={{ marginBottom: "20px" }} + /> + {error && ( + + {error} + + )} + + + + ); +}; + +export default AdminLogin; diff --git a/client/src/pages/Login.js b/client/src/pages/Login.js index cbae4a8..3e88b58 100644 --- a/client/src/pages/Login.js +++ b/client/src/pages/Login.js @@ -1,11 +1,14 @@ import React from "react"; import { Button, Box, Container } from "@mui/material"; +import { useNavigate } from "react-router-dom"; import logo from "../assets/images/logo.jpeg"; import { generatePKCEChallenge } from "../utils/crypto"; import config from "../config"; const Login = () => { - const handleLogin = async (e, loginUrlBase) => { + const navigate = useNavigate(); + + const handleLogin = async (e, loginUrlBase, isAdmin = false) => { e.preventDefault(); const { codeVerifier, codeChallenge } = await generatePKCEChallenge(); @@ -16,11 +19,12 @@ const Login = () => { const clientId = config.clientId; const redirectUri = `${config.apiBaseUrl}/oauth/callback`; - // Encode the code verifier and sandbox flag in the state parameter + // Encode the code verifier, sandbox flag, and admin flag in the state parameter const state = encodeURIComponent( JSON.stringify({ codeVerifier, isSandbox, + isAdmin, }) ); @@ -33,6 +37,9 @@ const Login = () => { console.error("Error starting auth session:", error); } }; + + const showAdminLogin = process.env.REACT_APP_SHOW_ADMIN_LOGIN === "true"; + return ( @@ -64,11 +71,27 @@ const Login = () => { padding: "10px 20px", fontSize: "16px", display: "block", + marginBottom: "10px", width: "15rem", }} > Login to Sandbox + {showAdminLogin && ( + + )} diff --git a/server/app/data_models.py b/server/app/data_models.py index 3ec46c5..11cc848 100644 --- a/server/app/data_models.py +++ b/server/app/data_models.py @@ -368,6 +368,7 @@ class SettingsModel(SerializableModel): skipOpportunityCriteria: Optional[FilterContainerModel] = None userTimeZone: Optional[str] = None + class DataType(str, Enum): STRING = "string" NUMBER = "number" @@ -376,6 +377,7 @@ class DataType(str, Enum): SELECT = "select" IMAGE = "image" + class SObjectFieldModel(SerializableModel): type: str name: str @@ -395,7 +397,8 @@ class TokenData(SerializableModel): id: str token_type: str issued_at: str - + + class SessionState(SerializableModel): salesforce_id: str access_token: str diff --git a/server/app/database/session_selector.py b/server/app/database/session_selector.py index 5856967..1332715 100644 --- a/server/app/database/session_selector.py +++ b/server/app/database/session_selector.py @@ -1,8 +1,9 @@ from app.database.supabase_connection import get_supabase_admin_client -from app.data_models import AuthenticationError - +from app.data_models import AuthenticationError, ApiResponse +import json from app.database.supabase_retry import retry_on_temporary_unavailable + @retry_on_temporary_unavailable() def fetch_supabase_session(session_id: str): try: @@ -26,3 +27,44 @@ def fetch_supabase_session(session_id: str): raise # For any other exception, wrap it in an AuthenticationError raise AuthenticationError(f"Error fetching session: {str(e)}") + + +def fetch_session_by_salesforce_id(salesforce_id: str) -> ApiResponse: + try: + supabase = get_supabase_admin_client() + + # Query all sessions, ordered by expiry descending + response = supabase.table("Session").select("*").order("expiry", desc=True).execute() + + if not response.data: + return ApiResponse(success=False, message="No sessions found") + + # Filter sessions based on the salesforce_id in the state + matching_session = next( + ( + session + for session in response.data + if json.loads(session["state"]).get("salesforce_id") == salesforce_id + ), + None, + ) + + if matching_session: + # Extract the state dict + state_dict = json.loads(matching_session["state"]) + + # Extract the refresh_token + refresh_token = state_dict.get("refresh_token") + + return ApiResponse( + success=True, + data=[{"session": matching_session, "refresh_token": refresh_token}], + ) + else: + return ApiResponse( + success=False, + message=f"No session found for Salesforce ID: {salesforce_id}", + ) + + except Exception as e: + return ApiResponse(success=False, message=f"Error fetching session: {str(e)}") diff --git a/server/app/routes.py b/server/app/routes.py index 922411c..8884e2b 100644 --- a/server/app/routes.py +++ b/server/app/routes.py @@ -36,16 +36,22 @@ fetch_events_by_user_ids, fetch_logged_in_salesforce_user, get_task_query_count, + refresh_access_token, ) from config import Config from app.database.supabase_connection import ( get_supabase_admin_client, get_session_state, ) -from app.database.session_selector import fetch_supabase_session +from app.database.session_selector import ( + fetch_supabase_session, + fetch_session_by_salesforce_id, +) import stripe import asyncio from datetime import datetime +from app.data_models import TokenData +import logging stripe.api_key = Config.STRIPE_SECRET_KEY @@ -742,6 +748,46 @@ def set_supabase_user_status_to_paid(): return jsonify(api_response.to_dict()), get_status_code(api_response) +@bp.route("/admin_login", methods=["POST"]) +def admin_login(): + user_id = request.json.get("userId") + if not user_id: + return jsonify({"error": "User ID is required"}), 400 + + response = fetch_session_by_salesforce_id(user_id) + + if response.success: + session_data = response.data[0] + session = session_data["session"] + refresh_token = session_data["refresh_token"] + state_dict = json.loads(session["state"]) + is_sandbox = state_dict.get("is_sandbox", False) + + # Refresh the access token + refresh_response = refresh_access_token(refresh_token, is_sandbox) + if not refresh_response.success: + logging.error(f"Failed to refresh token: {refresh_response.message}") + if refresh_response.error_details: + logging.error(f"Error details: {refresh_response.error_details}") + return ( + jsonify( + { + "error": "Failed to refresh access token. Please try logging in again." + } + ), + 401, + ) + + new_token_data: TokenData = refresh_response.data[0] + + # Save the new session + new_session_token = save_session(new_token_data, is_sandbox) + + return jsonify({"success": True, "session_token": new_session_token}), 200 + else: + return jsonify({"error": response.message}), 404 + + @bp.app_errorhandler(Exception) def handle_exception(e): from app.data_models import AuthenticationError diff --git a/server/app/salesforce_api.py b/server/app/salesforce_api.py index 64d47a3..bff40c7 100644 --- a/server/app/salesforce_api.py +++ b/server/app/salesforce_api.py @@ -13,10 +13,13 @@ SObjectFieldModel, UserModel, UserSObject, + TokenData, ) from app.database.supabase_connection import get_session_state from app.constants import SESSION_EXPIRED, FILTER_OPERATOR_MAPPING import concurrent.futures +from config import Config +import logging def get_credentials(): @@ -277,6 +280,7 @@ def _process_contacts(contacts): import time + async def fetch_contact_by_id_map(contact_ids: List[str]) -> Dict[str, str]: start_time = time.time() print(f"Starting fetch_contact_by_id_map for {len(contact_ids)} contacts") @@ -299,28 +303,30 @@ async def fetch_contact_by_id_map(contact_ids: List[str]) -> Dict[str, str]: results = await fetch_contact_composite_batch(batch, session) for result in results: contact_by_id.update(result) - + if i < len(composite_batches) - 1: # Don't wait after the last batch await asyncio.sleep(0.5) # 0.5 second delay between composite batches end_time = time.time() - print(f"Completed fetch_contact_by_id_map. Total contacts processed: {len(contact_by_id)}. Time taken: {end_time - start_time:.2f} seconds") + print( + f"Completed fetch_contact_by_id_map. Total contacts processed: {len(contact_by_id)}. Time taken: {end_time - start_time:.2f} seconds" + ) return contact_by_id + import logging # Configure logging logging.basicConfig( level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) # Create logger logger = logging.getLogger(__name__) - async def fetch_contact_composite_batch( contact_batches: List[List[str]], session: aiohttp.ClientSession ) -> List[Dict[str, Account]]: @@ -331,23 +337,29 @@ async def fetch_contact_composite_batch( async def make_request_with_retry(request_func, *args, **kwargs): for attempt in range(MAX_RETRIES): try: - return await asyncio.wait_for(request_func(*args, **kwargs), timeout=TIMEOUT) + return await asyncio.wait_for( + request_func(*args, **kwargs), timeout=TIMEOUT + ) except asyncio.TimeoutError: if attempt == MAX_RETRIES - 1: raise - logger.warning(f"Request timed out after {TIMEOUT} seconds. Retrying in {RETRY_DELAY} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})") + logger.warning( + f"Request timed out after {TIMEOUT} seconds. Retrying in {RETRY_DELAY} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})" + ) await asyncio.sleep(RETRY_DELAY) except Exception as e: if attempt == MAX_RETRIES - 1: raise - logger.warning(f"Request failed: {str(e)}. Retrying in {RETRY_DELAY} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})") + logger.warning( + f"Request failed: {str(e)}. Retrying in {RETRY_DELAY} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})" + ) await asyncio.sleep(RETRY_DELAY) async def make_composite_request(): async with session.post( f"{instance_url}/services/data/v55.0/composite", json=composite_request, - headers=headers + headers=headers, ) as response: if response.status != 200: raise Exception(f"API request failed: {await response.text()}") @@ -356,8 +368,7 @@ async def make_composite_request(): async def fetch_next_records(url): await asyncio.sleep(0.5) # 0.5 second delay before each pagination request async with session.get( - f"{instance_url}{url}", - headers=headers + f"{instance_url}{url}", headers=headers ) as next_response: if next_response.status != 200: raise Exception(f"API request failed: {await next_response.text()}") @@ -366,7 +377,7 @@ async def fetch_next_records(url): access_token, instance_url = get_credentials() if not access_token or not instance_url: raise Exception("Session expired") - + print(f"Fetching contacts for batch with {len(contact_batches)} batches") composite_request = {"allOrNone": False, "compositeRequest": []} @@ -416,7 +427,9 @@ async def fetch_next_records(url): start_time = time.time() try: data = await make_request_with_retry(make_composite_request) - logger.info(f"Composite request successful. Time taken: {time.time() - start_time:.2f} seconds") + logger.info( + f"Composite request successful. Time taken: {time.time() - start_time:.2f} seconds" + ) except Exception as e: logger.error(f"Composite request failed after {MAX_RETRIES} attempts: {str(e)}") return [] @@ -436,13 +449,21 @@ async def fetch_next_records(url): while next_records_url: print(f"Fetching next records url (page {page_count}): {next_records_url}") try: - next_data = await make_request_with_retry(fetch_next_records, next_records_url) - logger.info(f"Next records request successful. Time taken: {time.time() - start_time:.2f} seconds") + next_data = await make_request_with_retry( + fetch_next_records, next_records_url + ) + logger.info( + f"Next records request successful. Time taken: {time.time() - start_time:.2f} seconds" + ) except Exception as e: - logger.error(f"Next records request failed after {MAX_RETRIES} attempts: {str(e)}") + logger.error( + f"Next records request failed after {MAX_RETRIES} attempts: {str(e)}" + ) break - print(f"Processing {len(next_data['records'])} contacts from page {page_count}") + print( + f"Processing {len(next_data['records'])} contacts from page {page_count}" + ) for contact in next_data["records"]: if contact.get("AccountId"): result[contact["Id"]] = _process_contact(contact) @@ -452,7 +473,7 @@ async def fetch_next_records(url): print(f"Finished processing batch. Total contacts: {len(result)}") results.append(result) - + print(f"Completed fetch_contact_composite_batch. Total batches: {len(results)}") return results @@ -748,9 +769,8 @@ def fetch_logged_in_salesforce_user() -> ApiResponse: api_response.success = True except requests.exceptions.RequestException as e: api_response.message = f"Failed to fetch logged in user ID: {str(e)}" - if ( - isinstance(e, requests.exceptions.HTTPError) - and (e.response.status_code == 401 or e.response.status_code == 403) + if isinstance(e, requests.exceptions.HTTPError) and ( + e.response.status_code == 401 or e.response.status_code == 403 ): api_response.message = SESSION_EXPIRED api_response.type = "AuthenticationError" @@ -949,3 +969,45 @@ def get_http_error_message(response): return response.json().get("error", "An error occurred") else: return response.json()[0].get("message", "An error occurred") + + +def refresh_access_token(refresh_token: str, is_sandbox: bool) -> ApiResponse: + base_sf_domain = "test" if is_sandbox else "login" + token_url = f"https://{base_sf_domain}.salesforce.com/services/oauth2/token" + + payload = { + "grant_type": "refresh_token", + "client_id": Config.CLIENT_ID, + "client_secret": Config.CLIENT_SECRET, + "refresh_token": refresh_token, + } + + try: + response = requests.post(token_url, data=payload) + response.raise_for_status() + token_data = response.json() + + logging.info("Successfully refreshed access token") + + # Create TokenData object, setting refresh_token only if it's in the response + token_data_obj = TokenData( + access_token=token_data["access_token"], + refresh_token=token_data.get( + "refresh_token", refresh_token + ), # Use old refresh_token if not provided + instance_url=token_data["instance_url"], + id=token_data["id"], + token_type=token_data["token_type"], + issued_at=token_data["issued_at"], + ) + + return ApiResponse(success=True, data=[token_data_obj]) + except requests.RequestException as e: + logging.error(f"Failed to refresh access token: {str(e)}") + if e.response is not None: + logging.error(f"Response content: {e.response.content}") + return ApiResponse( + success=False, + message=f"Failed to refresh access token: {str(e)}", + error_details=e.response.json() if e.response is not None else None, + )