From a71d76f04372fc64c348309e40616af46d8be26e Mon Sep 17 00:00:00 2001 From: swarnadipa-dev Date: Mon, 27 Jan 2025 18:24:21 +0530 Subject: [PATCH] Fetch streams,consumers, records --- .../angular-app/src/app/app.module.ts | 6 +- .../stream-form/stream-form.component.html | 43 +++- .../stream-form/stream-form.component.scss | 67 ++++++ .../stream-form/stream-form.component.ts | 196 +++++++++++++++++- .../stream-records.component.html | 26 ++- .../stream-records.component.ts | 28 ++- .../stream/components/stream.component.html | 13 +- .../stream/components/stream.component.scss | 16 ++ .../stream/components/stream.component.ts | 57 ++++- .../stream/services/stream.service.ts | 67 +++++- .../src/app/features/stream/store/actions.ts | 50 +++++ .../src/app/features/stream/store/effects.ts | 96 +++++++++ .../src/app/features/stream/store/reducers.ts | 73 +++++++ .../app/features/stream/stream.constants.ts | 12 ++ .../app/features/stream/stream.interface.ts | 10 + .../src/app/features/stream/stream.module.ts | 19 +- ...generic-multi-feature-endpoints.service.ts | 6 +- .../constants/rest-end-ponts.constants.ts | 19 +- .../app/shared/services/network.service.ts | 22 +- .../angular-app/src/styles.scss | 2 +- 20 files changed, 798 insertions(+), 30 deletions(-) create mode 100644 nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.interface.ts diff --git a/nuxeo-admin-console-web/angular-app/src/app/app.module.ts b/nuxeo-admin-console-web/angular-app/src/app/app.module.ts index 711b9666..4f658722 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/app.module.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/app.module.ts @@ -36,6 +36,7 @@ import * as HomeEffects from "./features/home/store/effects"; import * as ProbesEffects from "./features/sub-features/probes-data/store/effects"; import * as ReindexEffects from "./features/sub-features/generic-multi-feature-layout/store/effects"; import * as BulkActionMonitoringEffects from "./features/bulk-action-monitoring/store/effects"; +import * as StreamEffects from "./features/stream/store/effects"; import { folderActionReducer, documentActionReducer, @@ -51,6 +52,7 @@ import { import { CustomSnackBarComponent } from "./shared/components/custom-snack-bar/custom-snack-bar.component"; import { AuthInterceptorService } from "./auth/services/auth-interceptor.service"; import { StreamModule } from "./features/stream/stream.module"; +import { streamsReducer } from "./features/stream/store/reducers"; @NgModule({ declarations: [ @@ -80,6 +82,7 @@ import { StreamModule } from "./features/stream/stream.module"; nxqlAction: nxqlActionReducer, bulkActionMonitoring: bulkActionMonitoringReducer, probes: ProbeDataReducer, + streams: streamsReducer }), StoreRouterConnectingModule.forRoot(), EffectsModule.forRoot( @@ -87,7 +90,8 @@ import { StreamModule } from "./features/stream/stream.module"; HomeEffects, ReindexEffects, BulkActionMonitoringEffects, - ProbesEffects + ProbesEffects, + StreamEffects ), MatIconModule, MatTooltipModule, diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.html b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.html index d0d15ccb..9bc3bb37 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.html +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.html @@ -1 +1,42 @@ -

I am the form

\ No newline at end of file +
+
+
+

{{ GENERIC_LABELS.REQUIRED_FIELD_INDICATOR }}

+

{{ STREAM_LABELS.STREAMS }}*

+ + + + {{ stream.name }} + + + +
+ +
+ +
+

+ {{ STREAM_LABELS.POSIITON }}* +

+ + +

{{ STREAM_LABELS.POSITION_OPTIONS.CONSUMER }}

+ + + + {{ consumer.consumer }} + + + +
+
+
+

+ +
+
\ No newline at end of file diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.scss b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.scss index e69de29b..98942de9 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.scss +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.scss @@ -0,0 +1,67 @@ +.stream-form-container { + padding: 10px 0px 0px 10px; + .required-indicator { + color: rgb(218, 21, 0); + font-family: "Open Sans", sans-serif; + padding-left: 2px; + } + #requiredField { + font-size: 14px; + color: rgb(218, 21, 0); + font-family: "Open Sans"; + line-height: 1; + } + + .label { + line-height: 1; + margin: 5px; + font-size: 14px; + font-weight: 400; + font-family: "Open Sans", sans-serif; + } + + &--stream-container { + margin-top: 20px; + } + + &--position-container { + display: flex; + flex-direction: column; + + #position { + margin-top: 15px; + margin-left: -5px; + } + + mat-radio-button .mdc-radio { + bottom: 35px; + } + + .radio-group-container { + display: flex; + flex-wrap: wrap; + gap: 16px; + } + + .radio-item { + display: flex; + align-items: center; + width: 48%; + } + } + + &__button { + border-radius: 4px !important; + width: 140px; + height: 45px; + position: relative; + left: 20px; + font-size: 16px; + bottom: 30px; + font-family: "Open Sans", sans-serif; + font-weight: 550; + letter-spacing: normal; + background-color: #0a60ce !important; + color: #fff; + } +} diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.ts index 0c1aa358..9b2c8188 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.ts @@ -1,9 +1,199 @@ -import { Component, ViewEncapsulation } from "@angular/core"; +import { GENERIC_LABELS } from "./../../../sub-features/generic-multi-feature-layout/generic-multi-feature-layout.constants"; +import { + Component, + EventEmitter, + OnDestroy, + OnInit, + Output, + ViewEncapsulation, +} from "@angular/core"; +import { STREAM_LABELS } from "../../stream.constants"; +import { FormBuilder, FormGroup, Validators } from "@angular/forms"; +import * as StreamActions from "../../store/actions"; +import { Store, select } from "@ngrx/store"; +import { StreamsState } from "../../store/reducers"; +import { Observable, Subscription } from "rxjs"; +import { Stream } from "../../stream.interface"; +import { HttpErrorResponse } from "@angular/common/http"; + @Component({ selector: "stream-form", templateUrl: "./stream-form.component.html", styleUrls: ["./stream-form.component.scss"], - encapsulation: ViewEncapsulation.None , + encapsulation: ViewEncapsulation.None, }) -export class StreamFormComponent { +export class StreamFormComponent implements OnInit, OnDestroy { + STREAM_LABELS = STREAM_LABELS; + GENERIC_LABELS = GENERIC_LABELS; + streamForm: FormGroup; + selectedPositionValue: string = ""; + selectedConsumerOption: string = ""; + isSubmitBtnDisabled = false; + fetchStreamsSuccess$: Observable; + fetchStreamsError$: Observable; + fetchConsumersSuccess$: Observable<{ stream: string; consumer: string }[]>; + fetchConsumersError$: Observable; + fetchRecordsSuccess$: Observable; + fetchRecordsError$: Observable; + streams: Stream[] = []; + records: unknown[] = []; + consumers: { stream: string; consumer: string }[] = []; + fetchStreamsErrorSubscription = new Subscription(); + fetchStreamsSuccessSubscription = new Subscription(); + fetchConsumersErrorSubscription = new Subscription(); + fetchConsumersSuccessSubscription = new Subscription(); + fetchRecordsErrorSubscription = new Subscription(); + fetchRecordsSuccessSubscription = new Subscription(); + selectedConsumer = ""; + @Output() setRecordsData = new EventEmitter(); + + constructor( + private fb: FormBuilder, + private store: Store<{ streams: StreamsState }> + ) { + this.streamForm = this.fb.group({ + stream: ["", Validators.required], + position: [null, Validators.required], + }); + + this.fetchStreamsSuccess$ = this.store.pipe( + select((state) => state.streams?.streams) + ); + + this.fetchStreamsError$ = this.store.pipe( + select((state) => state.streams?.error) + ); + + this.fetchConsumersSuccess$ = this.store.pipe( + select((state) => state.streams?.consumers) + ); + + this.fetchConsumersError$ = this.store.pipe( + select((state) => state.streams?.error) + ); + + this.fetchRecordsSuccess$ = this.store.pipe( + select((state) => state.streams?.records) + ); + + this.fetchRecordsError$ = this.store.pipe( + select((state) => state.streams?.error) + ); + } + + ngOnInit(): void { + this.fetchStreamsSuccessSubscription = this.fetchStreamsSuccess$.subscribe( + (data: Stream[]) => { + if (data?.length > 0) { + this.streams = data; + this.streamForm.patchValue({ + stream: data[0]?.name, + }); + const params = { + stream: this.streamForm.controls["stream"].value, + }; + + this.store.dispatch(StreamActions.fetchConsumers({ params })); + } else { + this.store.dispatch(StreamActions.fetchStreams()); + } + } + ); + + this.fetchStreamsErrorSubscription = this.fetchStreamsError$.subscribe( + (error) => { + if (error instanceof HttpErrorResponse ? error?.error : error) { + console.log(error); + } + } + ); + + this.fetchConsumersSuccessSubscription = + this.fetchConsumersSuccess$.subscribe( + (data: { stream: string; consumer: string }[]) => { + if (data?.length > 0) { + this.consumers = data; + this.selectedConsumer = this.consumers + ? this.consumers[0]?.consumer + : ""; + this.streamForm.get("position")?.setValue(this.selectedConsumer); + } + } + ); + + this.fetchStreamsErrorSubscription = this.fetchStreamsError$.subscribe( + (error) => { + if (error instanceof HttpErrorResponse ? error?.error : error) { + console.log(error); + } + } + ); + + /* this.fetchRecordsSuccessSubscription = this.fetchRecordsSuccess$.subscribe( + (data: unknown[]) => { + if (data?.length > 0) { + this.records = data; + this.setRecordsData.emit(this.records); + console.log(this.records); + } + } + ); + + this.fetchRecordsErrorSubscription = this.fetchRecordsError$.subscribe( + (error) => { + if (error) { + this.setRecordsData.emit(null); + console.log(error); + } + } + ); */ + } + + onConsumerOptionChange(selectedValue: string) { + this.selectedConsumer = selectedValue; + this.streamForm.get("position")?.setValue(selectedValue); + } + + onStreamChange(value: string) { + this.streamForm.patchValue({ + stream: value, + }); + // this.store.dispatch(StreamActions.fetchConsumers({ stream: value })); + const params = { + stream: this.streamForm.controls["stream"].value, + }; + + this.store.dispatch(StreamActions.fetchConsumers({ params })); + } + + onStreamFormSubmit() { + /* if (!this.streamForm?.valid && !this.isSubmitBtnDisabled) { + this.isSubmitBtnDisabled = true; + } else { + console.log(this.streamForm); + } */ + console.log(this.streamForm); + const params = { + stream: this.streamForm?.get("stream")?.value, + fromGroup: this.streamForm?.get("position")?.value, + rewind: 0, + timeout: "1ms", + limit: 1, + }; + this.store.dispatch(StreamActions.triggerRecordsSSEStream({ params })); + } + + + ngOnDestroy(): void { + // TODO: Use form values instead of hardcoded values for rewind, limit & timeout. Dynamically add position param + this.store.dispatch(StreamActions.resetFetchStreamsState()); + this.store.dispatch(StreamActions.resetFetchConsumersState()); + this.store.dispatch(StreamActions.resetFetchRecordsState()); + this.fetchStreamsSuccessSubscription?.unsubscribe(); + this.fetchStreamsErrorSubscription?.unsubscribe(); + this.fetchConsumersSuccessSubscription?.unsubscribe(); + this.fetchConsumersErrorSubscription?.unsubscribe(); + this.fetchRecordsSuccessSubscription?.unsubscribe(); + this.fetchRecordsErrorSubscription?.unsubscribe(); + } } diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.html b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.html index 0c5ad4fa..b71aad9c 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.html +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.html @@ -1 +1,25 @@ -

I am the records

\ No newline at end of file +
+
+ +

Fetched 10 records

+
+ + + +
+ Records +
+ +
    +
  • + {{ record }} +
  • +
+
+
+
\ No newline at end of file diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.ts index 60d464e2..0b469cd7 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-records/stream-records.component.ts @@ -1,9 +1,31 @@ -import { Component, ViewEncapsulation } from "@angular/core"; +import { + Component, + Input, + OnChanges, + OnInit, + SimpleChanges, + ViewEncapsulation, +} from "@angular/core"; +import { Store } from "@ngrx/store"; +import { Observable } from "rxjs"; +import { StreamsState } from "../../store/reducers"; @Component({ selector: "stream-records", templateUrl: "./stream-records.component.html", styleUrls: ["./stream-records.component.scss"], - encapsulation: ViewEncapsulation.None , + encapsulation: ViewEncapsulation.None, }) -export class StreamRecordsComponent { +export class StreamRecordsComponent implements OnInit { + @Input() records: unknown[] = []; + // records$!: Observable; + + constructor(private store: Store<{ stream: StreamsState }>) {} + + ngOnInit(): void { + // this.records$ = this.store.select((state) => state?.stream?.records); + } + + ngOnChanges() { + console.log(this.records); + } } diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.html b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.html index 20710b4e..2bf400dd 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.html +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.html @@ -1,2 +1,11 @@ - - \ No newline at end of file +
+

+ {{ pageTitle }} +

+
+ +
+ +
+
+
\ No newline at end of file diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.scss b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.scss index e69de29b..1f053489 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.scss +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.scss @@ -0,0 +1,16 @@ +.stream-container { + background-color: #f4f4f4; + margin: 0px; + padding: 20px; + height: 100%; + h1 { + font-size: 20px; + font-weight: 500; + width: fit-content; + } + &__placeholder { + background-color: #fff; + min-height: 95%; + height: auto; + } + } \ No newline at end of file diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.ts index b0c94eba..89aa21a8 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream.component.ts @@ -1,8 +1,61 @@ -import { Component } from "@angular/core"; +import { Component, OnDestroy, OnInit } from "@angular/core"; +import { STREAM_LABELS } from "../stream.constants"; +import { Observable, Subscription } from "rxjs"; +import { Store, select } from "@ngrx/store"; +import { StreamsState } from "../store/reducers"; +import { Stream } from "../stream.interface"; @Component({ selector: "stream", templateUrl: "./stream.component.html", styleUrls: ["./stream.component.scss"], }) -export class StreamComponent {} +export class StreamComponent implements OnInit, OnDestroy { + pageTitle = STREAM_LABELS.STREAM_PAGE_TITLE; + records: unknown[] = []; + recordsAvailable$!: Observable; + fetchRecordsErrorSubscription = new Subscription(); + fetchRecordsSuccessSubscription = new Subscription(); + fetchRecordsSuccess$!: Observable; + fetchRecordsError$!: Observable; + // fetchStreamsSuccess$: Observable; + // fetchStreamsError$: Observable; + constructor(private store: Store<{ stream: StreamsState }>) { + + /* this.fetchStreamsSuccess$ = this.store.pipe( + select((state) => state.streams?.streams) + ); + + this.fetchStreamsError$ = this.store.pipe( + select((state) => state.streams?.error) + ); */ + + } + + ngOnInit(): void { + // this.recordsAvailable$ = this.store.select( + // (state) => state?.stream?.records?.length > 0 + // ); + + this.fetchRecordsSuccessSubscription = this.fetchRecordsSuccess$.subscribe( + (data: unknown[]) => { + if (data?.length > 0) { + this.records = data; + } + } + ); + + this.fetchRecordsErrorSubscription = this.fetchRecordsError$.subscribe( + (error) => { + if (error) { + console.log(error); + } + } + ); + } + + ngOnDestroy(): void { + this.fetchRecordsSuccessSubscription?.unsubscribe(); + this.fetchRecordsErrorSubscription?.unsubscribe(); + } +} diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/services/stream.service.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/services/stream.service.ts index 046ec772..b6c91b24 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/services/stream.service.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/services/stream.service.ts @@ -1,8 +1,71 @@ +import { REST_END_POINTS } from "./../../../shared/constants/rest-end-ponts.constants"; import { Injectable } from "@angular/core"; -import { HttpClient } from "@angular/common/http"; import { NetworkService } from "../../../shared/services/network.service"; +import { Observable } from "rxjs"; +import { Stream } from "../stream.interface"; +import * as StreamActions from "../store/actions"; +import { Store } from "@ngrx/store"; @Injectable({ providedIn: "root", }) -export class StreamService {} +export class StreamService { + constructor(private networkService: NetworkService, private store: Store) {} + getStreams(): Observable { + return this.networkService.makeHttpRequest( + REST_END_POINTS.STREAM + ); + } + + getConsumers(params: { + [key: string]: string; + }): Observable<{ stream: string; consumer: string }[]> { + return this.networkService.makeHttpRequest< + { stream: string; consumer: string }[] + >(REST_END_POINTS.STREAM_CONSUMERS, { queryParam: params }); + } + + getRecords( + params: { [key: string]: string | number | boolean } + ): Observable { + return this.networkService.makeHttpRequest( + REST_END_POINTS.STREAM_RECORDS, + { + queryParam: params + } + ); + } + + startSSEStream(params: Record) { + // Use networkService to get the correct URL for the endpoint + const url = this.networkService.getAPIEndpoint( + REST_END_POINTS.STREAM_RECORDS + ); + + // You can now use the URL and params to start the SSE stream + const fullUrl = this.appendParamsToUrl(url, params); + + return new Observable((observer) => { + const eventSource = new EventSource(fullUrl, { withCredentials: true}); + + eventSource.onmessage = (event) => { + observer.next(event.data); + }; + + eventSource.onerror = (error) => { + observer.error(error); + }; + + // Cleanup when the observable is unsubscribed + return () => { + eventSource.close(); + }; + }); + } + + // Helper method to append params to URL + private appendParamsToUrl(url: string, params: Record) { + const queryString = new URLSearchParams(params as any).toString(); + return `${url}?${queryString}`; + } +} diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/actions.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/actions.ts index e69de29b..78bf46b4 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/actions.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/actions.ts @@ -0,0 +1,50 @@ +import { createAction, props } from "@ngrx/store"; +import { HttpErrorResponse } from "@angular/common/http"; +import { Stream } from "../stream.interface"; + +export const fetchStreams = createAction("[Admin] Fetch Streams"); +export const onFetchStreamsLaunch = createAction( + "[Admin] On Fetch Streams Launch", + props<{ streamsData: Stream[] }>() +); +export const onFetchStreamsFailure = createAction( + "[Admin] On Fetch Streams Failure", + props<{ error: HttpErrorResponse }>() +); +export const resetFetchStreamsState = createAction( + "[Admin] Reset Fetch Streams State" +); + +export const fetchConsumers = createAction( + "[Admin] Fetch Consumers", + props<{ params: { [key: string]: string }, }>() +); + +export const onFetchConsumersLaunch = createAction( + "[Admin] On Fetch Consumers Launch", + props<{ consumersData: { stream: string; consumer: string }[] }>() +); +export const onFetchConsumersFailure = createAction( + "[Admin] On Fetch Consumers Failure", + props<{ error: HttpErrorResponse }>() +); +export const resetFetchConsumersState = createAction( + "[Admin] Reset Fetch Consumers State" +); + +export const triggerRecordsSSEStream = createAction( + '[Admin] Trigger Records SSE Stream', + props<{ params: Record }>() +); + +export const onFetchRecordsLaunch = createAction( + "[Admin] On Fetch Records Launch", + props<{ recordsData: unknown[] }>() +); +export const onFetchRecordsFailure = createAction( + "[Admin] On Fetch Records Failure", + props<{ error: unknown }>() +); +export const resetFetchRecordsState = createAction( + "[Admin] Reset Fetch Records State" +); diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/effects.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/effects.ts index e69de29b..2e1f31e4 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/effects.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/effects.ts @@ -0,0 +1,96 @@ +import { HttpErrorResponse } from "@angular/common/http"; +import { of } from "rxjs"; +import { + catchError, + concatMap, + map, + mergeMap, + switchMap, + tap, +} from "rxjs/operators"; +import { createEffect } from "@ngrx/effects"; +import { Actions, ofType } from "@ngrx/effects"; +import * as StreamActions from "../store/actions"; +import { inject } from "@angular/core"; +import { StreamService } from "../services/stream.service"; +import { Stream } from "../stream.interface"; + +export const loadFetchStreamsEffect = createEffect( + (actions$ = inject(Actions), streamService = inject(StreamService)) => { + return actions$.pipe( + ofType(StreamActions.fetchStreams), + switchMap(() => { + return streamService.getStreams().pipe( + map((data: Stream[]) => { + return StreamActions.onFetchStreamsLaunch({ + streamsData: data, + }); + }), + catchError((error: HttpErrorResponse) => { + return of( + StreamActions.onFetchStreamsFailure({ + error, + }) + ); + }) + ); + }) + ); + }, + { functional: true } +); + +export const loadFetchConsumersEffect = createEffect( + (actions$ = inject(Actions), streamService = inject(StreamService)) => { + return actions$.pipe( + ofType(StreamActions.fetchConsumers), + switchMap((action) => { + return streamService.getConsumers(action?.params).pipe( + map((data: { stream: string; consumer: string }[]) => { + return StreamActions.onFetchConsumersLaunch({ + consumersData: data, + }); + }), + catchError((error: HttpErrorResponse) => { + return of( + StreamActions.onFetchConsumersFailure({ + error, + }) + ); + }) + ); + }) + ); + }, + { functional: true } +); + + +export const triggerRecordsSSEStream$ = createEffect( + ( + actions$ = inject(Actions), + streamService = inject(StreamService) + ) => { + return actions$.pipe( + ofType(StreamActions.triggerRecordsSSEStream), // Listen for the 'triggerRecordsSSEStream' action + mergeMap((action) => { + // Call the startSSEStream method with the params from the action + return streamService.startSSEStream(action.params).pipe( + map((response: any) => { + // Process the SSE data here (or handle it as needed) + // Dispatch an action with the processed data if necessary + console.log(typeof response); + return StreamActions.onFetchRecordsLaunch({ recordsData: response }); + }), + catchError((error) => { + // Handle any errors from the SSE stream + return of(StreamActions.onFetchRecordsFailure({ error })); + }) + ); + }) + ); + }, + { functional: true } // Ensure 'dispatch' is set to true +); + + diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/reducers.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/reducers.ts index e69de29b..dccf4103 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/reducers.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/store/reducers.ts @@ -0,0 +1,73 @@ +import { createReducer, on } from "@ngrx/store"; +import * as StreamActions from "./actions"; +import { HttpErrorResponse } from "@angular/common/http"; +import { Stream } from "../stream.interface"; + +export interface StreamsState { + streams: Stream[]; + consumers: { stream: string; consumer: string }[]; + records: unknown[]; + error: unknown; +} + +export const initialStreamsState: StreamsState = { + streams: [], + consumers: [], + records: [], + error: null, +}; + +export const streamsReducer = createReducer( + initialStreamsState, + on(StreamActions.fetchStreams, (state) => ({ + ...state, + error: null, + })), + on(StreamActions.onFetchStreamsLaunch, (state, { streamsData }) => ({ + ...state, + streams: streamsData, + })), + on(StreamActions.onFetchStreamsFailure, (state, { error }) => ({ + ...state, + error, + })), + on(StreamActions.resetFetchStreamsState, (state) => ({ + ...state, + streams: initialStreamsState.streams, + error: null, + })), + on(StreamActions.fetchConsumers, (state) => ({ + ...state, + error: null, + })), + on(StreamActions.onFetchConsumersLaunch, (state, { consumersData }) => ({ + ...state, + consumers: consumersData, + })), + on(StreamActions.onFetchConsumersFailure, (state, { error }) => ({ + ...state, + error, + })), + on(StreamActions.resetFetchConsumersState, (state) => ({ + ...state, + consumers: initialStreamsState.consumers, + error: null, + })), + on(StreamActions.triggerRecordsSSEStream, (state) => ({ + ...state, + error: null, + })), + on(StreamActions.onFetchRecordsLaunch, (state, { recordsData }) => ({ + ...state, + records: [...state.records, Array(recordsData)], + })), + on(StreamActions.onFetchRecordsFailure, (state, { error }) => ({ + ...state, + error, + })), + on(StreamActions.resetFetchRecordsState, (state) => ({ + ...state, + records: initialStreamsState.records, + error: null, + })) +); diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.constants.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.constants.ts index e69de29b..43216bd7 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.constants.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.constants.ts @@ -0,0 +1,12 @@ +export const STREAM_LABELS = { + STREAM_PAGE_TITLE: "Stream Management", + STREAMS: "Streams", + POSIITON: "Position", + POSITION_OPTIONS: { + CONSUMER: "From a consumer position", + BEGINNING: "From Beginning", + TAIL: "From Tail", + OFFSET: "From Offset" + }, + VIEW_RECORDS: "View Records" +}; diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.interface.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.interface.ts new file mode 100644 index 00000000..7e89f602 --- /dev/null +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.interface.ts @@ -0,0 +1,10 @@ +export interface Stream { + name: string | null; + partitions: number | null; + codec: string | null; +} + +export interface Consumer { + stream: string; + consumer: string; +} diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.module.ts b/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.module.ts index 33b8a5ff..8d932366 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.module.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/stream/stream.module.ts @@ -4,9 +4,26 @@ import { StreamComponent } from "./components/stream.component"; import { StreamFormComponent } from "./components/stream-form/stream-form.component"; import { StreamRoutingModule } from "./stream-routing.module"; import { StreamRecordsComponent } from "./components/stream-records/stream-records.component"; +import { MatButtonModule } from "@angular/material/button"; +import { ReactiveFormsModule } from "@angular/forms"; +import { MatFormFieldModule } from "@angular/material/form-field"; +import { MatInputModule } from "@angular/material/input"; +import { MatSelectModule } from "@angular/material/select"; +import {MatRadioModule} from '@angular/material/radio'; +import { MatCardModule } from "@angular/material/card"; @NgModule({ declarations: [StreamComponent, StreamFormComponent, StreamRecordsComponent], - imports: [CommonModule, StreamRoutingModule], + imports: [ + CommonModule, + StreamRoutingModule, + ReactiveFormsModule, + MatFormFieldModule, + MatInputModule, + MatButtonModule, + MatSelectModule, + MatRadioModule, + MatCardModule + ], }) export class StreamModule {} diff --git a/nuxeo-admin-console-web/angular-app/src/app/features/sub-features/generic-multi-feature-layout/services/generic-multi-feature-endpoints.service.ts b/nuxeo-admin-console-web/angular-app/src/app/features/sub-features/generic-multi-feature-layout/services/generic-multi-feature-endpoints.service.ts index 89d2b99f..4fc7f400 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/features/sub-features/generic-multi-feature-layout/services/generic-multi-feature-endpoints.service.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/features/sub-features/generic-multi-feature-layout/services/generic-multi-feature-endpoints.service.ts @@ -23,7 +23,7 @@ export class GenericMultiFeatureEndpointsService { No params for queryParam since, the only param, i.e. 'query' is appended to the url & no request url for bodyParam, since endpoint is already sent as the 1st parameter here, and query is part of body params */ - { queryParam: { requestUrl }, bodyParam: requestParams, requestHeaders } + { queryParam: { query: requestUrl }, bodyParam: requestParams, requestHeaders } ); } @@ -39,7 +39,7 @@ export class GenericMultiFeatureEndpointsService { No params for queryParam since, the only param, i.e. 'query' is appended to the url & no request url for bodyParam, since endpoint is already sent as the 1st parameter here, and query is part of body params */ - { queryParam: { requestUrl }, bodyParam: requestParams, requestHeaders } + { queryParam: { query: requestUrl }, bodyParam: requestParams, requestHeaders } ); } @@ -51,7 +51,7 @@ export class GenericMultiFeatureEndpointsService { ): Observable { return this.networkService.makeHttpRequest( REST_END_POINTS[featureEndpoint as keyof typeof REST_END_POINTS], - { queryParam: { requestUrl }, bodyParam: requestParams, requestHeaders } + { queryParam: { query: requestUrl }, bodyParam: requestParams, requestHeaders } ); } } diff --git a/nuxeo-admin-console-web/angular-app/src/app/shared/constants/rest-end-ponts.constants.ts b/nuxeo-admin-console-web/angular-app/src/app/shared/constants/rest-end-ponts.constants.ts index 31d10980..5718a2dc 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/shared/constants/rest-end-ponts.constants.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/shared/constants/rest-end-ponts.constants.ts @@ -10,6 +10,9 @@ export const REST_END_POINTS = { PICTURE_RENDITIONS: "PICTURE_RENDITIONS", VIDEO_RENDITIONS_GENERATION: "VIDEO_RENDITIONS_GENERATION", FULLTEXT_REINDEX: "FULLTEXT_REINDEX", + STREAM: "STREAM", + STREAM_CONSUMERS: "STREAM_CONSUMERS", + STREAM_RECORDS: "STREAM_RECORDS", } as const; type RestEndpointKey = keyof typeof REST_END_POINTS; @@ -65,6 +68,18 @@ export const REST_END_POINT_CONFIG: Record< }, FULLTEXT_REINDEX: { endpoint: "/management/fulltext/extract", - method: "POST" - } + method: "POST", + }, + STREAM: { + endpoint: "/management/stream/streams", + method: "GET", + }, + STREAM_CONSUMERS: { + endpoint: "/management/stream/consumers", + method: "GET", + }, + STREAM_RECORDS: { + endpoint: "/management/stream/cat", + method: "GET", + }, }; diff --git a/nuxeo-admin-console-web/angular-app/src/app/shared/services/network.service.ts b/nuxeo-admin-console-web/angular-app/src/app/shared/services/network.service.ts index e45ac1c0..ee1ee449 100644 --- a/nuxeo-admin-console-web/angular-app/src/app/shared/services/network.service.ts +++ b/nuxeo-admin-console-web/angular-app/src/app/shared/services/network.service.ts @@ -35,6 +35,7 @@ export class NetworkService { const method = config.method || "PUT"; let params = new HttpParams(); + // Process URL parameters if (data?.["urlParam"] && Object.keys(data?.["urlParam"]).length > 0) { Object.entries(data?.["urlParam"]).forEach(([key, value]) => { if (url.indexOf(key) > -1) { @@ -44,14 +45,22 @@ export class NetworkService { delete data["urlParam"]; } + // Process query parameters if (data?.["queryParam"]) { - const queryParam = data["queryParam"] as { requestUrl: string }; - if (queryParam["requestUrl"] !== "") { - url += `?query=${queryParam["requestUrl"]}`; - delete data["queryParam"]; + const queryParams = data["queryParam"] as Record< + string, + string | number | boolean + >; + const queryString = Object.entries(queryParams) + .map(([key, value]) => `${key}=${String(value)}`) + .join("&"); + if (queryString) { + url += url.includes("?") ? `&${queryString}` : `?${queryString}`; } + delete data["queryParam"]; } + // Switch-case for HTTP methods switch (method) { case "POST": return this.http.post(url, data?.["bodyParam"] || {}, { @@ -59,14 +68,12 @@ export class NetworkService { ? new HttpHeaders(data?.["requestHeaders"] as Record) : {}, }); - break; case "PUT": return this.http.put(url, data || {}, { headers: data?.["requestHeaders"] ? new HttpHeaders(data?.["requestHeaders"] as Record) : {}, }); - break; case "DELETE": return this.http.delete(url, { body: data, @@ -74,7 +81,6 @@ export class NetworkService { ? new HttpHeaders(data?.["requestHeaders"] as Record) : {}, }); - break; case "GET": if (data) { Object.keys(data).forEach((key) => { @@ -87,7 +93,7 @@ export class NetworkService { ? new HttpHeaders(data?.["requestHeaders"] as Record) : {}, }); - break; + default: throw new Error(`Unsupported HTTP method: ${method}`); } diff --git a/nuxeo-admin-console-web/angular-app/src/styles.scss b/nuxeo-admin-console-web/angular-app/src/styles.scss index a08e6de9..6479792f 100644 --- a/nuxeo-admin-console-web/angular-app/src/styles.scss +++ b/nuxeo-admin-console-web/angular-app/src/styles.scss @@ -463,7 +463,7 @@ mat-form-field.input-field.nxql .mat-mdc-text-field-wrapper { height: auto !important; } -mat-form-field.input-field.recompute +mat-form-field.input-field .mat-mdc-text-field-wrapper.mdc-text-field--outlined .mat-mdc-form-field-infix { padding-top: 10px;