Skip to content
This repository has been archived by the owner on Apr 22, 2020. It is now read-only.

Commit

Permalink
Revert "Use worker cleanup task instead of direct jedis cleanup"
Browse files Browse the repository at this point in the history
  • Loading branch information
vetinari authored Jul 3, 2019
1 parent c19e7a3 commit b3357ed
Show file tree
Hide file tree
Showing 12 changed files with 23 additions and 199 deletions.
4 changes: 0 additions & 4 deletions src/main/java/de/zalando/zmon/scheduler/ng/CeleryBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ public static class TrialRunCeleryAlertArg {
public int priority = 1;
public String team;
}

public static class EntityCleanUpArg {
public String check_id;
}
}

/*
Expand Down
41 changes: 11 additions & 30 deletions src/main/java/de/zalando/zmon/scheduler/ng/CommandSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

import io.opentracing.Tracer;

import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;

/**
* Created by jmussler on 30.06.16.
Expand All @@ -21,43 +24,21 @@ public CommandSerializer(TaskSerializerType type, Tracer tracer) {
}

public String expiresTime(long interval) {
Date exp = new Date(System.currentTimeMillis() + (interval * 1000L));
Date exp = new Date(System.currentTimeMillis()+(interval * 1000L));
return LocalDateFormatter.get().format(exp);
}

public byte[] writeCleanUp(Set<String> removedIds) {
CeleryBody body = new CeleryBody();

body.task = "cleanup";
body.expires = "";

String uuid = UUID.randomUUID().toString();
body.id = "entity-CU:" + uuid;

body.kwargs.put("cleanup_entities", removedIds);

//For open-tracing
CeleryBody.EntityCleanUpArg command = new CeleryBody.EntityCleanUpArg();
command.check_id = "CU:" + uuid;
body.args.add(command);

body.timelimit.add(3600L);
body.timelimit.add(7200L);

return writer.asCeleryTask(body);
}

public byte[] writeTrialRun(Entity entity, TrialRunRequest request) {
CeleryBody body = new CeleryBody();

body.expires = expiresTime(request.interval); // "2015-12-31T00:00:00.000+00:00"
body.id = "check-TR:" + request.id + "-" + entity.getId() + "-" + System.currentTimeMillis();
body.id="check-TR:"+request.id+"-"+entity.getId()+"-"+System.currentTimeMillis();

body.timelimit.add(request.interval);
body.timelimit.add(request.interval * 2L);

CeleryBody.TrialRunCeleryCommand command = new CeleryBody.TrialRunCeleryCommand();
command.check_id = "TR:" + request.id;
command.check_id = "TR:"+request.id;
command.check_name = request.name;
command.interval = request.interval;
command.command = request.checkCommand;
Expand All @@ -75,7 +56,7 @@ public byte[] writeTrialRun(Entity entity, TrialRunRequest request) {
alertArg.condition = request.alertCondition;
alertArg.name = request.name;
alertArg.period = request.period;
if (alertArg.period == null) {
if(alertArg.period == null) {
alertArg.period = "";
}
alertArg.team = "TRIAL RUN";
Expand Down Expand Up @@ -104,13 +85,13 @@ public byte[] write(Entity entity, Check check, Collection<Alert> alerts, long s
command.interval = checkDef.getInterval();
command.command = checkDef.getCommand();
command.entity = entity.getProperties();
command.schedule_time = ((double) scheduledTime) / 1000.0;
command.schedule_time = ((double)scheduledTime) / 1000.0;
body.args.add(command);

List<CeleryBody.CeleryAlertArg> alertList = new ArrayList<>();
body.args.add(alertList);

for (Alert alert : alerts) {
for(Alert alert : alerts) {
CeleryBody.CeleryAlertArg alertArg = new CeleryBody.CeleryAlertArg();
AlertDefinition alertDef = alert.getAlertDefinition();

Expand All @@ -123,7 +104,7 @@ public byte[] write(Entity entity, Check check, Collection<Alert> alerts, long s
alertArg.priority = alertDef.getPriority();
alertArg.tags = alertDef.getTags();

if (alertArg.period == null) {
if(alertArg.period == null) {
alertArg.period = "";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,4 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {
public void notifyEntityChange(EntityRepository repo, Entity oldEntity, Entity newEntity) {

}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -74,9 +73,6 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {

}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}

protected void notifyEntityChangeNoWait(EntityRepository repo, Entity entityOld, Entity entityNew) {
executor.schedule(new EntityChangeCleanupTask(entityOld, entityNew), 0, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -20,18 +18,14 @@
import de.zalando.zmon.scheduler.ng.entities.Entity;
import de.zalando.zmon.scheduler.ng.entities.EntityChangeListener;
import de.zalando.zmon.scheduler.ng.entities.EntityRepository;
import org.springframework.beans.factory.annotation.Autowired;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
* Created by jmussler on 30.06.16.
*/
public class SingleEntityCleanup implements EntityChangeListener {

@Autowired
private Scheduler scheduler;
public class SingleEntityCleanup implements EntityChangeListener{

private final static Logger LOG = LoggerFactory.getLogger(SingleEntityCleanup.class);

Expand Down Expand Up @@ -62,9 +56,6 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {
executor.schedule(new EntityCleanupTask(e), 300, TimeUnit.SECONDS);
}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}

@Override
public void notifyEntityAdd(EntityRepository repo, Entity e) {

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package de.zalando.zmon.scheduler.ng.entities;

import java.util.Set;

/**
* Created by jmussler on 4/17/15.
*/
Expand All @@ -10,7 +8,5 @@ public interface EntityChangeListener {

void notifyEntityRemove(EntityRepository repo, Entity e);

void notifyBatchEntityRemove(EntityRepository repo, Set<String> removedEntities);

void notifyEntityAdd(EntityRepository repo, Entity e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ public class EntityRepository extends CachedRepository<String, EntityAdapterRegi
private final String skipField;
private final List<EntityChangeListener> changeListeners = new ArrayList<>();

//Need to differentiate between Global and Local schedulers
private final boolean isGlobal;

// Need this to write globally aware change listener
private Map<String, Entity> unfilteredEntities;

Expand All @@ -45,7 +42,6 @@ public class EntityRepository extends CachedRepository<String, EntityAdapterRegi
public EntityRepository(EntityAdapterRegistry registry, Tracer tracer) {
super(registry, tracer);

isGlobal = false;
skipField = null;

baseFilter = new ArrayList<>();
Expand All @@ -62,7 +58,6 @@ public EntityRepository(EntityAdapterRegistry registry, Tracer tracer) {
public EntityRepository(EntityAdapterRegistry registry, SchedulerConfig config, Tracer tracer) {
super(registry, tracer);

this.isGlobal = config.isEnableGlobalEntity();
this.skipField = config.getEntitySkipOnField();

this.redisHost = config.getRedisHost();
Expand Down Expand Up @@ -197,13 +192,6 @@ public synchronized void fill() {
}
}

//Handover cleanup to worker -- Skip for Global scheduler
if (!(isGlobal || removedIds.isEmpty())) {
for (EntityChangeListener l : currentListeners) {
l.notifyBatchEntityRemove(this, removedIds);
}
}

for (String k : changedFilterProperties) {
for (EntityChangeListener l : currentListeners) {
l.notifyEntityChange(this, oldUnfiltered.get(k), unfilteredEntities.get(k));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import redis.clients.jedis.Jedis;

import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -232,9 +236,4 @@ public void scheduleTrialRun(TrialRunRequest request) {
service.schedule(new TrialRunCleanupTask(request.id, schedulerConfig), 300, TimeUnit.SECONDS);
}
}

public void scheduleEntityCleanUp(Set<String> removedIds){
byte [] command = taskSerializer.writeCleanUp(removedIds);
queueSelector.execute(null, command);
}
}
25 changes: 2 additions & 23 deletions src/test/java/de/zalando/zmon/scheduler/ng/SchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import org.junit.Test;
import org.mockito.Mockito;

import java.util.*;
import java.util.HashMap;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.mockito.AdditionalMatchers.gt;
Expand Down Expand Up @@ -120,26 +121,4 @@ public void scheduleTrialRun() {
verify(queueSelector, never()).execute(eq(entityExcluded), any(), eq("zmon:queue:default"));
}

@Test
public void scheduleEntityCleanUp() {
final EntityRepository entityRepo = mock(EntityRepository.class);

//Entities to cleanup
Entity entity1 = new Entity("included-entity");
entity1.addProperty("type", "host");
Entity entity2 = new Entity("included-entity");
entity2.addProperty("type", "instance");

QueueSelector queueSelector = mock(QueueSelector.class);
SchedulerConfig config = new SchedulerConfig();
MetricRegistry metricRegistry = new MetricRegistry();
Scheduler scheduler = new Scheduler(null, null, entityRepo, queueSelector, config, metricRegistry, NoopTracerFactory.create());

Set<String> removedIds = new HashSet<>(Arrays.asList(entity1.getId(), entity2.getId()));

//Now schedule the cleanup
scheduler.scheduleEntityCleanUp(removedIds);

verify(queueSelector).execute( eq(null), any());
}
}
Loading

0 comments on commit b3357ed

Please sign in to comment.