Skip to content

Commit caaff93

Browse files
committed
Add a script for rebuilding the users table from scratch
1 parent 77998d4 commit caaff93

File tree

5 files changed

+83
-57
lines changed

5 files changed

+83
-57
lines changed

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
"run-docker": "docker run -p 8080:8080 --env-file=.env -d --name user-service-container user-service",
3434
"prepare": "[[ -z \"$NODE_END\" || \"$NODE_ENV\" = \"development\" ]] && husky || true",
3535
"knex": "tsx ./node_modules/knex/bin/cli.js",
36-
"create-user": "tsx ./scripts/create_user.ts"
36+
"create-user": "tsx ./scripts/create_user.ts",
37+
"rebuild-users": "tsx ./scripts/rebuild_users.ts"
3738
},
3839
"dependencies": {
3940
"@sentry/node": "^8.25.0",

scripts/rebuild_users.ts

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import UserService from "../src/services/UserService";
2+
3+
async function main() {
4+
await UserService.rebuild();
5+
process.exit(0);
6+
}
7+
8+
main();

src/dao/UserDao.ts

+4
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ export class UserDao implements Dao<UserDatabaseObject> {
241241
};
242242
return Promise.resolve(this.knex<UserDatabaseObject>(tableName).insert(savedObj));
243243
}
244+
245+
public async clear() {
246+
await this.knex(tableName).del();
247+
}
244248
}
245249

246250
export default new UserDao();

src/services/NatsService.ts

+12-10
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ export default class NatsService {
103103
await this.setup();
104104
}
105105

