diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/CeleryBody.java b/src/main/java/de/zalando/zmon/scheduler/ng/CeleryBody.java index dd977ab..79f6439 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/CeleryBody.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/CeleryBody.java @@ -65,6 +65,10 @@ public static class TrialRunCeleryAlertArg { public int priority = 1; public String team; } + + public static class EntityCleanUpArg { + public String check_id; + } } /* diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/CommandSerializer.java b/src/main/java/de/zalando/zmon/scheduler/ng/CommandSerializer.java index 087a890..0ebe4f0 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/CommandSerializer.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/CommandSerializer.java @@ -7,10 +7,7 @@ import io.opentracing.Tracer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; +import java.util.*; /** * Created by jmussler on 30.06.16. @@ -24,21 +21,43 @@ public CommandSerializer(TaskSerializerType type, Tracer tracer) { } public String expiresTime(long interval) { - Date exp = new Date(System.currentTimeMillis()+(interval * 1000L)); + Date exp = new Date(System.currentTimeMillis() + (interval * 1000L)); return LocalDateFormatter.get().format(exp); } + public byte[] writeCleanUp(Set removedIds) { + CeleryBody body = new CeleryBody(); + + body.task = "cleanup"; + body.expires = ""; + + String uuid = UUID.randomUUID().toString(); + body.id = "entity-CU:" + uuid; + + body.kwargs.put("cleanup_entities", removedIds); + + //For open-tracing + CeleryBody.EntityCleanUpArg command = new CeleryBody.EntityCleanUpArg(); + command.check_id = "CU:" + uuid; + body.args.add(command); + + body.timelimit.add(3600L); + body.timelimit.add(7200L); + + return writer.asCeleryTask(body); + } + public byte[] writeTrialRun(Entity entity, TrialRunRequest request) { CeleryBody body = new CeleryBody(); body.expires = expiresTime(request.interval); // "2015-12-31T00:00:00.000+00:00" - body.id="check-TR:"+request.id+"-"+entity.getId()+"-"+System.currentTimeMillis(); + body.id = "check-TR:" + request.id + "-" + entity.getId() + "-" + System.currentTimeMillis(); body.timelimit.add(request.interval); body.timelimit.add(request.interval * 2L); CeleryBody.TrialRunCeleryCommand command = new CeleryBody.TrialRunCeleryCommand(); - command.check_id = "TR:"+request.id; + command.check_id = "TR:" + request.id; command.check_name = request.name; command.interval = request.interval; command.command = request.checkCommand; @@ -56,7 +75,7 @@ public byte[] writeTrialRun(Entity entity, TrialRunRequest request) { alertArg.condition = request.alertCondition; alertArg.name = request.name; alertArg.period = request.period; - if(alertArg.period == null) { + if (alertArg.period == null) { alertArg.period = ""; } alertArg.team = "TRIAL RUN"; @@ -85,13 +104,13 @@ public byte[] write(Entity entity, Check check, Collection alerts, long s command.interval = checkDef.getInterval(); command.command = checkDef.getCommand(); command.entity = entity.getProperties(); - command.schedule_time = ((double)scheduledTime) / 1000.0; + command.schedule_time = ((double) scheduledTime) / 1000.0; body.args.add(command); List alertList = new ArrayList<>(); body.args.add(alertList); - for(Alert alert : alerts) { + for (Alert alert : alerts) { CeleryBody.CeleryAlertArg alertArg = new CeleryBody.CeleryAlertArg(); AlertDefinition alertDef = alert.getAlertDefinition(); @@ -104,7 +123,7 @@ public byte[] write(Entity entity, Check check, Collection alerts, long s alertArg.priority = alertDef.getPriority(); alertArg.tags = alertDef.getTags(); - if(alertArg.period == null) { + if (alertArg.period == null) { alertArg.period = ""; } diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/DataCenterSubscriber.java b/src/main/java/de/zalando/zmon/scheduler/ng/DataCenterSubscriber.java index 5e7fcbb..f1fbb38 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/DataCenterSubscriber.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/DataCenterSubscriber.java @@ -79,4 +79,7 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) { public void notifyEntityChange(EntityRepository repo, Entity oldEntity, Entity newEntity) { } + + @Override + public void notifyBatchEntityRemove (EntityRepository repo, Set removedEntities) {} } diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/BatchEntityCleanup.java b/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/BatchEntityCleanup.java new file mode 100644 index 0000000..1032868 --- /dev/null +++ b/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/BatchEntityCleanup.java @@ -0,0 +1,39 @@ +package de.zalando.zmon.scheduler.ng.cleanup; + +import de.zalando.zmon.scheduler.ng.entities.Entity; +import de.zalando.zmon.scheduler.ng.entities.EntityChangeListener; +import de.zalando.zmon.scheduler.ng.entities.EntityRepository; +import de.zalando.zmon.scheduler.ng.scheduler.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class BatchEntityCleanup implements EntityChangeListener { + private Scheduler scheduler; + private final static Logger LOG = LoggerFactory.getLogger(BatchEntityCleanup.class); + + public BatchEntityCleanup(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public void notifyEntityChange(EntityRepository repo, Entity entityOld, Entity entityNew) { + + } + + @Override + public void notifyEntityRemove(EntityRepository repo, Entity e) { + } + + @Override + public void notifyBatchEntityRemove(EntityRepository repo, Set removedEntities) { + scheduler.scheduleEntityCleanUp(removedEntities); + LOG.info("Batch cleanup scheduled for {} entities", removedEntities.size()); + } + + @Override + public void notifyEntityAdd(EntityRepository repo, Entity e) { + + } +} diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/EntityChangedCleaner.java b/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/EntityChangedCleaner.java index 8cb4cd0..a8ec95f 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/EntityChangedCleaner.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/EntityChangedCleaner.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -73,6 +74,9 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) { } + @Override + public void notifyBatchEntityRemove (EntityRepository repo, Set removedEntities) {} + protected void notifyEntityChangeNoWait(EntityRepository repo, Entity entityOld, Entity entityNew) { executor.schedule(new EntityChangeCleanupTask(entityOld, entityNew), 0, TimeUnit.SECONDS); } diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/SingleEntityCleanup.java b/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/SingleEntityCleanup.java index acec243..6152cb6 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/SingleEntityCleanup.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/cleanup/SingleEntityCleanup.java @@ -2,10 +2,12 @@ import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import de.zalando.zmon.scheduler.ng.scheduler.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +20,7 @@ import de.zalando.zmon.scheduler.ng.entities.Entity; import de.zalando.zmon.scheduler.ng.entities.EntityChangeListener; import de.zalando.zmon.scheduler.ng.entities.EntityRepository; +import org.springframework.beans.factory.annotation.Autowired; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @@ -25,7 +28,10 @@ /** * Created by jmussler on 30.06.16. */ -public class SingleEntityCleanup implements EntityChangeListener{ +public class SingleEntityCleanup implements EntityChangeListener { + + @Autowired + private Scheduler scheduler; private final static Logger LOG = LoggerFactory.getLogger(SingleEntityCleanup.class); @@ -56,6 +62,9 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) { executor.schedule(new EntityCleanupTask(e), 300, TimeUnit.SECONDS); } + @Override + public void notifyBatchEntityRemove (EntityRepository repo, Set removedEntities) {} + @Override public void notifyEntityAdd(EntityRepository repo, Entity e) { diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/config/BatchEntityCleanupConfiguration.java b/src/main/java/de/zalando/zmon/scheduler/ng/config/BatchEntityCleanupConfiguration.java new file mode 100644 index 0000000..085e424 --- /dev/null +++ b/src/main/java/de/zalando/zmon/scheduler/ng/config/BatchEntityCleanupConfiguration.java @@ -0,0 +1,23 @@ +package de.zalando.zmon.scheduler.ng.config; + +import de.zalando.zmon.scheduler.ng.cleanup.BatchEntityCleanup; +import de.zalando.zmon.scheduler.ng.entities.EntityRepository; +import de.zalando.zmon.scheduler.ng.scheduler.Scheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class BatchEntityCleanupConfiguration { + private static final Logger LOG = LoggerFactory.getLogger(BatchEntityCleanupConfiguration.class); + + @Bean + BatchEntityCleanup getBatchEntityCleanup(Scheduler scheduler, EntityRepository entityRepository) { + LOG.info("Registering BatchEntityCleanup job"); + BatchEntityCleanup cleanup = new BatchEntityCleanup(scheduler); + entityRepository.registerListener(cleanup); + return cleanup; + } +} diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityChangeListener.java b/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityChangeListener.java index 48bc4ec..78a9327 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityChangeListener.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityChangeListener.java @@ -1,5 +1,7 @@ package de.zalando.zmon.scheduler.ng.entities; +import java.util.Set; + /** * Created by jmussler on 4/17/15. */ @@ -8,5 +10,7 @@ public interface EntityChangeListener { void notifyEntityRemove(EntityRepository repo, Entity e); + void notifyBatchEntityRemove(EntityRepository repo, Set removedEntities); + void notifyEntityAdd(EntityRepository repo, Entity e); } diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityRepository.java b/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityRepository.java index 80348bd..72cc6fe 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityRepository.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/entities/EntityRepository.java @@ -30,6 +30,9 @@ public class EntityRepository extends CachedRepository changeListeners = new ArrayList<>(); + //Need to differentiate between Global and Local schedulers + private final boolean isGlobal; + // Need this to write globally aware change listener private Map unfilteredEntities; @@ -42,6 +45,7 @@ public class EntityRepository extends CachedRepository(); @@ -58,6 +62,7 @@ public EntityRepository(EntityAdapterRegistry registry, Tracer tracer) { public EntityRepository(EntityAdapterRegistry registry, SchedulerConfig config, Tracer tracer) { super(registry, tracer); + this.isGlobal = config.isEnableGlobalEntity(); this.skipField = config.getEntitySkipOnField(); this.redisHost = config.getRedisHost(); @@ -192,6 +197,13 @@ public synchronized void fill() { } } + //Handover cleanup to worker -- Skip for Global scheduler + if (!(isGlobal || removedIds.isEmpty())) { + for (EntityChangeListener l : currentListeners) { + l.notifyBatchEntityRemove(this, removedIds); + } + } + for (String k : changedFilterProperties) { for (EntityChangeListener l : currentListeners) { l.notifyEntityChange(this, oldUnfiltered.get(k), unfilteredEntities.get(k)); diff --git a/src/main/java/de/zalando/zmon/scheduler/ng/scheduler/Scheduler.java b/src/main/java/de/zalando/zmon/scheduler/ng/scheduler/Scheduler.java index 1fe632c..4de4210 100644 --- a/src/main/java/de/zalando/zmon/scheduler/ng/scheduler/Scheduler.java +++ b/src/main/java/de/zalando/zmon/scheduler/ng/scheduler/Scheduler.java @@ -20,11 +20,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import redis.clients.jedis.Jedis; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -236,4 +232,9 @@ public void scheduleTrialRun(TrialRunRequest request) { service.schedule(new TrialRunCleanupTask(request.id, schedulerConfig), 300, TimeUnit.SECONDS); } } + + public void scheduleEntityCleanUp(Set removedIds){ + byte [] command = taskSerializer.writeCleanUp(removedIds); + queueSelector.execute(null, command); + } } diff --git a/src/test/java/de/zalando/zmon/scheduler/ng/SchedulerTest.java b/src/test/java/de/zalando/zmon/scheduler/ng/SchedulerTest.java index bfe7070..5368237 100644 --- a/src/test/java/de/zalando/zmon/scheduler/ng/SchedulerTest.java +++ b/src/test/java/de/zalando/zmon/scheduler/ng/SchedulerTest.java @@ -16,8 +16,7 @@ import org.junit.Test; import org.mockito.Mockito; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import static java.util.Arrays.asList; import static org.mockito.AdditionalMatchers.gt; @@ -121,4 +120,26 @@ public void scheduleTrialRun() { verify(queueSelector, never()).execute(eq(entityExcluded), any(), eq("zmon:queue:default")); } + @Test + public void scheduleEntityCleanUp() { + final EntityRepository entityRepo = mock(EntityRepository.class); + + //Entities to cleanup + Entity entity1 = new Entity("included-entity"); + entity1.addProperty("type", "host"); + Entity entity2 = new Entity("included-entity"); + entity2.addProperty("type", "instance"); + + QueueSelector queueSelector = mock(QueueSelector.class); + SchedulerConfig config = new SchedulerConfig(); + MetricRegistry metricRegistry = new MetricRegistry(); + Scheduler scheduler = new Scheduler(null, null, entityRepo, queueSelector, config, metricRegistry, NoopTracerFactory.create()); + + Set removedIds = new HashSet<>(Arrays.asList(entity1.getId(), entity2.getId())); + + //Now schedule the cleanup + scheduler.scheduleEntityCleanUp(removedIds); + + verify(queueSelector).execute( eq(null), any()); + } } diff --git a/src/test/java/de/zalando/zmon/scheduler/ng/entities/EntityRepositoryTest.java b/src/test/java/de/zalando/zmon/scheduler/ng/entities/EntityRepositoryTest.java index d0408d1..b5bbaed 100644 --- a/src/test/java/de/zalando/zmon/scheduler/ng/entities/EntityRepositoryTest.java +++ b/src/test/java/de/zalando/zmon/scheduler/ng/entities/EntityRepositoryTest.java @@ -1,11 +1,12 @@ package de.zalando.zmon.scheduler.ng.entities; import de.zalando.zmon.scheduler.ng.config.SchedulerConfig; - import io.opentracing.noop.NoopTracerFactory; import org.junit.Test; import org.mockito.Mockito; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import static java.util.Arrays.asList; @@ -18,7 +19,7 @@ public class EntityRepositoryTest { @Test - public void BaseFilterTest() { + public void baseFilterTest() { SchedulerConfig config = new SchedulerConfig(); config.setEntityBaseFilterStr("[{\"type\":\"host\"}]"); @@ -44,7 +45,7 @@ public void BaseFilterTest() { } @Test - public void TestNoChangeOnException() { + public void testNoChangeOnException() { SchedulerConfig config = new SchedulerConfig(); config.setEntityBaseFilterStr("[{\"type\":\"host\"}]"); @@ -78,7 +79,7 @@ public void TestNoChangeOnException() { } @Test - public void TestNotify() { + public void testNotify() { SchedulerConfig config = new SchedulerConfig(); config.setEntityBaseFilterStr("[{\"type\":\"host\"}]"); @@ -120,6 +121,42 @@ public void TestNotify() { verify(listener).notifyEntityAdd(eq(repository), eq(host2)); verify(listener).notifyEntityRemove(eq(repository), eq(host3)); + verify(listener, never()).notifyBatchEntityRemove(eq(repository), eq(new HashSet<>(Arrays.asList(host2.getId())))); verify(listener).notifyEntityChange(eq(repository), eq(host1), eq(host1_changed)); } + + @Test + public void testNotifyGlobalScheduler() { + SchedulerConfig config = new SchedulerConfig(); + config.setEntityBaseFilterStr("[{\"type\":\"host\"}]"); + //Marks it as a Global scheduler + config.setEnableGlobalEntity(true); + + EntityAdapterRegistry registry = Mockito.mock(EntityAdapterRegistry.class); + + Entity instance = new Entity("instance-1"); + instance.addProperty("type", "instance"); + + Entity host2 = new Entity("host-2"); + host2.addProperty("type", "host"); + + List entities = asList(instance, host2); + List entities2 = asList(instance); + + EntityAdapter adapter = Mockito.mock(EntityAdapter.class); + when(adapter.getCollection()).thenReturn(entities).thenReturn(entities2); + + when(registry.getSourceNames()).thenReturn(asList("entities")); + when(registry.get("entities")).thenReturn(adapter); + + EntityChangeListener listener = Mockito.mock(EntityChangeListener.class); + + EntityRepository repository = new EntityRepository(registry, config, NoopTracerFactory.create()); + repository.registerListener(listener); + + repository.fill(); + + verify(listener).notifyEntityRemove(eq(repository), eq(host2)); + verify(listener, never()).notifyBatchEntityRemove(eq(repository), eq(new HashSet<>(Arrays.asList(host2.getId())))); + } }