Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature request: job cluster synchronization to avoid running the same job twice #16

Open
newnewcoder opened this issue May 14, 2022 · 9 comments

Comments

@newnewcoder
Copy link

Is it possible to add a Scheduler Interface for users to implement it?

@amanteaux
Copy link
Member

Hi @newnewcoder,

Sorry for the late answer, can you detail your use case?
The Scheduler class is the root of the Wisp Scheduler, making another implementation would mean that nothing from Wisp is used (except the configuration files).

@newnewcoder
Copy link
Author

In my case, I need to run the scheduler application separately on two nodes under loadbalancer, but with the same job/scheduling state, any suggestions?

@amanteaux
Copy link
Member

I have a doubt, do you want to execute the same jobs on both nodes, or do you want to load balance your jobs to your node instances (so if you have 2 jobs, you want to execute one job on node 1 and the other on node 2)?

@newnewcoder
Copy link
Author

Neither the case. In fact, I use Wisp in a web project, which is under load balancer. And I don't want the same job runs twice.

@amanteaux
Copy link
Member

Ok I see!

The "easy" solution is to have a dedicated node that run jobs.

But that is actually a good idea to provide in the future a way to provide job cluster synchronization. We might provide another Wisp library for this specific purpose!

@amanteaux amanteaux changed the title request for Scheduler Feature request: job cluster synchronization to avoid running the same job twice Sep 15, 2022
@amanteaux amanteaux added this to the 3.0.0 milestone Sep 15, 2022
@amanteaux
Copy link
Member

One external service must be used to provide the cluster synchronization feature.

For a first version, a database service will be used for that. Maybe in the future other implementations will be provided to connect to other services.

@RobbiewOnline
Copy link

I've implemented this already in my own code, but thought I'd share my strategy as it may help you decide on an implementation.

In my case any one of my nodes could go un-healthy, or be removed from the cluster because of scaling down, so the node that runs the task can change between each invocation.

Here's a use-case... Every cron period (e.g. 2am every morning) the cron would wake up the Scheduler on every node (x 10).

Within my implementation (which needs refining to be perfect) each node will see how long ago it was since a node elected to perform the work and if it was more than 10s ago then they write to the database to say that they're alive and ready to run the task.

Once this election process has started the task sleeps for 5s (very generous, I use MongoDB and wasn't 100% sure how long it would take after writing to ensure it's written in the database and propagated to the other nodes. My MongoStorage.setOrUpdateConfigByNameAndSyncWithSqs method is what's used to persist this value which is cached in memory AND send also messages to the other nodes using AWS SQS (Simple Queue Service). This will be received by the other nodes to invalidate their cache, forcing them to read the database rather than the cache when next queries (in <5s time).

After this generous sleep, if the hostname of the machine that's running the task is still the same as the hostname persisted in the database then only that host will run the task, the others will exit like a no-op.

The code for my implementation is below, feel free to use, ignore or pilfer any good bits.

    public static void registerRunnableCron(
            String threadName,
            String cron,
            boolean limitToOneNode,
            Runnable tick
    ) {

        CronExpressionSchedule cronSchedule = CronExpressionSchedule.parse(cron);

        scheduler.schedule(threadName, new Runnable() {
            @Override
            public void run() {
                try {
                    if (limitToOneNode) {
                        final Date NOW = new Date();
                        final String myHostname = getHostname();
                        boolean iVoted = false;

                        Date lastSetDate = MongoStorage.getConfigAsDate(threadName + "-cron-date", false);
                        String lastSetHostName = MongoStorage.getConfigAsString(threadName + "-cron-hostname", false);

                        if (lastSetDate == null || lastSetHostName == null
                                || (NOW.getTime() - lastSetDate.getTime()) > 10000) {

                            // If not previous set or more than 10s ago then vote to do the work
                            MongoStorage.setOrUpdateConfigByNameAndSyncWithSqs(threadName + "-cron-hostname", myHostname);
                            MongoStorage.setOrUpdateConfigByNameAndSyncWithSqs(threadName + "-cron-date", NOW);

                            Log.info("CRON " + threadName + " on host:" + myHostname + " I've voted to take on this task");
                            iVoted = true;
                        }

                        try {
                            // Wait 5s enough time to propagate vote to all servers
                            Thread.sleep(5000);
                        } catch (InterruptedException ex) {
                            Log.error(true, this, "Sleep interrupted");
                        }

                        // Check to see whether the hostname in the DB is myself - 
                        // if it is then do the work otherwise ignore it
                        lastSetHostName = MongoStorage.getConfigAsString(threadName + "-cron-hostname", false);

                        if (myHostname.equals(lastSetHostName)) {
                            Log.info("CRON " + threadName + " on host:" + myHostname + " I'm responsible for this task, invoking tick ...");
                            tick.run();
                        } else {
                            if (iVoted) {
                                Log.info("CRON " + threadName + " on host:" + myHostname + " I lost the vote for this task, won by host:" + lastSetHostName);
                            }
                        }
                    } else {
                        tick.run();
                    }
                } catch (Exception e) {
                    Log.error(false, this, threadName + " error during tick reason:" + e.getMessage(), e);
                }
            }
        }, cronSchedule);
    }

I do plan to improve the algorithm slightly...

  • I should randomly stagger the election so that they don't all try to read the database and all decide that it's stale and subsequently all elect, this way only one or two might elect and then the other 8 would read the database after a small delay and determine a node has already been elected recently and not bother to.
  • Reduce the sleep time to be a maximum round-trip to receive the SQS message and re-read the database (probably 1s should be plenty)
  • Remove the logging once confident all is good
  • Possibly don't use my cached in-memory database (which will remove the dependency on SQS) and instead read, write and re-read to verify the hostname is me and hasn't changed
  • The currently elected node should periodically write to say when it was last alive (not necessarily running any specific task) which would mean each node wouldn't need to vote if the watchdog time is still within a short period.

@amanteaux
Copy link
Member

@devology-rob Wow thank you for sharing this!

On my side, I thought about making each node try to run an update query, and then, the only node who will succeed the update query would then execute the job. The update SQL query is something like update wisp_cluster set last_execution_time = expected_execution_time where job_name = name AND last_execution_time < expected_execution_time)
So it works only for jobs that are executed using a cron expression or using a fixed time executing, but I guess for this specific use case of cluster synchronization, it is ok.
For now the skeleton of this code is contained in: master...v3#diff-f7a9606d334f9a338196e5eab50dbd43daa1601e89b8e7caa4bb71bf3c4549e4
Feel free to add any suggestion!

I keep your implementation in mind though, it might be useful for more complex cases!

@amanteaux
Copy link
Member

This feature actually requires some work, especially because querying the database may bring dependencies. Ideally this would be implemented using only a java.sql.Connection provider and raw JDBC. This way, this can be used in any framework without bringing new dependencies.

I do not have a lot of time to work on this lately, but if someone wants to start implementing something from the v3 branch that introduces more modularization, I would be happy to review it (if possible, it would be great to make intermediate PRs, to validate the work little by little).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants