Skip to content

[WIP] feat: support delete event reconiliation #2762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ default String fieldManager() {
}

<C> C getConfigurationFor(DependentResourceSpec<?, P, C> spec);

default boolean reconcileOnPrimaryDelete() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
@SuppressWarnings("unused")
IndexedResourceCache<P> getPrimaryCache();

boolean isPrimaryDeleted();

/**
* Determines whether a new reconciliation will be triggered right after the current
* reconciliation is finished. This allows to optimize certain situations, helping avoid unneeded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,10 @@ MaxReconciliationInterval maxReconciliationInterval() default
* @return the name used as field manager for SSA operations
*/
String fieldManager() default CONTROLLER_NAME_AS_FIELD_MANAGER;

/**
* Will trigger reconciliation on delete event of the primary resource. Can be set to true only if
* the reconciler does not implement {@link Cleaner} interface.
*/
boolean reconcileOnPrimaryDelete() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ public class DefaultContext<P extends HasMetadata> implements Context<P> {
private final ControllerConfiguration<P> controllerConfiguration;
private final DefaultManagedWorkflowAndDependentResourceContext<P>
defaultManagedDependentResourceContext;
private final boolean isPrimaryDeleted;

public DefaultContext(RetryInfo retryInfo, Controller<P> controller, P primaryResource) {
public DefaultContext(
RetryInfo retryInfo, Controller<P> controller, P primaryResource, boolean isPrimaryDeleted) {
this.retryInfo = retryInfo;
this.controller = controller;
this.primaryResource = primaryResource;
this.controllerConfiguration = controller.getConfiguration();
this.isPrimaryDeleted = isPrimaryDeleted;
this.defaultManagedDependentResourceContext =
new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this);
}
Expand All @@ -49,6 +52,11 @@ public IndexedResourceCache<P> getPrimaryCache() {
return controller.getEventSourceManager().getControllerEventSource();
}

@Override
public boolean isPrimaryDeleted() {
return isPrimaryDeleted;
}

@Override
public boolean isNextReconciliationImminent() {
return controller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
private final Cache<P> cache;
private final EventSourceManager<P> eventSourceManager;
private final RateLimiter<? extends RateLimitState> rateLimiter;
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
private final ResourceStateManager<P> resourceStateManager = new ResourceStateManager<>();
private final Map<String, Object> metricsMetadata;
private ExecutorService executor;

// todo handle/test case when there is finalizer but not ours
public EventProcessor(
EventSourceManager<P> eventSourceManager, ConfigurationService configurationService) {
this(
Expand Down Expand Up @@ -121,38 +122,54 @@ public synchronized void handleEvent(Event event) {
}
}

private void handleMarkedEventForResource(ResourceState state) {
if (state.deleteEventPresent()) {
private void handleMarkedEventForResource(ResourceState<P> state) {
if (doCleanupForDeleteEvent(state)) {
cleanupForDeletedEvent(state.getId());
} else if (!state.processedMarkForDeletionPresent()) {
} else if (!state.processedMarkForDeletionPresent()
&& !state.deleteEventReconciliationSubmitted()) {
submitReconciliationExecution(state);
}
}

private void submitReconciliationExecution(ResourceState state) {
private boolean doCleanupForDeleteEvent(ResourceState<P> state) {
return state.deleteEventPresent() && !controllerConfiguration.reconcileOnPrimaryDelete();
}

private void submitReconciliationExecution(ResourceState<P> state) {
try {
boolean controllerUnderExecution = isControllerUnderExecution(state);
final var resourceID = state.getId();
Optional<P> maybeLatest = cache.get(resourceID);
Optional<P> maybeLatest = getCachedResource(resourceID, state);
maybeLatest.ifPresent(MDCUtils::addResourceInfo);
if (!controllerUnderExecution && maybeLatest.isPresent()) {
var rateLimit = state.getRateLimit();
if (rateLimit == null) {
rateLimit = rateLimiter.initState();
state.setRateLimit(rateLimit);
}
// todo rate limit handling
var rateLimiterPermission = rateLimiter.isLimited(rateLimit);
if (rateLimiterPermission.isPresent()) {
handleRateLimitedSubmission(resourceID, rateLimiterPermission.get());
return;
}
state.setUnderProcessing(true);
final var latest = maybeLatest.get();
ExecutionScope<P> executionScope = new ExecutionScope<>(state.getRetry());
state.unMarkEventReceived();
ExecutionScope<P> executionScope =
new ExecutionScope<>(state.getRetry(), state.deleteEventPresent());

if (state.deleteEventPresent()) {
state.markDeleteEventReconciliationSubmitted();
} else if (!state.deleteEventReconciliationSubmitted()) { // if there is a retry
state.unMarkEventReceived();
}
metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ReconcilerExecutor(resourceID, executionScope));
executor.execute(
new ReconcilerExecutor(
resourceID,
executionScope,
state.deleteEventReconciliationSubmitted() ? latest : null));
} else {
log.debug(
"Skipping executing controller for resource id: {}. Controller in execution: {}. Latest"
Expand All @@ -164,7 +181,7 @@ private void submitReconciliationExecution(ResourceState state) {
// there can be multiple reasons why the primary resource is not present, one is that the
// informer is currently disconnected from k8s api server, but will eventually receive the
// resource. Other is that simply there is no primary resource present for an event, this
// might indicate issue with the implementation, but could happen also naturally, thus
// might indicate an issue with the implementation, but could happen also naturally, thus
// this is not necessarily a problem.
log.debug("no primary resource found in cache with resource id: {}", resourceID);
}
Expand All @@ -174,17 +191,35 @@ private void submitReconciliationExecution(ResourceState state) {
}
}

private void handleEventMarking(Event event, ResourceState state) {
private Optional<P> getCachedResource(ResourceID resourceID, ResourceState<P> state) {
var resource = cache.get(resourceID);
if (resource.isPresent()) {
return resource;
}
if (controllerConfiguration.reconcileOnPrimaryDelete()
&& (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted())) {
return Optional.of(state.getDeletedResource());
}
return Optional.empty();
}

@SuppressWarnings("unchecked")
private void handleEventMarking(Event event, ResourceState<P> state) {
final var relatedCustomResourceID = event.getRelatedCustomResourceID();
if (event instanceof ResourceEvent resourceEvent) {
if (resourceEvent.getAction() == ResourceAction.DELETED) {
log.debug("Marking delete event received for: {}", relatedCustomResourceID);
state.markDeleteEventReceived();
// todo check can there be delete event without resource?
state.markDeleteEventReceived((P) resourceEvent.getResource().orElseThrow());
} else {
if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
if (state.deleteEventReconciliationSubmitted()
|| (state.processedMarkForDeletionPresent()
&& isResourceMarkedForDeletion(resourceEvent))) {
log.debug(
"Skipping mark of event received, since already processed mark for deletion and"
+ " resource marked for deletion: {}",
"Skipping mark of event received, delete event reconciliation submitted ({}), or"
+ " marked for deletion but already processed mark. resource marked for deletion:"
+ " {}",
state.deleteEventReconciliationSubmitted(),
relatedCustomResourceID);
return;
}
Expand All @@ -196,18 +231,21 @@ private void handleEventMarking(Event event, ResourceState state) {
// event as below.
markEventReceived(state);
}
} else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) {
} else if (!state.deleteEventPresent()
&& !state.processedMarkForDeletionPresent()
&& !state.deleteEventReconciliationSubmitted()) {
markEventReceived(state);
} else if (log.isDebugEnabled()) {
log.debug(
"Skipped marking event as received. Delete event present: {}, processed mark for"
+ " deletion: {}",
+ " deletion: {}, delete event reconciliation submitted: {}",
state.deleteEventPresent(),
state.processedMarkForDeletionPresent());
state.processedMarkForDeletionPresent(),
state.deleteEventReconciliationSubmitted());
}
}

private void markEventReceived(ResourceState state) {
private void markEventReceived(ResourceState<P> state) {
log.debug("Marking event received for: {}", state.getId());
state.markEventReceived();
}
Expand Down Expand Up @@ -251,7 +289,7 @@ synchronized void eventProcessingFinished(
}
cleanupOnSuccessfulExecution(executionScope);
metrics.finishedReconciliation(executionScope.getResource(), metricsMetadata);
if (state.deleteEventPresent()) {
if (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted()) {
cleanupForDeletedEvent(executionScope.getResourceID());
} else if (postExecutionControl.isFinalizerRemoved()) {
state.markProcessedMarkForDeletion();
Expand Down Expand Up @@ -383,7 +421,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<P> executionScope) {
retryEventSource().cancelOnceSchedule(executionScope.getResourceID());
}

private ResourceState getOrInitRetryExecution(ExecutionScope<P> executionScope) {
private ResourceState<P> getOrInitRetryExecution(ExecutionScope<P> executionScope) {
final var state = resourceStateManager.getOrCreate(executionScope.getResourceID());
RetryExecution retryExecution = state.getRetry();
if (retryExecution == null) {
Expand All @@ -399,7 +437,7 @@ private void cleanupForDeletedEvent(ResourceID resourceID) {
metrics.cleanupDoneFor(resourceID, metricsMetadata);
}

private boolean isControllerUnderExecution(ResourceState state) {
private boolean isControllerUnderExecution(ResourceState<P> state) {
return state.isUnderProcessing();
}

Expand Down Expand Up @@ -443,10 +481,13 @@ private void handleAlreadyMarkedEvents() {
private class ReconcilerExecutor implements Runnable {
private final ExecutionScope<P> executionScope;
private final ResourceID resourceID;
private final P deleteEventResource;

private ReconcilerExecutor(ResourceID resourceID, ExecutionScope<P> executionScope) {
private ReconcilerExecutor(
ResourceID resourceID, ExecutionScope<P> executionScope, P deleteEventResource) {
this.executionScope = executionScope;
this.resourceID = resourceID;
this.deleteEventResource = deleteEventResource;
}

@Override
Expand All @@ -462,7 +503,9 @@ public void run() {
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
var actualResource = cache.get(resourceID);

var actualResource =
deleteEventResource != null ? Optional.of(deleteEventResource) : cache.get(resourceID);
if (actualResource.isEmpty()) {
log.debug("Skipping execution; primary resource missing from cache: {}", resourceID);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ class ExecutionScope<R extends HasMetadata> {
// the latest custom resource from cache
private R resource;
private final RetryInfo retryInfo;
private boolean isPrimaryDeleted = false;

ExecutionScope(RetryInfo retryInfo) {
ExecutionScope(RetryInfo retryInfo, boolean isPrimaryDeleted) {
this.retryInfo = retryInfo;
this.isPrimaryDeleted = isPrimaryDeleted;
}

public ExecutionScope<R> setResource(R resource) {
Expand Down Expand Up @@ -42,4 +44,12 @@ public String toString() {
public RetryInfo getRetryInfo() {
return retryInfo;
}

public void setPrimaryDeleted(boolean primaryDeleted) {
isPrimaryDeleted = primaryDeleted;
}

public boolean isPrimaryDeleted() {
return isPrimaryDeleted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ private PostExecutionControl<P> handleDispatch(ExecutionScope<P> executionScope)
}

Context<P> context =
new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution);
new DefaultContext<>(
executionScope.getRetryInfo(),
controller,
resourceForExecution,
executionScope.isPrimaryDeleted());
if (markedForDeletion) {
return handleCleanup(resourceForExecution, originalResource, context);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.javaoperatorsdk.operator.processing.event;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

class ResourceState {
class ResourceState<P extends HasMetadata> {

/**
* Manages the state of received events. Basically there can be only three distinct states
Expand All @@ -21,6 +22,7 @@ private enum EventingState {
PROCESSED_MARK_FOR_DELETION,
/** Delete event present, from this point other events are not relevant */
DELETE_EVENT_PRESENT,
DELETE_EVENT_RECONCILIATION_SUBMITTED
}

private final ResourceID id;
Expand All @@ -30,6 +32,8 @@ private enum EventingState {
private EventingState eventing;
private RateLimitState rateLimit;

private P deletedResource;

public ResourceState(ResourceID id) {
this.id = id;
eventing = EventingState.NO_EVENT_PRESENT;
Expand Down Expand Up @@ -63,8 +67,13 @@ public void setUnderProcessing(boolean underProcessing) {
this.underProcessing = underProcessing;
}

public void markDeleteEventReceived() {
public void markDeleteEventReceived(P deletedResource) {
eventing = EventingState.DELETE_EVENT_PRESENT;
this.deletedResource = deletedResource;
}

public void markDeleteEventReconciliationSubmitted() {
this.eventing = EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED;
}

public boolean deleteEventPresent() {
Expand All @@ -75,8 +84,16 @@ public boolean processedMarkForDeletionPresent() {
return eventing == EventingState.PROCESSED_MARK_FOR_DELETION;
}

public boolean deleteEventReconciliationSubmitted() {
return eventing == EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED;
}

public P getDeletedResource() {
return deletedResource;
}

public void markEventReceived() {
if (deleteEventPresent()) {
if (deleteEventPresent() || deleteEventReconciliationSubmitted()) {
throw new IllegalStateException("Cannot receive event after a delete event received");
}
eventing = EventingState.EVENT_PRESENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,27 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class ResourceStateManager {
import io.fabric8.kubernetes.api.model.HasMetadata;

class ResourceStateManager<P extends HasMetadata> {
// maybe we should have a way for users to specify a hint on the amount of CRs their reconciler
// will process to avoid under- or over-sizing the state maps and avoid too many resizing that
// take time and memory?
private final Map<ResourceID, ResourceState> states = new ConcurrentHashMap<>(100);
private final Map<ResourceID, ResourceState<P>> states = new ConcurrentHashMap<>(100);

public ResourceState getOrCreate(ResourceID resourceID) {
public ResourceState<P> getOrCreate(ResourceID resourceID) {
return states.computeIfAbsent(resourceID, ResourceState::new);
}

public ResourceState remove(ResourceID resourceID) {
public ResourceState<P> remove(ResourceID resourceID) {
return states.remove(resourceID);
}

public boolean contains(ResourceID resourceID) {
return states.containsKey(resourceID);
}

public List<ResourceState> resourcesWithEventPresent() {
public List<ResourceState<P>> resourcesWithEventPresent() {
return states.values().stream()
.filter(state -> !state.noEventPresent())
.collect(Collectors.toList());
Expand Down
Loading