@@ -10,6 +10,7 @@ import { hashPasswordAsync, validatePasswordHashAsync } from "../utils/UserHelpe
10
10
import NatsService , { ConsumerAbortSignal } from "../services/NatsService" ;
11
11
import { JetStreamPublishOptions , JsMsg , headers , JsHeaders , PubAck } from "nats" ;
12
12
import z from "zod" ;
13
+ import { isEqual } from "lodash" ;
13
14
14
15
const UserFields = z . record ( z . string ( ) , z . any ( ) ) . transform ( v => v as UserDatabaseObject ) ;
15
16
@@ -387,11 +388,75 @@ class UserService {
387
388
const nats = await NatsService . get ( ) ;
388
389
389
390
await this . transaction ( async tsx => {
391
+ const usersBefore = new Map (
392
+ ( await tsx . dao . findAll ( ) ) . map ( user => [ user . id , user ] as [ number , UserDatabaseObject ] ) ,
393
+ ) ;
394
+
390
395
await tsx . dao . clear ( ) ;
391
396
await nats . fetch ( "members.*" , async msg => {
392
397
const data = msg . json ( ) ;
393
398
await tsx . handleMessage ( data , msg ) ;
394
399
} ) ;
400
+
401
+ let error = false ;
402
+
403
+ const usersAfter = new Map (
404
+ ( await tsx . dao . findAll ( ) ) . map ( user => [ user . id , user ] as [ number , UserDatabaseObject ] ) ,
405
+ ) ;
406
+
407
+ const ids = new Set ( [ ...usersBefore . keys ( ) , ...usersAfter . keys ( ) ] ) ;
408
+
409
+ ids . forEach ( id => {
410
+ const before = usersBefore . get ( id ) ;
411
+ const after = usersAfter . get ( id ) ;
412
+
413
+ if ( ! after && before ) {
414
+ console . log ( `User ${ before . id } (${ before . username } ) disappeared during rebuild!` ) ;
415
+ error = true ;
416
+ return ;
417
+ }
418
+
419
+ if ( ! before && after ) {
420
+ console . log ( `New user ${ after . id } (${ after . username } ) appeared during rebuild!` ) ;
421
+ error = true ;
422
+ return ;
423
+ }
424
+
425
+ if ( ! before || ! after ) {
426
+ throw new Error ( "Unreachable." ) ;
427
+ }
428
+
429
+ if ( ! isEqual ( before , after ) ) {
430
+ const diff = Object . entries ( before ) . flatMap ( ( [ key , beforeValue ] ) => {
431
+ const afterValue = after [ key as keyof UserDatabaseObject ] ;
432
+
433
+ if ( isEqual ( beforeValue , afterValue ) ) {
434
+ return [ ] ;
435
+ }
436
+
437
+ return [ [ key , beforeValue , afterValue ] ] ;
438
+ } ) ;
439
+
440
+ if ( diff . length === 0 ) {
441
+ console . log ( `User ${ id } changed during rebuild!` ) ;
442
+ } else {
443
+ diff . forEach ( ( [ key , before , after ] ) => {
444
+ console . log ( `Field ${ key } of user ${ id } changed: ${ before } -> ${ after } ` ) ;
445
+ } ) ;
446
+ }
447
+ }
448
+ } ) ;
449
+
450
+ if ( usersAfter . size !== usersBefore . size ) {
451
+ console . log (
452
+ `User count does not match before (${ usersBefore . size } ) and after (${ usersAfter . size } ) the rebuild.` ,
453
+ ) ;
454
+ error = true ;
455
+ }
456
+
457
+ if ( error ) {
458
+ throw new Error ( "Abnormalities were detected after the rebuild. Rolling back..." ) ;
459
+ }
395
460
} ) ;
396
461
}
397
462
0 commit comments