Skip to content
This repository has been archived by the owner on Apr 22, 2020. It is now read-only.

Use worker cleanup task instead of direct jedis cleanup #89

Merged
merged 14 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/de/zalando/zmon/scheduler/ng/CeleryBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public static class TrialRunCeleryAlertArg {
public int priority = 1;
public String team;
}

public static class EntityCleanUpArg {
public String check_id;
}
}

/*
Expand Down
41 changes: 30 additions & 11 deletions src/main/java/de/zalando/zmon/scheduler/ng/CommandSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@

import io.opentracing.Tracer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.*;

/**
* Created by jmussler on 30.06.16.
Expand All @@ -24,21 +21,43 @@ public CommandSerializer(TaskSerializerType type, Tracer tracer) {
}

public String expiresTime(long interval) {
Date exp = new Date(System.currentTimeMillis()+(interval * 1000L));
Date exp = new Date(System.currentTimeMillis() + (interval * 1000L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's error-prone to have interval as long. E.g. if interval == Long.MAX_VALUE, what do you expect interval * 1000 to be equal to? I'd change interval to int and maybe rename it some way to clarify it's in seconds...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexkorotkikh this piece of code was previously there. Only formatting changes were introduced by me. I understand your point. However, I would solve this in another PR rather than using this PR to solve it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I was inattentive. Sorry about that.

return LocalDateFormatter.get().format(exp);
}

public byte[] writeCleanUp(Set<String> removedIds) {
CeleryBody body = new CeleryBody();

body.task = "cleanup";
body.expires = "";

String uuid = UUID.randomUUID().toString();
body.id = "entity-CU:" + uuid;

body.kwargs.put("cleanup_entities", removedIds);

//For open-tracing
CeleryBody.EntityCleanUpArg command = new CeleryBody.EntityCleanUpArg();
command.check_id = "CU:" + uuid;
body.args.add(command);

body.timelimit.add(3600L);
body.timelimit.add(7200L);

return writer.asCeleryTask(body);
}

public byte[] writeTrialRun(Entity entity, TrialRunRequest request) {
CeleryBody body = new CeleryBody();

body.expires = expiresTime(request.interval); // "2015-12-31T00:00:00.000+00:00"
body.id="check-TR:"+request.id+"-"+entity.getId()+"-"+System.currentTimeMillis();
body.id = "check-TR:" + request.id + "-" + entity.getId() + "-" + System.currentTimeMillis();

body.timelimit.add(request.interval);
body.timelimit.add(request.interval * 2L);

CeleryBody.TrialRunCeleryCommand command = new CeleryBody.TrialRunCeleryCommand();
command.check_id = "TR:"+request.id;
command.check_id = "TR:" + request.id;
command.check_name = request.name;
command.interval = request.interval;
command.command = request.checkCommand;
Expand All @@ -56,7 +75,7 @@ public byte[] writeTrialRun(Entity entity, TrialRunRequest request) {
alertArg.condition = request.alertCondition;
alertArg.name = request.name;
alertArg.period = request.period;
if(alertArg.period == null) {
if (alertArg.period == null) {
alertArg.period = "";
}
alertArg.team = "TRIAL RUN";
Expand Down Expand Up @@ -85,13 +104,13 @@ public byte[] write(Entity entity, Check check, Collection<Alert> alerts, long s
command.interval = checkDef.getInterval();
command.command = checkDef.getCommand();
command.entity = entity.getProperties();
command.schedule_time = ((double)scheduledTime) / 1000.0;
command.schedule_time = ((double) scheduledTime) / 1000.0;
body.args.add(command);

List<CeleryBody.CeleryAlertArg> alertList = new ArrayList<>();
body.args.add(alertList);

for(Alert alert : alerts) {
for (Alert alert : alerts) {
CeleryBody.CeleryAlertArg alertArg = new CeleryBody.CeleryAlertArg();
AlertDefinition alertDef = alert.getAlertDefinition();

Expand All @@ -104,7 +123,7 @@ public byte[] write(Entity entity, Check check, Collection<Alert> alerts, long s
alertArg.priority = alertDef.getPriority();
alertArg.tags = alertDef.getTags();

if(alertArg.period == null) {
if (alertArg.period == null) {
alertArg.period = "";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {
public void notifyEntityChange(EntityRepository repo, Entity oldEntity, Entity newEntity) {

}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package de.zalando.zmon.scheduler.ng.cleanup;

import de.zalando.zmon.scheduler.ng.entities.Entity;
import de.zalando.zmon.scheduler.ng.entities.EntityChangeListener;
import de.zalando.zmon.scheduler.ng.entities.EntityRepository;
import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

public class BatchEntityCleanup implements EntityChangeListener {
private Scheduler scheduler;
private final static Logger LOG = LoggerFactory.getLogger(BatchEntityCleanup.class);

public BatchEntityCleanup(Scheduler scheduler) {
this.scheduler = scheduler;
}

@Override
public void notifyEntityChange(EntityRepository repo, Entity entityOld, Entity entityNew) {

}

@Override
public void notifyEntityRemove(EntityRepository repo, Entity e) {
}

@Override
public void notifyBatchEntityRemove(EntityRepository repo, Set<String> removedEntities) {
scheduler.scheduleEntityCleanUp(removedEntities);
LOG.info("Batch cleanup scheduled for {} entities", removedEntities.size());
}

@Override
public void notifyEntityAdd(EntityRepository repo, Entity e) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -73,6 +74,9 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {

}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}

protected void notifyEntityChangeNoWait(EntityRepository repo, Entity entityOld, Entity entityNew) {
executor.schedule(new EntityChangeCleanupTask(entityOld, entityNew), 0, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -18,14 +20,18 @@
import de.zalando.zmon.scheduler.ng.entities.Entity;
import de.zalando.zmon.scheduler.ng.entities.EntityChangeListener;
import de.zalando.zmon.scheduler.ng.entities.EntityRepository;
import org.springframework.beans.factory.annotation.Autowired;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
* Created by jmussler on 30.06.16.
*/
public class SingleEntityCleanup implements EntityChangeListener{
public class SingleEntityCleanup implements EntityChangeListener {

@Autowired
private Scheduler scheduler;

private final static Logger LOG = LoggerFactory.getLogger(SingleEntityCleanup.class);

Expand Down Expand Up @@ -56,6 +62,9 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {
executor.schedule(new EntityCleanupTask(e), 300, TimeUnit.SECONDS);
}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but it sounds semantically incorrect to have batch operation method in a class called SingleEntityCleanup. Did you think about introducing new Listener interface? Like BatchEntityChangeListener

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@Override
public void notifyEntityAdd(EntityRepository repo, Entity e) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.zalando.zmon.scheduler.ng.config;

import de.zalando.zmon.scheduler.ng.cleanup.BatchEntityCleanup;
import de.zalando.zmon.scheduler.ng.entities.EntityRepository;
import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class BatchEntityCleanupConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(BatchEntityCleanupConfiguration.class);

@Bean
BatchEntityCleanup getBatchEntityCleanup(Scheduler scheduler, EntityRepository entityRepository) {
LOG.info("Registering BatchEntityCleanup job");
BatchEntityCleanup cleanup = new BatchEntityCleanup(scheduler);
entityRepository.registerListener(cleanup);
return cleanup;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.zalando.zmon.scheduler.ng.entities;

import java.util.Set;

/**
* Created by jmussler on 4/17/15.
*/
Expand All @@ -8,5 +10,7 @@ public interface EntityChangeListener {

void notifyEntityRemove(EntityRepository repo, Entity e);

void notifyBatchEntityRemove(EntityRepository repo, Set<String> removedEntities);

void notifyEntityAdd(EntityRepository repo, Entity e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class EntityRepository extends CachedRepository<String, EntityAdapterRegi
private final String skipField;
private final List<EntityChangeListener> changeListeners = new ArrayList<>();

//Need to differentiate between Global and Local schedulers
private final boolean isGlobal;

// Need this to write globally aware change listener
private Map<String, Entity> unfilteredEntities;

Expand All @@ -42,6 +45,7 @@ public class EntityRepository extends CachedRepository<String, EntityAdapterRegi
public EntityRepository(EntityAdapterRegistry registry, Tracer tracer) {
super(registry, tracer);

isGlobal = false;
skipField = null;

baseFilter = new ArrayList<>();
Expand All @@ -58,6 +62,7 @@ public EntityRepository(EntityAdapterRegistry registry, Tracer tracer) {
public EntityRepository(EntityAdapterRegistry registry, SchedulerConfig config, Tracer tracer) {
super(registry, tracer);

this.isGlobal = config.isEnableGlobalEntity();
this.skipField = config.getEntitySkipOnField();

this.redisHost = config.getRedisHost();
Expand Down Expand Up @@ -192,6 +197,13 @@ public synchronized void fill() {
}
}

//Handover cleanup to worker -- Skip for Global scheduler
if (!(isGlobal || removedIds.isEmpty())) {
for (EntityChangeListener l : currentListeners) {
l.notifyBatchEntityRemove(this, removedIds);
}
}

for (String k : changedFilterProperties) {
for (EntityChangeListener l : currentListeners) {
l.notifyEntityChange(this, oldUnfiltered.get(k), unfilteredEntities.get(k));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -236,4 +232,9 @@ public void scheduleTrialRun(TrialRunRequest request) {
service.schedule(new TrialRunCleanupTask(request.id, schedulerConfig), 300, TimeUnit.SECONDS);
}
}

public void scheduleEntityCleanUp(Set<String> removedIds){
byte [] command = taskSerializer.writeCleanUp(removedIds);
queueSelector.execute(null, command);
}
}
25 changes: 23 additions & 2 deletions src/test/java/de/zalando/zmon/scheduler/ng/SchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import org.junit.Test;
import org.mockito.Mockito;

import java.util.HashMap;
import java.util.Map;
import java.util.*;

import static java.util.Arrays.asList;
import static org.mockito.AdditionalMatchers.gt;
Expand Down Expand Up @@ -121,4 +120,26 @@ public void scheduleTrialRun() {
verify(queueSelector, never()).execute(eq(entityExcluded), any(), eq("zmon:queue:default"));
}

@Test
public void scheduleEntityCleanUp() {
final EntityRepository entityRepo = mock(EntityRepository.class);

//Entities to cleanup
Entity entity1 = new Entity("included-entity");
entity1.addProperty("type", "host");
Entity entity2 = new Entity("included-entity");
entity2.addProperty("type", "instance");

QueueSelector queueSelector = mock(QueueSelector.class);
SchedulerConfig config = new SchedulerConfig();
MetricRegistry metricRegistry = new MetricRegistry();
Scheduler scheduler = new Scheduler(null, null, entityRepo, queueSelector, config, metricRegistry, NoopTracerFactory.create());

Set<String> removedIds = new HashSet<>(Arrays.asList(entity1.getId(), entity2.getId()));

//Now schedule the cleanup
scheduler.scheduleEntityCleanUp(removedIds);

verify(queueSelector).execute( eq(null), any());
}
}
Loading