Skip to content

Commit

Permalink
TASK: Inline discoverDetachedSubscriptions
Browse files Browse the repository at this point in the history
To reduce additional sql query and lock, and do it in the main transaction
  • Loading branch information
mhsdesign committed Nov 23, 2024
1 parent d582766 commit 5dfa592
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,46 @@ public function projectionIsDetachedIfConfigurationIsRemoved()
$this->getObject(ContentRepositoryRegistry::class)->resetFactoryInstance($this->contentRepository->id);
$this->setupContentRepositoryDependencies($this->contentRepository->id);

// todo status is stale??, should be DETACHED
// todo status is stale??, should be DETACHED, and also cr:setup should marke detached projections?!!
// $this->expectOkayStatus('Vendor.Package:FakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none());

$this->fakeProjection->expects(self::never())->method('apply');
// catchup or anything that finds detached subscribers
$result = $this->subscriptionEngine->catchUpActive();
// todo result should reflect that there was an detachment? Throw error in CR?
self::assertEquals(ProcessedResult::success(1), $result);

$expectedDetachedState = SubscriptionAndProjectionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'),
subscriptionStatus: SubscriptionStatus::DETACHED,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: null,
projectionStatus: null // not calculate-able at this point!
);
self::assertEquals(
$expectedDetachedState,
$this->subscriptionStatus('Vendor.Package:FakeProjection')
);

// other projections are not interrupted
self::assertEquals(
[SequenceNumber::fromInteger(1)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

// succeeding catchup's do not handle the detached one:
$this->commitExampleContentStreamEvent();
$result = $this->subscriptionEngine->catchUpActive();
self::assertEquals(ProcessedResult::success(1), $result);

self::assertEquals(
[SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

// still detached
self::assertEquals(
SubscriptionAndProjectionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:FakeProjection'),
subscriptionStatus: SubscriptionStatus::DETACHED,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: null,
projectionStatus: null // not calculate-able at this point!
),
$expectedDetachedState,
$this->subscriptionStatus('Vendor.Package:FakeProjection')
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,27 +160,6 @@ function (Subscriptions $subscriptions) {
);
}

private function discoverDetachedSubscriptions(SubscriptionEngineCriteria $criteria): void
{
$registeredSubscriptions = $this->subscriptionStore->findByCriteria(SubscriptionCriteria::create(
$criteria->ids,
SubscriptionStatusFilter::fromArray([SubscriptionStatus::ACTIVE]),
));
foreach ($registeredSubscriptions as $subscription) {
if ($this->subscribers->contain($subscription->id)) {
continue;
}
$subscription->set(
status: SubscriptionStatus::DETACHED,
);
$this->subscriptionManager->update($subscription);
$this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value));
}
// todo use transaction here for discovery as well!!!
$this->subscriptionManager->flush();
}


/**
* Set up the subscription by retrieving the corresponding subscriber and calling the setUp method on its handler
* If the setup fails, the subscription will be in the {@see SubscriptionStatus::ERROR} state and a corresponding {@see Error} is returned
Expand Down Expand Up @@ -227,11 +206,21 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
$this->logger?->info(sprintf('Subscription Engine: Start catching up subscriptions in state "%s".', $subscriptionStatus->value));

$this->discoverNewSubscriptions();
$this->discoverDetachedSubscriptions($criteria);

return $this->subscriptionManager->findForAndUpdate(
SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $subscriptionStatus),
function (Subscriptions $subscriptions) use ($subscriptionStatus, $progressClosure) {
foreach ($subscriptions as $subscription) {
if (!$this->subscribers->contain($subscription->id)) {
// mark detached subscriptions as we cannot handle them and exclude them from catchup
$subscription->set(
status: SubscriptionStatus::DETACHED,
);
$this->subscriptionManager->update($subscription);
$this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value));
$subscriptions = $subscriptions->without($subscription->id);
}
}
if ($subscriptions->isEmpty()) {
$this->logger?->info(sprintf('Subscription Engine: No subscriptions in state "%s". Finishing catch up', $subscriptionStatus->value));

Expand Down

0 comments on commit 5dfa592

Please sign in to comment.