Skip to content

Commit

Permalink
Fetch streams,consumers, records
Browse files Browse the repository at this point in the history
  • Loading branch information
swarnadipa-dev committed Jan 27, 2025
1 parent eae94a5 commit a71d76f
Show file tree
Hide file tree
Showing 20 changed files with 798 additions and 30 deletions.
6 changes: 5 additions & 1 deletion nuxeo-admin-console-web/angular-app/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import * as HomeEffects from "./features/home/store/effects";
import * as ProbesEffects from "./features/sub-features/probes-data/store/effects";
import * as ReindexEffects from "./features/sub-features/generic-multi-feature-layout/store/effects";
import * as BulkActionMonitoringEffects from "./features/bulk-action-monitoring/store/effects";
import * as StreamEffects from "./features/stream/store/effects";
import {
folderActionReducer,
documentActionReducer,
Expand All @@ -51,6 +52,7 @@ import {
import { CustomSnackBarComponent } from "./shared/components/custom-snack-bar/custom-snack-bar.component";
import { AuthInterceptorService } from "./auth/services/auth-interceptor.service";
import { StreamModule } from "./features/stream/stream.module";
import { streamsReducer } from "./features/stream/store/reducers";

@NgModule({
declarations: [
Expand Down Expand Up @@ -80,14 +82,16 @@ import { StreamModule } from "./features/stream/stream.module";
nxqlAction: nxqlActionReducer,
bulkActionMonitoring: bulkActionMonitoringReducer,
probes: ProbeDataReducer,
streams: streamsReducer
}),
StoreRouterConnectingModule.forRoot(),
EffectsModule.forRoot(
authEffects,
HomeEffects,
ReindexEffects,
BulkActionMonitoringEffects,
ProbesEffects
ProbesEffects,
StreamEffects
),
MatIconModule,
MatTooltipModule,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,42 @@
<p>I am the form</p>
<div class="stream-form-container">
<form [formGroup]="streamForm" (ngSubmit)="onStreamFormSubmit()" class="stream-form">
<div class="stream-form-container--stream-container">
<p id="requiredField"> {{ GENERIC_LABELS.REQUIRED_FIELD_INDICATOR }}</p>
<p class="label">{{ STREAM_LABELS.STREAMS }}<span class="required-indicator">*</span></p>
<mat-form-field appearance="outline" class="input-field">
<mat-select matNativeControl formControlName="stream" id="stream"
(selectionChange)="onStreamChange($event.value)">
<mat-option *ngFor="let stream of streams" [value]="stream.name">
{{ stream.name }}
</mat-option>
</mat-select>
</mat-form-field>
</div>

<hr>

<div class="stream-form-container--position-container">
<p class="label">
{{ STREAM_LABELS.POSIITON }}<span class="required-indicator">*</span>
</p>
<mat-radio-group class="radio-group-container" matNativeControl formControlName="position" id="position">
<mat-radio-button [value]="selectedConsumer" class="radio-item">
<p class="label">{{ STREAM_LABELS.POSITION_OPTIONS.CONSUMER }}</p>
<mat-form-field appearance="outline" class="input-field">
<mat-select matNativeControl id="consumer"
(selectionChange)="onConsumerOptionChange($event.value)" [value]="selectedConsumer">
<mat-option *ngFor="let consumer of consumers" [value]="consumer.consumer">
{{ consumer.consumer }}
</mat-option>
</mat-select>
</mat-form-field>
</mat-radio-button>
</mat-radio-group>
</div>
<br><br>
<button mat-flat-button color="primary" type="submit" class="stream-form-container__button"
[disabled]="isSubmitBtnDisabled">
{{ STREAM_LABELS.VIEW_RECORDS }}
</button>
</form>
</div>
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
.stream-form-container {
padding: 10px 0px 0px 10px;
.required-indicator {
color: rgb(218, 21, 0);
font-family: "Open Sans", sans-serif;
padding-left: 2px;
}
#requiredField {
font-size: 14px;
color: rgb(218, 21, 0);
font-family: "Open Sans";
line-height: 1;
}

.label {
line-height: 1;
margin: 5px;
font-size: 14px;
font-weight: 400;
font-family: "Open Sans", sans-serif;
}

&--stream-container {
margin-top: 20px;
}

&--position-container {
display: flex;
flex-direction: column;

#position {
margin-top: 15px;
margin-left: -5px;
}

mat-radio-button .mdc-radio {
bottom: 35px;
}

.radio-group-container {
display: flex;
flex-wrap: wrap;
gap: 16px;
}

.radio-item {
display: flex;
align-items: center;
width: 48%;
}
}

&__button {
border-radius: 4px !important;
width: 140px;
height: 45px;
position: relative;
left: 20px;
font-size: 16px;
bottom: 30px;
font-family: "Open Sans", sans-serif;
font-weight: 550;
letter-spacing: normal;
background-color: #0a60ce !important;
color: #fff;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,199 @@
import { Component, ViewEncapsulation } from "@angular/core";
import { GENERIC_LABELS } from "./../../../sub-features/generic-multi-feature-layout/generic-multi-feature-layout.constants";
import {
Component,
EventEmitter,
OnDestroy,
OnInit,
Output,
ViewEncapsulation,
} from "@angular/core";
import { STREAM_LABELS } from "../../stream.constants";
import { FormBuilder, FormGroup, Validators } from "@angular/forms";
import * as StreamActions from "../../store/actions";
import { Store, select } from "@ngrx/store";
import { StreamsState } from "../../store/reducers";
import { Observable, Subscription } from "rxjs";
import { Stream } from "../../stream.interface";
import { HttpErrorResponse } from "@angular/common/http";

@Component({
selector: "stream-form",
templateUrl: "./stream-form.component.html",
styleUrls: ["./stream-form.component.scss"],
encapsulation: ViewEncapsulation.None ,
encapsulation: ViewEncapsulation.None,
})
export class StreamFormComponent {
export class StreamFormComponent implements OnInit, OnDestroy {
STREAM_LABELS = STREAM_LABELS;
GENERIC_LABELS = GENERIC_LABELS;
streamForm: FormGroup;
selectedPositionValue: string = "";

Check failure on line 29 in nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.ts

View workflow job for this annotation

GitHub Actions / lint

Type string trivially inferred from a string literal, remove type annotation
selectedConsumerOption: string = "";

Check failure on line 30 in nuxeo-admin-console-web/angular-app/src/app/features/stream/components/stream-form/stream-form.component.ts

View workflow job for this annotation

GitHub Actions / lint

Type string trivially inferred from a string literal, remove type annotation
isSubmitBtnDisabled = false;
fetchStreamsSuccess$: Observable<Stream[]>;
fetchStreamsError$: Observable<unknown>;
fetchConsumersSuccess$: Observable<{ stream: string; consumer: string }[]>;
fetchConsumersError$: Observable<unknown>;
fetchRecordsSuccess$: Observable<unknown[]>;
fetchRecordsError$: Observable<unknown>;
streams: Stream[] = [];
records: unknown[] = [];
consumers: { stream: string; consumer: string }[] = [];
fetchStreamsErrorSubscription = new Subscription();
fetchStreamsSuccessSubscription = new Subscription();
fetchConsumersErrorSubscription = new Subscription();
fetchConsumersSuccessSubscription = new Subscription();
fetchRecordsErrorSubscription = new Subscription();
fetchRecordsSuccessSubscription = new Subscription();
selectedConsumer = "";
@Output() setRecordsData = new EventEmitter<unknown | null>();

constructor(
private fb: FormBuilder,
private store: Store<{ streams: StreamsState }>
) {
this.streamForm = this.fb.group({
stream: ["", Validators.required],
position: [null, Validators.required],
});

this.fetchStreamsSuccess$ = this.store.pipe(
select((state) => state.streams?.streams)
);

this.fetchStreamsError$ = this.store.pipe(
select((state) => state.streams?.error)
);

this.fetchConsumersSuccess$ = this.store.pipe(
select((state) => state.streams?.consumers)
);

this.fetchConsumersError$ = this.store.pipe(
select((state) => state.streams?.error)
);

this.fetchRecordsSuccess$ = this.store.pipe(
select((state) => state.streams?.records)
);

this.fetchRecordsError$ = this.store.pipe(
select((state) => state.streams?.error)
);
}

ngOnInit(): void {
this.fetchStreamsSuccessSubscription = this.fetchStreamsSuccess$.subscribe(
(data: Stream[]) => {
if (data?.length > 0) {
this.streams = data;
this.streamForm.patchValue({
stream: data[0]?.name,
});
const params = {
stream: this.streamForm.controls["stream"].value,
};

this.store.dispatch(StreamActions.fetchConsumers({ params }));
} else {
this.store.dispatch(StreamActions.fetchStreams());
}
}
);

this.fetchStreamsErrorSubscription = this.fetchStreamsError$.subscribe(
(error) => {
if (error instanceof HttpErrorResponse ? error?.error : error) {
console.log(error);
}
}
);

this.fetchConsumersSuccessSubscription =
this.fetchConsumersSuccess$.subscribe(
(data: { stream: string; consumer: string }[]) => {
if (data?.length > 0) {
this.consumers = data;
this.selectedConsumer = this.consumers
? this.consumers[0]?.consumer
: "";
this.streamForm.get("position")?.setValue(this.selectedConsumer);
}
}
);

this.fetchStreamsErrorSubscription = this.fetchStreamsError$.subscribe(
(error) => {
if (error instanceof HttpErrorResponse ? error?.error : error) {
console.log(error);
}
}
);

/* this.fetchRecordsSuccessSubscription = this.fetchRecordsSuccess$.subscribe(
(data: unknown[]) => {
if (data?.length > 0) {
this.records = data;
this.setRecordsData.emit(this.records);
console.log(this.records);
}
}
);
this.fetchRecordsErrorSubscription = this.fetchRecordsError$.subscribe(
(error) => {
if (error) {
this.setRecordsData.emit(null);
console.log(error);
}
}
); */
}

onConsumerOptionChange(selectedValue: string) {
this.selectedConsumer = selectedValue;
this.streamForm.get("position")?.setValue(selectedValue);
}

onStreamChange(value: string) {
this.streamForm.patchValue({
stream: value,
});
// this.store.dispatch(StreamActions.fetchConsumers({ stream: value }));
const params = {
stream: this.streamForm.controls["stream"].value,
};

this.store.dispatch(StreamActions.fetchConsumers({ params }));
}

onStreamFormSubmit() {
/* if (!this.streamForm?.valid && !this.isSubmitBtnDisabled) {
this.isSubmitBtnDisabled = true;
} else {
console.log(this.streamForm);
} */
console.log(this.streamForm);
const params = {
stream: this.streamForm?.get("stream")?.value,
fromGroup: this.streamForm?.get("position")?.value,
rewind: 0,
timeout: "1ms",
limit: 1,
};
this.store.dispatch(StreamActions.triggerRecordsSSEStream({ params }));
}


ngOnDestroy(): void {
// TODO: Use form values instead of hardcoded values for rewind, limit & timeout. Dynamically add position param
this.store.dispatch(StreamActions.resetFetchStreamsState());
this.store.dispatch(StreamActions.resetFetchConsumersState());
this.store.dispatch(StreamActions.resetFetchRecordsState());
this.fetchStreamsSuccessSubscription?.unsubscribe();
this.fetchStreamsErrorSubscription?.unsubscribe();
this.fetchConsumersSuccessSubscription?.unsubscribe();
this.fetchConsumersErrorSubscription?.unsubscribe();
this.fetchRecordsSuccessSubscription?.unsubscribe();
this.fetchRecordsErrorSubscription?.unsubscribe();
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,25 @@
<p>I am the records </p>
<div class="stream-records">
<div class="stream-records__summary">
<!-- <p>{{docsProcessedText}}</p> -->
<p>Fetched 10 records</p>
</div>
<!-- <div class="title">
{{STREAM_LABELS.RECORDS_TITLE}}
Records
</div> -->
<mat-card class="stream-records__data" role="region" tabindex="0" aria-label="records data">
<mat-card-content>
<div class="probes-title">
Records
</div>
<!-- <div *ngIf="recordsData">
{{recordsData}}
</div> -->
<ul>
<li *ngFor="let record of records">
{{ record }}
</li>
</ul>
</mat-card-content>
</mat-card>
</div>
Loading

0 comments on commit a71d76f

Please sign in to comment.