
Simple transactional outbox #4

Closed
gtoselli opened this issue Jan 9, 2024 · 12 comments · Fixed by #32

Comments

@gtoselli
Contributor

gtoselli commented Jan 9, 2024

No description provided.

@gtoselli gtoselli converted this from a draft issue Jan 9, 2024
@lucagiove lucagiove moved this from Todo to In Progress in ddd-toolkit Jan 10, 2024
@lucagiove lucagiove added this to the 0.1.0 milestone Jan 10, 2024
@lucagiove
Contributor

If multiple repositories use the outbox pattern, we should probably add a key to separate them; otherwise the outbox might call the wrong function.
Imagine:

  • Repo1 -> callback1 (not identified by the job payload)
  • Repo2 -> callback2

executeAllScheduledJobs on Repo1 can find jobs scheduled by Repo2 and run callback1 on them instead of callback2.
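A minimal in-memory sketch of the problem and of the proposed fix. It assumes a hypothetical `ScheduledJob` shape tagged with an `aggregateName` key (the name proposed later in this thread) and a hypothetical `KeyedOutbox` class; it is not the library's code. Each outbox only executes jobs carrying its own key, so callback1 never runs on Repo2's jobs.

```typescript
// Hypothetical job shape: aggregateName records which repository (and so
// which callback) a scheduled job belongs to.
interface ScheduledJob {
  id: number;
  aggregateName: string;
  payload: unknown;
}

type JobCallback = (payload: unknown) => void;

// Shared in-memory store standing in for the outbox collection.
const jobs: ScheduledJob[] = [];

class KeyedOutbox {
  constructor(
    private readonly aggregateName: string,
    private readonly callback: JobCallback,
  ) {}

  schedule(id: number, payload: unknown): void {
    jobs.push({ id, aggregateName: this.aggregateName, payload });
  }

  // Without the aggregateName filter this would run this.callback on every
  // job in the store, including jobs scheduled by other repositories.
  // (Executed jobs are left in the store for brevity.)
  executeAllScheduledJobs(): number[] {
    const mine = jobs.filter((job) => job.aggregateName === this.aggregateName);
    for (const job of mine) this.callback(job.payload);
    return mine.map((job) => job.id);
  }
}

const calls: string[] = [];
const repo1 = new KeyedOutbox("Repo1", (p) => calls.push(`callback1:${String(p)}`));
const repo2 = new KeyedOutbox("Repo2", (p) => calls.push(`callback2:${String(p)}`));
repo1.schedule(1, "a");
repo2.schedule(2, "b");
const executedByRepo1 = repo1.executeAllScheduledJobs();
```

Here `executedByRepo1` contains only job 1, and only callback1 has run.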

@gtoselli
Contributor Author

Currently, the scheduledBy field in the outbox collection holds the hostname.

We could concatenate the hostname with something like contextName.

Alternatively, we could add a new field and use both scheduledBy and contextName.

@lucagiove
Contributor

I would use a dedicated property in the document, for example the aggregateName, which should be unique.
If we want to keep the outbox more generic, contextName might work.
The problem with being too generic is that the aggregate method could no longer be called saveAndPublish; it would have to be something like saveAndRunJobs, which isn't nice.
Maybe we can provide a dedicated outbox interface with messaging-oriented names that maps onto a generic outbox; likewise, the repo could expose different interfaces for different usages that map to the same methods.
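The facade idea could look roughly like this. It is a sketch under assumptions: `GenericOutbox`, `scheduleJob`, and `runAllScheduledJobs` are hypothetical names for the generic layer, while `scheduleEvent` and `publishAllScheduledEvents` are the messaging names used later in this thread. The messaging class adds no behavior; it only renames the generic operations.

```typescript
// Hypothetical generic outbox: stores opaque jobs and runs them with a callback.
class GenericOutbox<T> {
  private readonly pending: T[] = [];

  constructor(private readonly runJob: (job: T) => Promise<void>) {}

  scheduleJob(job: T): void {
    this.pending.push(job);
  }

  // Takes all pending jobs (clearing the queue) and runs them in order.
  async runAllScheduledJobs(): Promise<number> {
    const jobs = this.pending.splice(0);
    for (const job of jobs) await this.runJob(job);
    return jobs.length;
  }
}

interface OutboxEvent {
  eventPayload: unknown;
  eventRoutingKey: string;
}

// Messaging-oriented facade: same mechanics, event vocabulary.
class MessagingOutbox {
  private readonly outbox: GenericOutbox<OutboxEvent>;

  constructor(publish: (event: OutboxEvent) => Promise<void>) {
    this.outbox = new GenericOutbox(publish);
  }

  scheduleEvent(event: OutboxEvent): void {
    this.outbox.scheduleJob(event);
  }

  publishAllScheduledEvents(): Promise<number> {
    return this.outbox.runAllScheduledJobs();
  }
}
```

A repository could expose a "save and run jobs" method internally and a `saveAndPublish` name on its messaging-facing interface in the same way.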

Let's discuss the library's API interfaces further here: #9

@gtoselli
Contributor Author

Today I wrote another Outbox class here on the issue branch. I introduced the aggregateName to differentiate multiple instances on the same host.

For this milestone (0.1.0), I kept the outbox API close to the "messaging" concept. The outbox interface is:

  • scheduleEvent
  • publishAllScheduledEvents
  • publishAllEventsScheduledByMe
  • startOutboxWatching

The OutboxEvent is:

  • eventPayload: any
  • eventRoutingKey: string
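The interface listed above, written out as TypeScript types, with a throwaway in-memory implementation to show what separates `publishAllScheduledEvents` from `publishAllEventsScheduledByMe`. Only the method and field names come from the comment; parameter types, return types, the `OutboxRecord` shape, and `InMemoryOutbox` are assumptions for illustration.

```typescript
// OutboxEvent fields as listed above.
interface OutboxEvent {
  eventPayload: any;
  eventRoutingKey: string;
}

// Method names as listed above; parameter and return types are assumptions.
interface Outbox {
  scheduleEvent(event: OutboxEvent): Promise<string>; // returns a record id
  publishAllScheduledEvents(): Promise<void>; // pending events from any instance
  publishAllEventsScheduledByMe(): Promise<void>; // only this instance's events
  startOutboxWatching(): void; // e.g. open a change stream
}

// Hypothetical record shape in the shared outbox collection.
interface OutboxRecord {
  id: string;
  scheduledBy: string;
  event: OutboxEvent;
  published: boolean;
}

// In-memory implementation, only to illustrate what each method does.
class InMemoryOutbox implements Outbox {
  private static nextId = 0;

  constructor(
    private readonly records: OutboxRecord[], // stands in for the collection
    private readonly instanceId: string, // value written to scheduledBy
    private readonly publish: (event: OutboxEvent) => Promise<void>,
  ) {}

  async scheduleEvent(event: OutboxEvent): Promise<string> {
    const id = String(InMemoryOutbox.nextId++);
    this.records.push({ id, scheduledBy: this.instanceId, event, published: false });
    return id;
  }

  publishAllScheduledEvents(): Promise<void> {
    return this.publishWhere(() => true);
  }

  publishAllEventsScheduledByMe(): Promise<void> {
    return this.publishWhere((r) => r.scheduledBy === this.instanceId);
  }

  startOutboxWatching(): void {
    // Nothing to watch in memory; a MongoDB version would open a change stream here.
  }

  private async publishWhere(match: (r: OutboxRecord) => boolean): Promise<void> {
    for (const record of this.records) {
      if (record.published || !match(record)) continue;
      await this.publish(record.event);
      record.published = true;
    }
  }
}
```

Two instances sharing one `records` array behave like two app instances sharing one collection: the "ByMe" variant leaves the other instance's events untouched.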

Maybe in the future we will discuss a more generic interface.

@lucagiove
Contributor

I was thinking that os.hostname() is probably fine for k8s, but not in other environments where you might have multiple processes on the same machine, or on physical hardware where the hostname has not been customized.
What would be the drawback of using a uuid?

@gtoselli
Copy link
Contributor Author

Yes, I had thought of that.
The scheduledBy field should identify something that goes down or comes up as a unit (for example, a pod in k8s or a process on a local machine).

If two apps using the ddd-toolkit lib run on the same machine, I think the PID should be used.

In this implementation, the hostname is used as the default value of the hostname argument of the Outbox class constructor.
We should find a better name!

If you use ddd-toolkit in the cloud, you will use the default value (os.hostname()), but if you want you can pass something else (like process.pid). Will it be easy to explain this clearly in the documentation?
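A sketch of the constructor behavior described above, assuming Node.js. The parameter name `instanceId` is hypothetical (the thread is still looking for a better name), and the `Outbox` class is reduced to the two constructor arguments under discussion.

```typescript
import { hostname } from "node:os";

// Sketch of the constructor shape described above; the value of instanceId
// is what would be written to the scheduledBy field of outbox documents.
class Outbox {
  constructor(
    readonly aggregateName: string,
    // Default: the hostname, which identifies the pod in k8s.
    readonly instanceId: string = hostname(),
  ) {}
}

// Cloud / k8s: one app process per pod, so the default is enough.
const inPod = new Outbox("Order");

// Several apps on one machine: add the PID to tell the processes apart.
const onSharedHost = new Outbox("Order", `${hostname()}-${process.pid}`);
```

Combining hostname and PID, rather than the PID alone, keeps the value unique if several machines share one collection.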

@gtoselli gtoselli mentioned this issue Jan 12, 2024
@gtoselli gtoselli self-assigned this Jan 12, 2024
@lucagiove
Contributor

lucagiove commented Jan 12, 2024

But the PID changes whenever the process restarts. In k8s the hostname is probably kept across a restart, but since all pending events are sent at startup or shutdown anyway, restoring a previously used value isn't really useful.

If we use a value that is guaranteed to be unique, this mechanism can be transparent to the library user, and the value stays under the library's control.
What are the drawbacks?
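A sketch of the uuid alternative, assuming Node.js `crypto.randomUUID()` and a hypothetical, stripped-down `Outbox` class; the `toRecord` helper and the document shape are illustrative only. The identifier is generated inside the library, so the user never passes it.

```typescript
import { randomUUID } from "node:crypto";

// Sketch: the outbox generates its own identifier, so the library user never
// configures it. A restart yields a new id, which is harmless because pending
// events are flushed at startup/shutdown anyway.
class Outbox {
  readonly instanceId: string = randomUUID();

  // aggregateName is still needed to map stored data to the right callback.
  constructor(readonly aggregateName: string) {}

  // Fields written when an event is scheduled (document shape is assumed).
  toRecord(event: unknown) {
    return { aggregateName: this.aggregateName, scheduledBy: this.instanceId, event };
  }
}

const first = new Outbox("Order");
const second = new Outbox("Order");
```

With one uuid per outbox instance, two outboxes in the same process get different scheduledBy values, which is exactly the trade-off questioned in the next comment.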

@gtoselli
Contributor Author

So you would put a uuid generated for each outbox instance in the scheduledBy field? In that case aggregateName becomes useless.

Let's decide whether it makes sense to group the outbox instances within the same process (the host, in the case of k8s).

@lucagiove
Contributor

The purpose of aggregateName is to map data to the code callback. The uuid could be the same per node, but yes, if each instance is in charge of its own events, then one random uuid would serve both aims.

@gtoselli
Contributor Author

gtoselli commented Jan 16, 2024

Is the uuid the same for every instance of the Outbox class in the same app?

In any case, aggregateName is needed for the data-to-function mapping.
If, however, you want the uuid to be global (for the whole app), it cannot be transparent: it must be passed from the app to the library (and managed as a global provider in the Nest DI container).
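For contrast with a per-instance uuid, a sketch of the app-global variant: one id created once at startup and passed explicitly to every outbox. `APP_INSTANCE_ID`, `PublishFn`, and this `Outbox` constructor are hypothetical; in a Nest app the id would be supplied by a global provider rather than a module-level constant.

```typescript
import { randomUUID } from "node:crypto";

// Sketch: one identifier per application instance, created once at startup
// and handed to every outbox. Here it is just a module-level constant.
const APP_INSTANCE_ID = randomUUID();

type PublishFn = (event: unknown) => Promise<void>;

class Outbox {
  constructor(
    // Maps stored documents to this outbox's callback.
    readonly aggregateName: string,
    // Shared by all outboxes of the app; written to scheduledBy.
    readonly scheduledBy: string,
    readonly publish: PublishFn,
  ) {}
}

const noop: PublishFn = async () => {};
const orders = new Outbox("Order", APP_INSTANCE_ID, noop);
const invoices = new Outbox("Invoice", APP_INSTANCE_ID, noop);
```

Both fields stay necessary: scheduledBy says which app instance owns a document, aggregateName says which callback handles it.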

@gtoselli
Contributor Author

gtoselli commented Feb 6, 2024

I might have a good idea.
To avoid complicating our lives with a leader instance, and without introducing a single point of failure, a simple solution could be this:

  • each instance of the app has CDC (a MongoDB change stream) active
  • when the change is received, each instance tries to update the document (simultaneously or almost simultaneously) by setting the status to processing
  • MongoDB guarantees that a single-document updateOne is atomic, so if the filter also matches the current status, only one of the competing operations can modify the document
  • only the instance that succeeded in updating the status (detected via modifiedCount) publishes the event

What do you think @lucagiove?

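// CDC: consume the MongoDB change stream as an async iterator
for await (const change of this.activeChangeStream) {
	// Only inserts carry a new event to publish; the status updates made in
	// publishEvent are echoed by the stream too and must be skipped.
	if (change.operationType !== 'insert') continue;
	await this.publishEvent(change.fullDocument as OutBoxModel);
}

private async publishEvent(outBoxModel: OutBoxModel) {
	// Atomic claim: match on the status observed at insert time, so only the
	// first competing instance modifies the document (modifiedCount === 1)
	// and an already published event cannot be claimed again.
	const { modifiedCount } = await this.collection.updateOne(
		{ _id: outBoxModel._id, status: outBoxModel.status },
		{ $set: { status: 'processing' } },
	);
	if (modifiedCount !== 1) {
		this.logger.log(`Event ${outBoxModel._id} already claimed by another outbox instance`);
		return;
	}

	try {
		await this.publishEventFn(outBoxModel.event);
		await this.collection.updateOne(
			{ _id: outBoxModel._id },
			{
				$set: {
					status: 'published',
					publishedAt: new Date(),
				},
			},
		);
	} catch (e) {
		this.logger.warn(`Failed publishEventFn with ${JSON.stringify(outBoxModel.event)}: ${e}`);
		// Release the claim so the event is not stuck in 'processing' and can be retried.
		await this.collection.updateOne(
			{ _id: outBoxModel._id, status: 'processing' },
			{ $set: { status: outBoxModel.status } },
		);
	}
}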
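To check the claiming logic without a database, here is an in-memory simulation. The `updateOne` function below is a hypothetical stand-in that only mimics MongoDB's single-document semantics (match the filter, apply the change, report modifiedCount); it does not have the driver's signature. The initial status name `scheduled` is an assumption.

```typescript
type Status = "scheduled" | "processing" | "published";
interface Doc { _id: number; status: Status }

// Stand-in for the outbox collection.
const collection = new Map<number, Doc>();

// Mimics an atomic single-document update: nothing is modified unless the
// filter matches and the new value differs from the stored one.
function updateOne(filter: { _id: number; status: Status }, status: Status): { modifiedCount: number } {
  const doc = collection.get(filter._id);
  if (!doc || doc.status !== filter.status || doc.status === status) return { modifiedCount: 0 };
  doc.status = status;
  return { modifiedCount: 1 };
}

// One competing instance reacting to the same change event.
async function publishEvent(instance: string, snapshot: Doc, published: string[]): Promise<void> {
  // Claim only if the status is still the one seen in the change event.
  if (updateOne({ _id: snapshot._id, status: snapshot.status }, "processing").modifiedCount !== 1) return;
  published.push(instance); // stands in for publishEventFn
  await Promise.resolve(); // yield, as a real network call would
  updateOne({ _id: snapshot._id, status: "processing" }, "published");
}
```

Because the claim filter includes the status, a late instance that receives the change after the event was published finds nothing to update. With a filter on `_id` alone, it would set the status back to processing and publish a second time.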

@lucagiove lucagiove removed this from the 0.1.0 milestone Mar 30, 2024
@lucagiove
Contributor

I would remove this from the first beta and keep only the repo, event bus, and command bus.

@gtoselli gtoselli linked a pull request Mar 30, 2024 that will close this issue
@github-project-automation github-project-automation bot moved this from In Progress to Done in ddd-toolkit Apr 3, 2024
Labels
None yet
Projects
Status: Done

2 participants