From e12c454bb2d85dc0fd0789965cbdb544929140d2 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 30 Jan 2025 17:09:47 -0500 Subject: [PATCH] fix(notifications): contextual override for WebSocket notification URL (#1544) --- src/app/AppLayout/AppLayout.tsx | 3 +- src/app/Shared/Services/Api.service.tsx | 333 ++++++++++-------- .../Services/NotificationChannel.service.tsx | 77 +++- src/app/Shared/Services/Services.tsx | 6 +- src/app/utils/fakeData.ts | 2 +- src/mirage/index.ts | 7 +- 6 files changed, 263 insertions(+), 165 deletions(-) diff --git a/src/app/AppLayout/AppLayout.tsx b/src/app/AppLayout/AppLayout.tsx index c60dd8871..0d3470af8 100644 --- a/src/app/AppLayout/AppLayout.tsx +++ b/src/app/AppLayout/AppLayout.tsx @@ -131,7 +131,8 @@ export const AppLayout: React.FC = ({ children }) => { React.useEffect(() => { serviceContext.api.testBaseServer(); - }, [serviceContext.api]); + serviceContext.notificationChannel.connect(); + }, [serviceContext.api, serviceContext.notificationChannel]); React.useEffect(() => { addSubscription( diff --git a/src/app/Shared/Services/Api.service.tsx b/src/app/Shared/Services/Api.service.tsx index 5885ad09a..ef24575b9 100644 --- a/src/app/Shared/Services/Api.service.tsx +++ b/src/app/Shared/Services/Api.service.tsx @@ -19,6 +19,7 @@ import { createBlobURL } from '@app/utils/utils'; import { ValidatedOptions } from '@patternfly/react-core'; import { BehaviorSubject, + combineLatest, EMPTY, forkJoin, from, @@ -238,62 +239,74 @@ export class ApiService { abortSignal?: Observable, ): Observable { window.onbeforeunload = (event: BeforeUnloadEvent) => event.preventDefault(); - - const headers = this.ctx.headers({ - 'Content-Type': 'application/json', - }); - return this.sendLegacyRequest('v4', 'rules', 'Rule Upload Failed', { - method: 'POST', - body: JSON.stringify(rule), - headers: headers, - listeners: { - onUploadProgress: (event) => { - onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); - }, - }, - abortSignal, - }).pipe( - map((resp) => resp.ok), - tap({ - next: () => (window.onbeforeunload = null), - error: () => (window.onbeforeunload = null), - }), - first(), - ); + return this.ctx + .headers({ + 'Content-Type': 'application/json', + }) + .pipe( + concatMap((headers) => + this.sendLegacyRequest('v4', 'rules', 'Rule Upload Failed', { + method: 'POST', + body: JSON.stringify(rule), + headers, + listeners: { + onUploadProgress: (event) => { + onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); + }, + }, + abortSignal, + }), + ), + map((resp) => resp.ok), + tap({ + next: () => (window.onbeforeunload = null), + error: () => (window.onbeforeunload = null), + }), + first(), + ); } createRule(rule: Rule): Observable { - const headers = this.ctx.headers({ - 'Content-Type': 'application/json', - }); - return this.sendRequest('v4', 'rules', { - method: 'POST', - body: JSON.stringify(rule), - headers, - }).pipe( - map((resp) => resp.ok), - catchError((_) => of(false)), - first(), - ); + return this.ctx + .headers({ + 'Content-Type': 'application/json', + }) + .pipe( + concatMap((headers) => + this.sendRequest('v4', 'rules', { + method: 'POST', + body: JSON.stringify(rule), + headers, + }), + ), + map((resp) => resp.ok), + catchError((_) => of(false)), + first(), + ); } updateRule(rule: Rule, clean = true): Observable { - const headers = this.ctx.headers({ - 'Content-Type': 'application/json', - }); - return this.sendRequest( - 'v4', - `rules/${rule.name}`, - { - method: 'PATCH', - body: JSON.stringify(rule), - headers, - }, - new URLSearchParams({ clean: String(clean) }), - ).pipe( - map((resp) => resp.ok), - first(), - ); + return this.ctx + .headers({ + 'Content-Type': 'application/json', + }) + .pipe( + concatMap((headers) => + this.sendRequest( + 'v4', + `rules/${rule.name}`, + { + method: 'PATCH', + body: JSON.stringify(rule), + headers, + }, + new URLSearchParams({ clean: String(clean) }), + ), + ), + + map((resp) => resp.ok), + first(), + ); } deleteRule(name: string, clean = true): Observable { @@ -598,20 +611,22 @@ export class ApiService { abortSignal?: Observable, ): Observable { window.onbeforeunload = (event: BeforeUnloadEvent) => event.preventDefault(); - const body = new window.FormData(); body.append('template', file); - return this.sendLegacyRequest('v4', 'event_templates', 'Template Upload Failed', { - body: body, - method: 'POST', - headers: this.ctx.headers(), - listeners: { - onUploadProgress: (event) => { - onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); - }, - }, - abortSignal, - }).pipe( + return this.ctx.headers().pipe( + concatMap((headers) => + this.sendLegacyRequest('v4', 'event_templates', 'Template Upload Failed', { + body: body, + method: 'POST', + headers, + listeners: { + onUploadProgress: (event) => { + onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); + }, + }, + abortSignal, + }), + ), map((resp) => resp.ok), tap({ next: () => (window.onbeforeunload = null), @@ -687,20 +702,22 @@ export class ApiService { abortSignal?: Observable, ): Observable { window.onbeforeunload = (event: BeforeUnloadEvent) => event.preventDefault(); - const body = new window.FormData(); body.append('probeTemplate', file); - return this.sendLegacyRequest('v4', `probes/${file.name}`, 'Custom Probe Template Upload Failed', { - method: 'POST', - body: body, - headers: this.ctx.headers(), - listeners: { - onUploadProgress: (event) => { - onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); - }, - }, - abortSignal, - }).pipe( + return this.ctx.headers().pipe( + concatMap((headers) => + this.sendLegacyRequest('v4', `probes/${file.name}`, 'Custom Probe Template Upload Failed', { + method: 'POST', + body: body, + headers, + listeners: { + onUploadProgress: (event) => { + onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); + }, + }, + abortSignal, + }), + ), map((resp) => resp.ok), tap({ next: () => (window.onbeforeunload = null), @@ -804,34 +821,38 @@ export class ApiService { suppressNotifications?: boolean, skipStatusCheck?: boolean, ): Observable { - const headers = this.ctx.headers({ - 'Content-Type': 'application/json', - }); const req = () => - this.sendRequest( - 'v4', - 'graphql', - { - method: 'POST', - body: JSON.stringify({ - query: query.replace(/[\s]+/g, ' '), - variables, + this.ctx + .headers({ + 'Content-Type': 'application/json', + }) + .pipe( + concatMap((headers) => + this.sendRequest( + 'v4', + 'graphql', + { + method: 'POST', + body: JSON.stringify({ + query: query.replace(/[\s]+/g, ' '), + variables, + }), + headers, + }, + undefined, + suppressNotifications, + skipStatusCheck, + ), + ), + map((resp) => resp.json()), + concatMap(from), + tap((resp) => { + if (isGraphQLError(resp)) { + this.handleError(new GraphQLError(resp.errors), req); + } }), - headers, - }, - undefined, - suppressNotifications, - skipStatusCheck, - ).pipe( - map((resp) => resp.json()), - concatMap(from), - tap((resp) => { - if (isGraphQLError(resp)) { - this.handleError(new GraphQLError(resp.errors), req); - } - }), - first(), - ); + first(), + ); return req(); } @@ -884,17 +905,20 @@ export class ApiService { body.append('recording', file); body.append('labels', JSON.stringify(labels)); - return this.sendLegacyRequest('v4', 'recordings', 'Recording Upload Failed', { - method: 'POST', - body: body, - headers: this.ctx.headers(), - listeners: { - onUploadProgress: (event) => { - onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); - }, - }, - abortSignal, - }).pipe( + return this.ctx.headers().pipe( + concatMap((headers) => + this.sendLegacyRequest('v4', 'recordings', 'Recording Upload Failed', { + method: 'POST', + body: body, + headers, + listeners: { + onUploadProgress: (event) => { + onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); + }, + }, + abortSignal, + }), + ), map((resp) => { if (resp.ok) { return resp.body as string; @@ -918,17 +942,20 @@ export class ApiService { const body = new window.FormData(); body.append('cert', file); - return this.sendLegacyRequest('v4', 'certificates', 'Certificate Upload Failed', { - method: 'POST', - body, - headers: this.ctx.headers(), - listeners: { - onUploadProgress: (event) => { - onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); - }, - }, - abortSignal, - }).pipe( + return this.ctx.headers().pipe( + concatMap((headers) => + this.sendLegacyRequest('v4', 'certificates', 'Certificate Upload Failed', { + method: 'POST', + body, + headers, + listeners: { + onUploadProgress: (event) => { + onUploadProgress && onUploadProgress(Math.floor((event.loaded * 100) / event.total)); + }, + }, + abortSignal, + }), + ), map((resp) => resp.ok), tap({ next: () => (window.onbeforeunload = null), @@ -1122,25 +1149,29 @@ export class ApiService { matchExpression, targets: targets.map((t) => this.transformTarget(t)), }); - const headers = this.ctx.headers({ - 'Content-Type': 'application/json', - }); - return this.sendRequest( - 'v4', - 'matchExpressions', - { - method: 'POST', - body, - headers, - }, - undefined, - true, - true, - ).pipe( - first(), - concatMap((resp: Response) => resp.json()), - map((r) => r.targets), - ); + return this.ctx + .headers({ + 'Content-Type': 'application/json', + }) + .pipe( + concatMap((headers) => + this.sendRequest( + 'v4', + 'matchExpressions', + { + method: 'POST', + body, + headers, + }, + undefined, + true, + true, + ), + ), + first(), + concatMap((resp: Response) => resp.json()), + map((r) => r.targets), + ); } isTargetMatched(matchExpression: string, target: Target): Observable { @@ -1457,14 +1488,22 @@ export class ApiService { suppressNotifications = false, skipStatusCheck = false, ): Observable { - if (!config) { - config = {}; - } - config.headers = this.ctx.headers(config.headers); const p = apiVersion === 'unversioned' ? path : `/api/${apiVersion}/${path}`; const req = () => - this.ctx.url(`${p}${params ? '?' + params : ''}`).pipe( - concatMap((u) => fromFetch(u, config)), + combineLatest([ + this.ctx.url(`${p}${params ? '?' + params : ''}`), + this.ctx.headers((config || {}).headers).pipe( + map((headers) => { + let cfg = config; + if (!cfg) { + cfg = {}; + } + cfg.headers = headers; + return cfg; + }), + ), + ]).pipe( + concatMap((parts) => fromFetch(parts[0], parts[1])), map((resp) => { if (resp.ok) return resp; throw new HttpError(resp); diff --git a/src/app/Shared/Services/NotificationChannel.service.tsx b/src/app/Shared/Services/NotificationChannel.service.tsx index ffcd71785..203930522 100644 --- a/src/app/Shared/Services/NotificationChannel.service.tsx +++ b/src/app/Shared/Services/NotificationChannel.service.tsx @@ -16,13 +16,14 @@ import { AlertVariant } from '@patternfly/react-core'; import _ from 'lodash'; import { BehaviorSubject, combineLatest, Observable, Subject, timer } from 'rxjs'; -import { distinctUntilChanged, filter } from 'rxjs/operators'; +import { concatMap, distinctUntilChanged, filter, first, map } from 'rxjs/operators'; import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; import { NotificationMessage, ReadyState, CloseStatus, NotificationCategory } from './api.types'; import { messageKeys } from './api.utils'; import { LoginService } from './Login.service'; import { NotificationService } from './Notifications.service'; import { SessionState } from './service.types'; +import { CryostatContext } from './Services'; export class NotificationChannel { private ws: WebSocketSubject | null = null; @@ -30,6 +31,7 @@ export class NotificationChannel { private readonly _ready = new BehaviorSubject({ ready: false }); constructor( + private readonly ctx: CryostatContext, private readonly notifications: NotificationService, private readonly login: LoginService, ) { @@ -65,11 +67,65 @@ export class NotificationChannel { }); }); - combineLatest([this.login.getSessionState(), timer(0, 5000)]) + this.login.loggedOut().subscribe({ + next: () => { + this.ws?.complete(); + }, + error: (err: Error) => this.logError('Notifications URL configuration', err), + }); + } + + connect(): void { + combineLatest([ + this.login.getSessionState(), + this.ctx.url('/api/notifications').pipe( + first(), + map((u) => { + let wsUrl: URL; + try { + wsUrl = new URL(u); + } catch (e) { + // wasn't a URL - assume it was a relative path alone, which is OK + wsUrl = new URL(window.location.href); + wsUrl.pathname = u; + } + // set the proper protocol for WebSocket connection upgrade + wsUrl.protocol = wsUrl.protocol.replace('http', 'ws'); + return wsUrl.toString(); + }), + concatMap((url) => { + // set the instance namespace and name headers as query parameters instead. + // This is not used by normal Cryostat Web operation, where the instance is + // always the server that is hosting the web instance itself. In the console + // plugin case, the instance selector is normally sent to + // the plugin backend by custom HTTP request headers so that the plugin backend + // can proxy to the correct Cryostat server instance. We cannot set custom + // request headers in the WebSocket connection request, so we set them as query + // parameters instead so that the plugin backend can fall back to finding those. + return this.ctx.headers().pipe( + map((headers) => { + const searchParams = new URLSearchParams(); + if (headers.has('CRYOSTAT-SVC-NS')) { + searchParams.append('ns', headers.get('CRYOSTAT-SVC-NS')!); + } + if (headers.has('CRYOSTAT-SVC-NAME')) { + searchParams.append('name', headers.get('CRYOSTAT-SVC-NAME')!); + } + if (searchParams.size > 0) { + return `${url}?${searchParams}`; + } + return url; + }), + ); + }), + ), + timer(0, 5000), + ]) .pipe(distinctUntilChanged(_.isEqual)) .subscribe({ next: (parts: string[]) => { const sessionState = parseInt(parts[0]); + const url = parts[1]; if (sessionState !== SessionState.CREATING_USER_SESSION) { return; @@ -79,11 +135,8 @@ export class NotificationChannel { this.ws.complete(); } - const url = new URL(window.location.href); - url.protocol = url.protocol.replace('http', 'ws'); - url.pathname = '/api/notifications'; this.ws = webSocket({ - url: url.toString(), + url, protocol: '', openObserver: { next: () => { @@ -141,13 +194,13 @@ export class NotificationChannel { }, error: (err: Error) => this.logError('Notifications URL configuration', err), }); + } - this.login.loggedOut().subscribe({ - next: () => { - this.ws?.complete(); - }, - error: (err: Error) => this.logError('Notifications URL configuration', err), - }); + disconnect(): void { + if (this.ws) { + this.ws.complete(); + } + this.login.setSessionState(SessionState.CREATING_USER_SESSION); } isReady(): Observable { diff --git a/src/app/Shared/Services/Services.tsx b/src/app/Shared/Services/Services.tsx index 3edc1184b..ff2c8246e 100644 --- a/src/app/Shared/Services/Services.tsx +++ b/src/app/Shared/Services/Services.tsx @@ -36,20 +36,20 @@ export interface Services { export interface CryostatContext { url: (path?: string) => Observable; - headers: (init?: HeadersInit) => Headers; + headers: (init?: HeadersInit) => Observable; } const authority: string = process.env.CRYOSTAT_AUTHORITY || '.'; export const defaultContext: CryostatContext = { url: (path?: string): Observable => of(`${authority}/${path}`.replace(/([^:]\/)\/+/g, '$1')), - headers: (init?: HeadersInit) => new Headers(init), + headers: (init?: HeadersInit): Observable => of(new Headers(init)), }; const target = new TargetService(); const settings = new SettingsService(); const login = new LoginService(defaultContext.url, settings); const api = new ApiService(defaultContext, target, NotificationsInstance); -const notificationChannel = new NotificationChannel(NotificationsInstance, login); +const notificationChannel = new NotificationChannel(defaultContext, NotificationsInstance, login); const reports = new ReportService(NotificationsInstance, notificationChannel); const targets = new TargetsService(api, NotificationsInstance, notificationChannel); diff --git a/src/app/utils/fakeData.ts b/src/app/utils/fakeData.ts index 261507c76..aad91498c 100644 --- a/src/app/utils/fakeData.ts +++ b/src/app/utils/fakeData.ts @@ -250,7 +250,7 @@ class FakeApiService extends ApiService { super( { url: (path) => of(`/${path}`), - headers: () => new Headers(), + headers: () => of(new Headers()), }, target, notifications, diff --git a/src/mirage/index.ts b/src/mirage/index.ts index 8a8676341..2c9f33282 100644 --- a/src/mirage/index.ts +++ b/src/mirage/index.ts @@ -22,7 +22,12 @@ import models from './models'; import { Resource } from './typings'; export const startMirage = ({ environment = 'development' } = {}) => { - const wsUrl = new URL(window.location.href); + let wsUrl: URL; + if (environment === 'development') { + wsUrl = new URL('http://localhost:8181'); + } else { + wsUrl = new URL(window.location.href); + } wsUrl.protocol = wsUrl.protocol.replace('http', 'ws'); wsUrl.pathname = '/api/notifications'; const wsServer = new WSServer(wsUrl.toString());