106+
async resetConsumer() {
107+
const jsm = await this.conn.jetstreamManager();
108+
await jsm.consumers.delete(STREAM, CONSUMER);
109+
await this.setup();
110+
}
111+
106112
private async setup() {
107113
// Luodaan tarvittavat JetStream streamit ja consumerit.
108114

@@ -298,14 +304,14 @@ export default class NatsService {
298304
* Tämän toteutus on jotenkin vaikeampi kuin NATS muuten antaisi olettaa,
299305
* joten luulen että tähän on joku fiksumpikin tapa.
300306
*/
301-
public async fetch(subject: string): Promise<JsMsg[]> {
307+
public async fetch(subject: string, callback?: (msg: JsMsg) => Promise<void>): Promise<JsMsg[]> {
302308
const js = this.conn.jetstream();
303309

304310
// Haetaan palvelimelta subjektin viimeisin viesti,
305311
// jotta saamme sen järjestysnumeron.
306312
const c1 = await js.consumers.get(STREAM, {
307313
// Haluamme vain subjektin viimeisimmän viestin.
308-
deliver_policy: DeliverPolicy.LastPerSubject,
314+
deliver_policy: DeliverPolicy.Last,
309315

310316
// Haluamme vain yhden subjektin viestin.
311317
filterSubjects: [subject],
@@ -328,22 +334,18 @@ export default class NatsService {
328334

329335
const results = [];
330336

331-
while (true) {
332-
// Haetaan viesti kerrallaan.
333-
const msg = await consumer.next();
334-
335-
// Ilmeisesti sieltä voi tulla myös null, en tiedä miksi.
336-
if (!msg) break;
337-
337+
for await (const msg of await consumer.consume()) {
338338
// Jos se ei ollut null, niin sitten otetaan se talteen.
339339
results.push(msg);
340340

341+
await callback?.(msg);
342+
341343
// Jos viestin järjestysnumero vastaa aiemmin hakemaamme viimeistintä viestiä,
342344
// lopetetaan viestien hakeminen.
343345
//
344346
// Tämän ei pitäisi olla tarpeellista, vaan NATS-ille pitäisi pystyä sanomaan myös,
345347
// että emme halua odottaa uusia viestejä. Enpäs saanut toimimaan, niin tehdään sitten näin.
346-
if (msg.seq === last.seq) {
348+
if (msg.seq === last.seq || msg.info.pending === 0) {
347349
break;
348350
}
349351
}

src/services/UserService.ts

+57-46
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,62 @@ class UserService {
383383
return this.dao.withTransaction(dao => callback(new UserService(dao)));
384384
}
385385

386+
public async rebuild() {
387+
const nats = await NatsService.get();
388+
389+
await this.transaction(async tsx => {
390+
await tsx.dao.clear();
391+
await nats.fetch("members.*", async msg => {
392+
const data = msg.json();
393+
await tsx.handleMessage(data, msg);
394+
});
395+
});
396+
}
397+
398+
public async handleMessage(pEvent: unknown, msg: JsMsg) {
399+
const event = UserEvent.parse(pEvent);
400+
401+
const userId = parseInt(msg.subject.split(".")[1], 10);
402+
403+
if (event.type === "set") {
404+
// Parsitaan käyttäjän ID viestin subjektista, joka on muotoa `members.{id}`.
405+
406+
if (event.fields.created) {
407+
event.fields.created = new Date(event.fields.created);
408+
}
409+
410+
if (event.fields.registration_ban_bypass_until) {
411+
event.fields.registration_ban_bypass_until = new Date(event.fields.registration_ban_bypass_until);
412+
}
413+
414+
// Päivitetään muokkausviestin mukaiset arvot tietokantaan.
415+
await this.dao.update(userId, event.fields);
416+
} else if (event.type === "create" || event.type === "import") {
417+
if (event.fields.created) {
418+
event.fields.created = new Date(event.fields.created);
419+
}
420+
421+
if (event.fields.modified) {
422+
event.fields.modified = new Date(event.fields.modified);
423+
}
424+
425+
await this.dao.save({
426+
...event.fields,
427+
id: userId,
428+
last_seq: msg.seq,
429+
});
430+
431+
return;
432+
} else if (event.type === "delete") {
433+
await this.dao.remove(userId);
434+
return;
435+
}
436+
437+
// Tallennetaan tietokantaan tieto siitä, että olemme käsitelleet tämän viestin,
438+
// vaikka viesti ei olisikaan meille relevantti.
439+
await this.dao.update(userId, { last_seq: msg.seq });
440+
}
441+
386442
/**
387443
* Taustaprosessi, joka kuuntelee jäsentieto-streamiin julkaistuja viestejä
388444
* ja käsittelee ne.
@@ -393,52 +449,7 @@ class UserService {
393449
this.abortSignal = new ConsumerAbortSignal();
394450

395451
return new Promise<void>(resolve => {
396-
const handler = async (pEvent: unknown, msg: JsMsg) => {
397-
const event = UserEvent.parse(pEvent);
398-
399-
const userId = parseInt(msg.subject.split(".")[1], 10);
400-
401-
if (event.type === "set") {
402-
// Parsitaan käyttäjän ID viestin subjektista, joka on muotoa `members.{id}`.
403-
404-
if (event.fields.created) {
405-
// Tämä piti tehdä jostain syystä. Syyttäkää user-serviceä älkääkä NATSia.
406-
event.fields.created = new Date(event.fields.created);
407-
}
408-
409-
if (event.fields.registration_ban_bypass_until) {
410-
event.fields.registration_ban_bypass_until = new Date(event.fields.registration_ban_bypass_until);
411-
}
412-
413-
// Päivitetään muokkausviestin mukaiset arvot tietokantaan.
414-
await this.dao.update(userId, event.fields);
415-
} else if (event.type === "create" || event.type === "import") {
416-
if (event.fields.created) {
417-
event.fields.created = new Date(event.fields.created);
418-
}
419-
420-
if (event.fields.modified) {
421-
event.fields.modified = new Date(event.fields.modified);
422-
}
423-
424-
await this.dao.save({
425-
...event.fields,
426-
id: userId,
427-
last_seq: msg.seq,
428-
});
429-
430-
return;
431-
} else if (event.type === "delete") {
432-
await this.dao.remove(userId);
433-
return;
434-
}
435-
436-
// Tallennetaan tietokantaan tieto siitä, että olemme käsitelleet tämän viestin,
437-
// vaikka viesti ei olisikaan meille relevantti.
438-
await this.dao.update(userId, { last_seq: msg.seq });
439-
};
440-
441-
nats.subscribe(handler, {
452+
nats.subscribe(this.handleMessage.bind(this), {
442453
onReady: () => resolve(),
443454
signal: this.abortSignal,
444455
});

0 commit comments

Comments
 (0)