-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduling.py
68 lines (53 loc) · 1.79 KB
/
scheduling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""
This module is used to delay the execution of any function. This is used to end giveaways.
"""
import asyncio
import config
import sys
import transaction
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
channel_ids_grace_period = set()
jobstores = {
'default': SQLAlchemyJobStore(
url='sqlite:///' + config.SCHEDULER_DB_FILENAME)
}
_scheduler = None
def init_scheduler():
sys.stdout.write("Starting scheduler...")
global _scheduler
_scheduler = AsyncIOScheduler(jobstores=jobstores,
job_defaults={
'misfire_grace_time': None
}
)
_scheduler.start()
sys.stdout.write("done\n")
def delayed_execute(func, args, exec_time: datetime):
"""
Executes a function at a later date/time.
Attributes:
func (function): The function to be executed at a later date/time.
args (list): The list of arguments for the given function.
exec_time (datetime.datetime): The date/time at which the given
function gets executed.
Returns:
None
"""
_id = _scheduler.add_job(_execute_wrapper, 'date',
args=[func] + args, run_date=exec_time).id
return _id
# wrap function to include transaction.commit
async def _execute_wrapper(func, *args, **kwargs):
ret = func(*args, **kwargs)
if asyncio.iscoroutine(ret):
ret = await ret
transaction.commit()
return ret
def deschedule(job_id):
try:
_scheduler.remove_job(job_id)
except JobLookupError:
pass