@@ -27,137 +27,134 @@ import {
27
27
JobOutboundMessageKind ,
28
28
} from './api' ;
29
29
30
- // eslint-disable-next-line @typescript-eslint/no-namespace
31
- export namespace strategy {
32
- export type JobStrategy <
33
- A extends JsonValue = JsonValue ,
34
- I extends JsonValue = JsonValue ,
35
- O extends JsonValue = JsonValue ,
36
- > = (
37
- handler : JobHandler < A , I , O > ,
38
- options ?: Partial < Readonly < JobDescription > > ,
39
- ) => JobHandler < A , I , O > ;
40
-
41
- /**
42
- * Creates a JobStrategy that serializes every call. This strategy can be mixed between jobs.
43
- */
44
- export function serialize <
45
- A extends JsonValue = JsonValue ,
46
- I extends JsonValue = JsonValue ,
47
- O extends JsonValue = JsonValue ,
48
- > ( ) : JobStrategy < A , I , O > {
49
- let latest : Observable < JobOutboundMessage < O > > = of ( ) ;
50
-
51
- return ( handler , options ) => {
52
- const newHandler = ( argument : A , context : JobHandlerContext < A , I , O > ) => {
53
- const previous = latest ;
54
- latest = concat (
55
- previous . pipe ( ignoreElements ( ) ) ,
56
- new Observable < JobOutboundMessage < O > > ( ( o ) => handler ( argument , context ) . subscribe ( o ) ) ,
57
- ) . pipe ( shareReplay ( 0 ) ) ;
58
-
59
- return latest ;
60
- } ;
61
-
62
- return Object . assign ( newHandler , {
63
- jobDescription : Object . assign ( { } , handler . jobDescription , options ) ,
64
- } ) ;
30
+ export type JobStrategy <
31
+ A extends JsonValue = JsonValue ,
32
+ I extends JsonValue = JsonValue ,
33
+ O extends JsonValue = JsonValue ,
34
+ > = (
35
+ handler : JobHandler < A , I , O > ,
36
+ options ?: Partial < Readonly < JobDescription > > ,
37
+ ) => JobHandler < A , I , O > ;
38
+
39
+ /**
40
+ * Creates a JobStrategy that serializes every call. This strategy can be mixed between jobs.
41
+ */
42
+ export function serialize <
43
+ A extends JsonValue = JsonValue ,
44
+ I extends JsonValue = JsonValue ,
45
+ O extends JsonValue = JsonValue ,
46
+ > ( ) : JobStrategy < A , I , O > {
47
+ let latest : Observable < JobOutboundMessage < O > > = of ( ) ;
48
+
49
+ return ( handler , options ) => {
50
+ const newHandler = ( argument : A , context : JobHandlerContext < A , I , O > ) => {
51
+ const previous = latest ;
52
+ latest = concat (
53
+ previous . pipe ( ignoreElements ( ) ) ,
54
+ new Observable < JobOutboundMessage < O > > ( ( o ) => handler ( argument , context ) . subscribe ( o ) ) ,
55
+ ) . pipe ( shareReplay ( 0 ) ) ;
56
+
57
+ return latest ;
65
58
} ;
66
- }
67
-
68
- /**
69
- * Creates a JobStrategy that will always reuse a running job, and restart it if the job ended.
70
- * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it
71
- * is.
72
- */
73
- export function reuse <
74
- A extends JsonValue = JsonValue ,
75
- I extends JsonValue = JsonValue ,
76
- O extends JsonValue = JsonValue ,
77
- > ( replayMessages = false ) : JobStrategy < A , I , O > {
78
- let inboundBus = new Subject < JobInboundMessage < I > > ( ) ;
79
- let run : Observable < JobOutboundMessage < O > > | null = null ;
80
- let state : JobOutboundMessage < O > | null = null ;
81
-
82
- return ( handler , options ) => {
83
- const newHandler = ( argument : A , context : JobHandlerContext < A , I , O > ) => {
84
- // Forward inputs.
85
- const subscription = context . inboundBus . subscribe ( inboundBus ) ;
86
-
87
- if ( run ) {
88
- return concat (
89
- // Update state.
90
- of ( state ) ,
91
- run ,
92
- ) . pipe ( finalize ( ( ) => subscription . unsubscribe ( ) ) ) ;
93
- }
94
-
95
- run = handler ( argument , { ...context , inboundBus : inboundBus . asObservable ( ) } ) . pipe (
96
- tap (
97
- ( message ) => {
98
- if (
99
- message . kind == JobOutboundMessageKind . Start ||
100
- message . kind == JobOutboundMessageKind . OnReady ||
101
- message . kind == JobOutboundMessageKind . End
102
- ) {
103
- state = message ;
104
- }
105
- } ,
106
- undefined ,
107
- ( ) => {
108
- subscription . unsubscribe ( ) ;
109
- inboundBus = new Subject < JobInboundMessage < I > > ( ) ;
110
- run = null ;
111
- } ,
112
- ) ,
113
- replayMessages ? shareReplay ( ) : share ( ) ,
114
- ) ;
115
-
116
- return run ;
117
- } ;
118
-
119
- return Object . assign ( newHandler , handler , options || { } ) ;
59
+
60
+ return Object . assign ( newHandler , {
61
+ jobDescription : Object . assign ( { } , handler . jobDescription , options ) ,
62
+ } ) ;
63
+ } ;
64
+ }
65
+
66
+ /**
67
+ * Creates a JobStrategy that will always reuse a running job, and restart it if the job ended.
68
+ * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it
69
+ * is.
70
+ */
71
+ export function reuse <
72
+ A extends JsonValue = JsonValue ,
73
+ I extends JsonValue = JsonValue ,
74
+ O extends JsonValue = JsonValue ,
75
+ > ( replayMessages = false ) : JobStrategy < A , I , O > {
76
+ let inboundBus = new Subject < JobInboundMessage < I > > ( ) ;
77
+ let run : Observable < JobOutboundMessage < O > > | null = null ;
78
+ let state : JobOutboundMessage < O > | null = null ;
79
+
80
+ return ( handler , options ) => {
81
+ const newHandler = ( argument : A , context : JobHandlerContext < A , I , O > ) => {
82
+ // Forward inputs.
83
+ const subscription = context . inboundBus . subscribe ( inboundBus ) ;
84
+
85
+ if ( run ) {
86
+ return concat (
87
+ // Update state.
88
+ of ( state ) ,
89
+ run ,
90
+ ) . pipe ( finalize ( ( ) => subscription . unsubscribe ( ) ) ) ;
91
+ }
92
+
93
+ run = handler ( argument , { ...context , inboundBus : inboundBus . asObservable ( ) } ) . pipe (
94
+ tap (
95
+ ( message ) => {
96
+ if (
97
+ message . kind == JobOutboundMessageKind . Start ||
98
+ message . kind == JobOutboundMessageKind . OnReady ||
99
+ message . kind == JobOutboundMessageKind . End
100
+ ) {
101
+ state = message ;
102
+ }
103
+ } ,
104
+ undefined ,
105
+ ( ) => {
106
+ subscription . unsubscribe ( ) ;
107
+ inboundBus = new Subject < JobInboundMessage < I > > ( ) ;
108
+ run = null ;
109
+ } ,
110
+ ) ,
111
+ replayMessages ? shareReplay ( ) : share ( ) ,
112
+ ) ;
113
+
114
+ return run ;
120
115
} ;
121
- }
122
-
123
- /**
124
- * Creates a JobStrategy that will reuse a running job if the argument matches.
125
- * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it
126
- * is.
127
- */
128
- export function memoize <
129
- A extends JsonValue = JsonValue ,
130
- I extends JsonValue = JsonValue ,
131
- O extends JsonValue = JsonValue ,
132
- > ( replayMessages = false ) : JobStrategy < A , I , O > {
133
- const runs = new Map < string , Observable < JobOutboundMessage < O > > > ( ) ;
134
-
135
- return ( handler , options ) = > {
136
- const newHandler = ( argument : A , context : JobHandlerContext < A , I , O > ) => {
137
- const argumentJson = JSON . stringify (
138
- isJsonObject ( argument )
139
- ? Object . keys ( argument )
140
- . sort ( )
141
- . reduce ( ( result , key ) => {
142
- result [ key ] = argument [ key ] ;
143
-
144
- return result ;
145
- } , { } as JsonObject )
146
- : argument ,
147
- ) ;
148
- const maybeJob = runs . get ( argumentJson ) ;
149
-
150
- if ( maybeJob ) {
151
- return maybeJob ;
152
- }
153
-
154
- const run = handler ( argument , context ) . pipe ( replayMessages ? shareReplay ( ) : share ( ) ) ;
155
- runs . set ( argumentJson , run ) ;
156
-
157
- return run ;
158
- } ;
159
-
160
- return Object . assign ( newHandler , handler , options || { } ) ;
116
+
117
+ return Object . assign ( newHandler , handler , options || { } ) ;
118
+ } ;
119
+ }
120
+
121
+ /**
122
+ * Creates a JobStrategy that will reuse a running job if the argument matches.
123
+ * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it
124
+ * is.
125
+ */
126
+ export function memoize <
127
+ A extends JsonValue = JsonValue ,
128
+ I extends JsonValue = JsonValue ,
129
+ O extends JsonValue = JsonValue ,
130
+ > ( replayMessages = false ) : JobStrategy < A , I , O > {
131
+ const runs = new Map < string , Observable < JobOutboundMessage < O > > > ( ) ;
132
+
133
+ return ( handler , options ) => {
134
+ const newHandler = ( argument : A , context : JobHandlerContext < A , I , O > ) => {
135
+ const argumentJson = JSON . stringify (
136
+ isJsonObject ( argument )
137
+ ? Object . keys ( argument )
138
+ . sort ( )
139
+ . reduce ( ( result , key ) => {
140
+ result [ key ] = argument [ key ] ;
141
+
142
+ return result ;
143
+ } , { } as JsonObject )
144
+ : argument ,
145
+ ) ;
146
+ const maybeJob = runs . get ( argumentJson ) ;
147
+
148
+ if ( maybeJob ) {
149
+ return maybeJob ;
150
+ }
151
+
152
+ const run = handler ( argument , context ) . pipe ( replayMessages ? shareReplay ( ) : share ( ) ) ;
153
+ runs . set ( argumentJson , run ) ;
154
+
155
+ return run ;
161
156
} ;
162
- }
157
+
158
+ return Object . assign ( newHandler , handler , options || { } ) ;
159
+ } ;
163
160
}
0 commit comments