Combined schedulers #3839
Conversation
57d040f to da3f337

* main: (64 commits)
- Bug fix: KAT-alogus parameter is now organization member instead of organization code (#3895)
- Remove sigrid workflows (#3920)
- Updated packages (#3898)
- Fix mula migrations Debian package (#3919)
- Adds loggers to report flow (#3872)
- Add additional check if task already run for report scheduler (#3900)
- Create separate finding for Microsoft RDP port (#3882)
- fix: 🐛 allow boefje completion with 404 (#3893)
- Feature/improve rename bulk modal (#3885)
- Update scheduler folder structure (#3883)
- Translations update from Hosted Weblate (#3870)
- Increase max number of PostgreSQL connections (#3889)
- Fix for task id as valid UUID (#3744)
- Add `auto_calculate_deadline` attribute to Scheduler (#3869)
- Ignore specific url parameters when following location headers (#3856)
- Let mailserver inherit l1 (#3704)
- Change plugins enabling in report flow to checkboxes (#3747)
- Fix rocky katalogus tests and delete unused fixtures (#3884)
- Enable/disable scheduled reports (#3871)
- optimize locking in katalogus.py, reuse available data (#3752)
- ...
mula/scheduler/app.py
Outdated
with self.lock:
    if scheduler_id not in self.schedulers:
        return

# FIXME:: can be queries instead of a loop
TODO
response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1")
self._verify_response(response)

return TypeAdapter(list[Queue]).validate_json(response.content)
page = TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)
If the limit is hard-coded to 1, do we really need a paginated response, especially when we then return only the first Task on line 89?
This is specifically left in to make the new changes work with the current setup of the task runner handling one task at a time. The endpoint now allows batches of tasks to be popped, as was requested in feature requests for the scheduler. I made @Donnype aware of these changes and he will be thinking about the changes needed for the task runner to leverage this new functionality.
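Roughly, a batched pop on the client side could look like the sketch below. Only the limit query parameter comes from the diff above; the method name, the default batch size, and the return handling are assumptions rather than the final interface.

# Hypothetical extension of the client method shown in the diff above.
def pop_tasks(self, scheduler_id: str, limit: int = 10) -> list[Task]:
    response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit={limit}")
    self._verify_response(response)

    page = TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)
    if page is None:
        return []

    # Return the whole batch instead of only results[0].
    return page.results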
However, come to think of it, I need to check whether pagination will work correctly with tasks being popped and set to dispatched.
Makes sense, I was thinking it might have been because of the multi-pop support. But I'm guessing that would either introduce an extra param (limit?) or a new method?
Popping and changing the status (for any number of tasks) should be handled in the same database transaction, I'd say. Hopefully relying on table locking to ensure a task is not popped twice simultaneously.
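A minimal sketch of that idea, assuming a SQLAlchemy TaskDB model with scheduler_id, status, and priority columns and a TaskStatus enum (illustrative names, not the actual mula models): the select, the row locks, and the status change all happen inside one transaction, and skip_locked keeps concurrent poppers from grabbing the same rows.

from sqlalchemy import select

def pop_and_dispatch(session_factory, scheduler_id: str, limit: int):
    # One transaction: select queued tasks with row-level locks, flip their
    # status to dispatched, commit. skip_locked makes concurrent poppers skip
    # rows that are already locked, so a task is never handed out twice.
    # TaskDB and TaskStatus are assumed models, not the real definitions.
    with session_factory() as session, session.begin():
        tasks = (
            session.execute(
                select(TaskDB)
                .where(TaskDB.scheduler_id == scheduler_id, TaskDB.status == TaskStatus.QUEUED)
                .order_by(TaskDB.priority.asc())
                .limit(limit)
                .with_for_update(skip_locked=True)
            )
            .scalars()
            .all()
        )
        for task in tasks:
            task.status = TaskStatus.DISPATCHED
        return tasks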
response = TypeAdapter(PaginatedTasksResponse).validate_json(self.boefje_responses.pop(0))
p_item = response.results[0]
paginated responses when we are only dealing with single results.
schedule = Column(String, nullable=True)

tasks = relationship("TaskDB", back_populates="schedule")

deadline_at = Column(DateTime(timezone=True), nullable=True)
Should this (and created_at and modified_at) not be set to func.utcnow() to make sure we don't rely on the possibly missing UTC flag on the connection?
- For the `deadline_at` it is significant that it is possible to set it to `None`; setting a default on inserting a `Schedule` in the database has consequences for code that relies on this. This should remain as is.
- Since this is a model definition that translates to a SQL migration, the `func.now()` function generates the SQL `NOW()`, which retrieves the current timestamp from the database server. If the database server is configured to use UTC, `NOW()` will return the current time in UTC. As far as I know there isn't a `func.utcnow()` function in SQLAlchemy. We can, however, force the database server in the migrations to use UTC as its default timezone with:

SET timezone = 'UTC';

Alternatively, I think we might also be able to set the time using Python's datetime.now(timezone.utc):

modified_at = Column(
    DateTime(timezone=True),
    nullable=False,
    default=lambda: datetime.now(timezone.utc),
    onupdate=lambda: datetime.now(timezone.utc),
)

However, this will likely be evaluated in Python instead of the database.

@ammar92 do you have any particular thoughts about this?
> Since this is a model definition that translates to a SQL migration, the `func.now()` function generates the SQL `NOW()`, which retrieves the current timestamp from the database server. If the database server is configured to use UTC, `NOW()` will return the current time in UTC. As far as I know there isn't a `func.utcnow()` function in SQLAlchemy.

As far as I know the internally stored value for `timestamp with time zone` is always in UTC. So even if there was a function like `UTCNOW()`, it wouldn't matter for this non-naive datetime type.

> However, this will likely be evaluated in Python instead of the database.

It's indeed evaluated in Python land, but the performance impact is negligible. While it's a good alternative, using SQLAlchemy's function is generally preferred and more common for this use case.
Hm, sorry, I missed that it was an example:
https://docs.sqlalchemy.org/en/20/core/compiler.html#utc-timestamp-function
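For reference, the construct on that page looks roughly like this (adapted from the linked SQLAlchemy documentation, shown here only for the PostgreSQL dialect):

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import expression
from sqlalchemy.types import DateTime

class utcnow(expression.FunctionElement):
    # Custom SQL construct that always renders a UTC "now" on the server side.
    type = DateTime()
    inherit_cache = True

@compiles(utcnow, "postgresql")
def pg_utcnow(element, compiler, **kw):
    return "TIMEZONE('utc', CURRENT_TIMESTAMP)"

It could then be used as server_default=utcnow() (and onupdate=utcnow()) on the columns, so the timestamp is evaluated by PostgreSQL rather than in Python.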
hash: str | None = Field(None, max_length=32)

data: dict = Field(default_factory=dict)

created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
also use func.utcnow() here.
# There should be an OOI in value
ooi = mutation.value
if ooi is None:

try:
I'd rather keep the try/except for the ValidationError smaller and only around the actual validation code.
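Something along these lines, as a sketch of the suggested structure; the validation call and the logger messages are placeholders, not the actual scheduler code:

# Keep the try/except narrow: only the pydantic validation can raise
# ValidationError, so only that call is wrapped.
ooi = mutation.value
if ooi is None:
    self.logger.debug("Mutation value is None, skipping")
    return

try:
    ooi = OOI.model_validate(ooi)  # placeholder for the actual validation
except ValidationError:
    self.logger.warning("Invalid OOI in mutation, skipping")
    return

# Continue with the validated ooi outside the try block.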
list(range(boefje.scan_level, 5)),  # type: ignore
)

# Filter OOIs based on permission
This should be done in the remote API / query. Let's add a #TODO for this.
boefje: Boefje to run.
ooi: OOI to run Boefje on.
caller: The name of the function that called this function, used for logging.

# Boefje allowed to scan ooi?
This should be done in the remote API / query. Let's add a #TODO for this.
    scheduler_id=self.scheduler_id,
)
return True

def has_schedule_permission_to_run(self, schedule: models.Schedule) -> bool:
Can we somehow move this to the original query that pops our schedules, instead of evaluating this in Python?
filters.Filter(column="deadline_at", operator="lt", value=datetime.now(timezone.utc)),
filters.Filter(column="enabled", operator="eq", value=True),
Can we add the correct joins/filters here to limit this result set to only schedules that are allowed to produce a task?
These filters should do that. Other evaluations of whether a task should be executed or not depend on the scheduler and subsequent cross-reference checks to other services. E.g. for a boefje scheduler: does the OOI still exist, is the plugin still enabled, are we still allowed to scan, etc. These factors can't be evaluated by a database query.
Can they be set on the update events of those changes though? E.g. disabling/removing tasks for boefjes that are being disabled seems like the correct way forward, instead of keeping them around and checking for an enabled boefje every time they could be executed.
Which is clearly out of scope for this PR, but still worth investigating.
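Purely as an illustration of that event-driven direction (the event shape and the helper methods below are assumptions, not existing scheduler interfaces):

def process_plugin_state(self, event) -> None:
    # When a boefje is disabled, disable its schedules right away instead of
    # re-checking the plugin state every time a schedule becomes due.
    # get_schedules_for_plugin / update_schedule are hypothetical helpers.
    if event.enabled:
        return

    for schedule in self.get_schedules_for_plugin(event.organisation, event.plugin_id):
        schedule.enabled = False
        self.update_schedule(schedule)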
Warning
Ready for review, not for merging
Changes

- `app.py`: Removal of creation of organisational schedulers. We no longer gather all available organisations from the katalogus and create individual boefje, report, and normalizer schedulers for each of them.
- `app.py`: Removal of continuous checking of added or removed organisations. We removed the `monitor_organisations` method that was running in a thread to check whether organisations were added or removed and to create schedulers for them.
- `models/`: `Task` and `Schedule` get a specific `organisation` field. Since we don't differentiate tasks that are pushed on an organisation queue anymore, we need to be able to determine what organisation an item on a queue is from (footnote 1).
- `schedulers/`: Since we've consolidated the organisational schedulers into 3 types of schedulers, and we assume that we get task creation from consolidated message queues, we need to update the schedulers to handle that.
  - `BoefjeScheduler`
- `server/`:
  - The `queues/` and `schedulers/` endpoints have been merged. The notions of a queue and a scheduler are used interchangeably; therefore it makes more sense at the moment to merge those endpoints to avoid confusion and simplify the code.
  - Batched pop of a `Task` from a scheduler. The task runner will now be able to batch pop tasks from a scheduler using filters.

Next steps and impact
- `/queues` has moved to `/schedulers/{id}/pop`; additionally, it will return a paginated result instead of a single `Task`, because the pop endpoint now supports filtering and can return multiple tasks. Services that rely on the scheduler pop endpoint need to update their interfaces (Update services that rely on `/pop` endpoint of scheduler #3961).
- `/queues` has moved to `schedulers/{id}/push`; services that interface with the push endpoints (rocky) need to update their interfaces (Update services that rely on `/push` endpoint of scheduler #3962).
- `organisation` fields are added to `Task` and `Schedule`; services using these models need to update their specifications (Model definition updates: addition of `organisation` field to `Task` and `Schedule` models #3965).

Issue link
#3838
QA notes
The process of scanning tasks and rescheduling tasks should remain the same from a user's standpoint; testing this would be the main focus point. Additionally, since all organisational schedulers have been combined into one, particular emphasis should be placed on checking that users only see, and can only interact with, tasks that belong to the organisations they are allowed to access.
Code Checklist

- Added the `.env` changes files if required and changed the `.env-dist` accordingly.

Checklist for code reviewers:
Copy-paste the checklist from the docs/source/templates folder into your comment.
Checklist for QA:
Copy-paste the checklist from the docs/source/templates folder into your comment.
Footnotes

1. The attentive reader will know that the organisation information is present in the `data` specification of a `Task`, and can be filtered on by using a `FilterRequest`. However, in practice the scheduler is mainly used in the context of multiple organisations, and filtering on this id is deemed developer friendly when interfacing with the scheduler. Hence the choice of explicitly defining the `organisation` field at the top level.
2. As mentioned in the code, iterating over all organisations in order to check if there are enabled plugins is very inefficient; please refer to issue Katalogus caching in the scheduler #3357.