Skip to content
This repository has been archived by the owner on Apr 22, 2020. It is now read-only.

Use worker cleanup task instead of direct jedis cleanup #89

Merged
merged 14 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/de/zalando/zmon/scheduler/ng/CeleryBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public static class TrialRunCeleryAlertArg {
public int priority = 1;
public String team;
}

public static class EntityCleanUpArg{
Copy link
Contributor

@alexkorotkikh alexkorotkikh Jan 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EntityCleanUpArg<space>{

public String check_id;
}
}

/*
Expand Down
41 changes: 30 additions & 11 deletions src/main/java/de/zalando/zmon/scheduler/ng/CommandSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@

import io.opentracing.Tracer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.*;

/**
* Created by jmussler on 30.06.16.
Expand All @@ -24,21 +21,43 @@ public CommandSerializer(TaskSerializerType type, Tracer tracer) {
}

public String expiresTime(long interval) {
Date exp = new Date(System.currentTimeMillis()+(interval * 1000L));
Date exp = new Date(System.currentTimeMillis() + (interval * 1000L));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's error-prone to have interval as long. E.g. if interval == Long.MAX_VALUE, what do you expect interval * 1000 to be equal to? I'd change interval to int and maybe rename it some way to clarify it's in seconds...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexkorotkikh this piece of code was previously there. Only formatting changes were introduced by me. I understand your point. However, I would solve this in another PR rather than using this PR to solve it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I was inattentive. Sorry about that.

return LocalDateFormatter.get().format(exp);
}

public byte[] writeCleanUp(Set<String> removedIds) {
CeleryBody body = new CeleryBody();

body.task = "cleanup";
body.expires = "";

String uuid = UUID.randomUUID().toString();
body.id = "entity-CU:" + uuid;

body.kwargs.put("cleanup_entities", removedIds);

//For open-tracing
CeleryBody.EntityCleanUpArg command = new CeleryBody.EntityCleanUpArg();
command.check_id = "CU:" + uuid;
body.args.add(command);

body.timelimit.add(3600L);
body.timelimit.add(7200L);

return writer.asCeleryTask(body);
}

public byte[] writeTrialRun(Entity entity, TrialRunRequest request) {
CeleryBody body = new CeleryBody();

body.expires = expiresTime(request.interval); // "2015-12-31T00:00:00.000+00:00"
body.id="check-TR:"+request.id+"-"+entity.getId()+"-"+System.currentTimeMillis();
body.id = "check-TR:" + request.id + "-" + entity.getId() + "-" + System.currentTimeMillis();

body.timelimit.add(request.interval);
body.timelimit.add(request.interval * 2L);

CeleryBody.TrialRunCeleryCommand command = new CeleryBody.TrialRunCeleryCommand();
command.check_id = "TR:"+request.id;
command.check_id = "TR:" + request.id;
command.check_name = request.name;
command.interval = request.interval;
command.command = request.checkCommand;
Expand All @@ -56,7 +75,7 @@ public byte[] writeTrialRun(Entity entity, TrialRunRequest request) {
alertArg.condition = request.alertCondition;
alertArg.name = request.name;
alertArg.period = request.period;
if(alertArg.period == null) {
if (alertArg.period == null) {
alertArg.period = "";
}
alertArg.team = "TRIAL RUN";
Expand Down Expand Up @@ -85,13 +104,13 @@ public byte[] write(Entity entity, Check check, Collection<Alert> alerts, long s
command.interval = checkDef.getInterval();
command.command = checkDef.getCommand();
command.entity = entity.getProperties();
command.schedule_time = ((double)scheduledTime) / 1000.0;
command.schedule_time = ((double) scheduledTime) / 1000.0;
body.args.add(command);

List<CeleryBody.CeleryAlertArg> alertList = new ArrayList<>();
body.args.add(alertList);

for(Alert alert : alerts) {
for (Alert alert : alerts) {
CeleryBody.CeleryAlertArg alertArg = new CeleryBody.CeleryAlertArg();
AlertDefinition alertDef = alert.getAlertDefinition();

Expand All @@ -104,7 +123,7 @@ public byte[] write(Entity entity, Check check, Collection<Alert> alerts, long s
alertArg.priority = alertDef.getPriority();
alertArg.tags = alertDef.getTags();

if(alertArg.period == null) {
if (alertArg.period == null) {
alertArg.period = "";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {
public void notifyEntityChange(EntityRepository repo, Entity oldEntity, Entity newEntity) {

}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package de.zalando.zmon.scheduler.ng.cleanup;

import de.zalando.zmon.scheduler.ng.entities.Entity;
import de.zalando.zmon.scheduler.ng.entities.EntityChangeListener;
import de.zalando.zmon.scheduler.ng.entities.EntityRepository;
import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

public class BatchEntityCleanup implements EntityChangeListener {
private Scheduler scheduler;
private final static Logger LOG = LoggerFactory.getLogger(BatchEntityCleanup.class);


@Override
public void notifyEntityChange(EntityRepository repo, Entity entityOld, Entity entityNew) {

}

@Override
public void notifyEntityRemove(EntityRepository repo, Entity e) {
}

public BatchEntityCleanup(Scheduler scheduler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

constructor usually goes before methods in class declaration

this.scheduler = scheduler;
}

@Override
public void notifyBatchEntityRemove(EntityRepository repo, Set<String> removedEntities) {
scheduler.scheduleEntityCleanUp(removedEntities);
}

@Override
public void notifyEntityAdd(EntityRepository repo, Entity e) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -73,6 +74,9 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {

}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}

protected void notifyEntityChangeNoWait(EntityRepository repo, Entity entityOld, Entity entityNew) {
executor.schedule(new EntityChangeCleanupTask(entityOld, entityNew), 0, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -18,6 +20,7 @@
import de.zalando.zmon.scheduler.ng.entities.Entity;
import de.zalando.zmon.scheduler.ng.entities.EntityChangeListener;
import de.zalando.zmon.scheduler.ng.entities.EntityRepository;
import org.springframework.beans.factory.annotation.Autowired;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
Expand All @@ -27,6 +30,9 @@
*/
public class SingleEntityCleanup implements EntityChangeListener{
Copy link
Contributor

@alexkorotkikh alexkorotkikh Jan 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EntityChangeListener<space>{


@Autowired
private Scheduler scheduler;

private final static Logger LOG = LoggerFactory.getLogger(SingleEntityCleanup.class);

private final AlertRepository alertRepo;
Expand Down Expand Up @@ -56,6 +62,9 @@ public void notifyEntityRemove(EntityRepository repo, Entity e) {
executor.schedule(new EntityCleanupTask(e), 300, TimeUnit.SECONDS);
}

@Override
public void notifyBatchEntityRemove (EntityRepository repo, Set<String> removedEntities) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but it sounds semantically incorrect to have batch operation method in a class called SingleEntityCleanup. Did you think about introducing new Listener interface? Like BatchEntityChangeListener

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@Override
public void notifyEntityAdd(EntityRepository repo, Entity e) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.zalando.zmon.scheduler.ng.config;

import de.zalando.zmon.scheduler.ng.cleanup.BatchEntityCleanup;
import de.zalando.zmon.scheduler.ng.entities.EntityRepository;
import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class BatchEntityCleanupConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(BatchEntityCleanupConfiguration.class);

@Bean
BatchEntityCleanup getBatchEntityCleanup(Scheduler scheduler, EntityRepository entityRepository) {
LOG.info("Registering BatchEntityCleanup job");
BatchEntityCleanup cleanup = new BatchEntityCleanup(scheduler);
entityRepository.registerListener(cleanup);
return cleanup;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.zalando.zmon.scheduler.ng.entities;

import java.util.Set;

/**
* Created by jmussler on 4/17/15.
*/
Expand All @@ -8,5 +10,7 @@ public interface EntityChangeListener {

void notifyEntityRemove(EntityRepository repo, Entity e);

void notifyBatchEntityRemove(EntityRepository repo, Set<String> removedEntities);

void notifyEntityAdd(EntityRepository repo, Entity e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ public synchronized void fill() {
}
}

//Handover cleanup to worker
if (!removedIds.isEmpty()){
for (EntityChangeListener l : currentListeners) {
l.notifyBatchEntityRemove(this, removedIds);
}
}

for (String k : changedFilterProperties) {
for (EntityChangeListener l : currentListeners) {
l.notifyEntityChange(this, oldUnfiltered.get(k), unfilteredEntities.get(k));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -236,4 +232,9 @@ public void scheduleTrialRun(TrialRunRequest request) {
service.schedule(new TrialRunCleanupTask(request.id, schedulerConfig), 300, TimeUnit.SECONDS);
}
}

public void scheduleEntityCleanUp(Set<String> removedIds){
byte [] command = taskSerializer.writeCleanUp(removedIds);
queueSelector.execute(null, command);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package de.zalando.zmon.scheduler.ng.entities;

import com.codahale.metrics.MetricRegistry;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why were these imports added?

import de.zalando.zmon.scheduler.ng.config.SchedulerConfig;

import de.zalando.zmon.scheduler.ng.queue.QueueSelector;
import de.zalando.zmon.scheduler.ng.scheduler.Scheduler;
import io.opentracing.noop.NoopTracerFactory;
import org.junit.Test;
import org.mockito.Mockito;
Expand Down