Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined schedulers #3839

Open
wants to merge 83 commits into
base: main
Choose a base branch
from
Open

Combined schedulers #3839

wants to merge 83 commits into from

Conversation

jpbruinsslot
Copy link
Contributor

@jpbruinsslot jpbruinsslot commented Nov 13, 2024

Warning

Ready for review, not for merging

Changes

  • app.py: Removal of creation of organisational schedulers. We don't gather all available organisations from the katalogus and create individual boefje, report, and normalizer schedulers.
  • app.py: Removal of continuous checking of added or removed organisations. We removed the monitor_organisations method that was running in a thread to check if we added or removed organisation and create schedulers for it.
  • models/ : Task and Schedule get a specific organisation field. Since we don't differentiate tasks that are pushed on an organisation queue anymore, we need to be able to determine for what organisation an item on a queue is from.1
  • schedulers/: Since we've consolidated the organisational schedulers to 3 types of schedulers, and we assume that we get task creation from consolidated message queues, we need to update the schedulers to handle that.
    • BoefjeScheduler
      • retrieve scan profile mutations from one message queue
      • iterate over organisation to check for enables boefjes2
      • refactor of method names and exception handling
  • server/
    • Merger of queues/ and schedulers/ endpoint. The notion of a queue and scheduler are used interchangeably. Therefore it makes more sense at the moment to merge those endpoints to avoid confusion and simplify the code.
    • Support to pop off more than one Task from a scheduler. Task runner will now be able to batch pop tasks from a scheduler using filters.

Next steps and impact

Issue link

#3838

QA notes

The process of scanning tasks, rescheduling of tasks should remain the same from a users standpoint of view. Testing this would be the main focus point. Additionally since all organisational schedulers have been combined to one, particular emphasis should be made to check if users will only see, or are able to interact with tasks that are for the organisation that they are allowed to.


Code Checklist

  • All the commits in this PR are properly PGP-signed and verified.
  • This PR only contains functionality relevant to the issue.
  • I have written unit tests for the changes or fixes I made.
  • I have checked the documentation and made changes where necessary.
  • I have performed a self-review of my code and refactored it to the best of my abilities.
  • Tickets have been created for newly discovered issues.
  • For any non-trivial functionality, I have added integration and/or end-to-end tests.
  • I have informed others of any required .env changes files if required and changed the .env-dist accordingly.
  • I have included comments in the code to elaborate on what is not self-evident from the code itself, including references to issues and discussions online, or implicit behavior of an interface.

Checklist for code reviewers:

Copy-paste the checklist from the docs/source/templates folder into your comment.


Checklist for QA:

Copy-paste the checklist from the docs/source/templates folder into your comment.

Footnotes

  1. The attentive reader will know that the organisation information is present in the data specification in a Task , and can be filtered on by using a FilterRequest. However in practice the scheduler is mainly used in the context of multiple organisations and filtering on this id is deemed developer friendly when interfacing with the scheduler. Hence the choice explicitly defining the organisation field on a top level.

  2. as mentioned in the code this iterating over all organisations in order to check if there are enabled plugins is very inefficient, please refer to issue Katalogus caching in the scheduler #3357

@jpbruinsslot jpbruinsslot added the mula Issues related to the scheduler label Nov 13, 2024
@jpbruinsslot jpbruinsslot self-assigned this Nov 13, 2024
@jpbruinsslot jpbruinsslot force-pushed the poc/mula/combined-schedulers branch from 57d040f to da3f337 Compare November 18, 2024 13:29
@jpbruinsslot jpbruinsslot linked an issue Nov 21, 2024 that may be closed by this pull request
@jpbruinsslot jpbruinsslot changed the title POC: combined schedulers Combined schedulers Dec 2, 2024
* main: (64 commits)
  Bug fix: KAT-alogus parameter is now organization member instead of organization code (#3895)
  Remove sigrid workflows (#3920)
  Updated packages (#3898)
  Fix mula migrations Debian package (#3919)
  Adds loggers to report flow (#3872)
  Add additional check if task already run for report scheduler (#3900)
  Create separate finding for Microsoft RDP port (#3882)
  fix: 🐛 allow boefje completion with 404 (#3893)
  Feature/improve rename bulk modal (#3885)
  Update scheduler folder structure (#3883)
  Translations update from Hosted Weblate (#3870)
  Increase max number of PostgreSQL connections (#3889)
  Fix for task id as valid UUID (#3744)
  Add `auto_calculate_deadline` attribute to Scheduler (#3869)
  Ignore specific url parameters when following location headers (#3856)
  Let mailserver inherit l1 (#3704)
  Change plugins enabling in report flow to checkboxes (#3747)
  Fix rocky katalogus tests and delete unused fixtures (#3884)
  Enable/disable scheduled reports (#3871)
  optimize locking in katalogus.py, reuse available data (#3752)
  ...
with self.lock:
if scheduler_id not in self.schedulers:
return
# FIXME:: can be queries instead of a loop
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO

@underdarknl underdarknl added this to the OpenKAT v1.19 milestone Jan 21, 2025
Comment on lines +82 to +85
response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1")
self._verify_response(response)

return TypeAdapter(list[Queue]).validate_json(response.content)
page = TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the limit is hard coded to 1, do we really need a paginated response, especially when we then return only the first Task on line 89

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specifically left in to make the new changes work with the current setup, of the task runner handling one task at a time. The endpoint now allows for batched tasks to be popped from the endpoint as was requested in feature requests for the scheduler. I made @Donnype aware of these changes and he will be thinking about the necessary changes to the task runner to leverage this new functionality.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, come to think about it I need to check whether pagination will work correctly with tasks being popped and set to dispatched correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I was thinking it might have been because of the multi-pop support. But, I'm guessing that would either introduce an extra param (limit?) or a new method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

popping and changing the status (for any number of taskst) should be handled in the same database-transaction I'd say. Hopefully relying on table locking to ensure a task is not popped twice simultaneously.

Comment on lines +79 to +80
response = TypeAdapter(PaginatedTasksResponse).validate_json(self.boefje_responses.pop(0))
p_item = response.results[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paginated responses when we are only dealing with single results.

schedule = Column(String, nullable=True)

tasks = relationship("TaskDB", back_populates="schedule")

deadline_at = Column(DateTime(timezone=True), nullable=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this (and created_at and modified_at not be set to func.utcnow() to make sure we don't rely on the possibly missing UTC flag on the connection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. For the deadline_at it is significant that it is possible to set it to None , setting the default on inserting a Schedule in the database has consequences on code that relies on this. This should remains as is.
  2. Since this a model definition that translates to a SQL migration the func.now() function generates the SQL NOW(), which retrieves the current timestamp from the database server, if the database server is configured to use UTC , NOW() will return the current time in UTC. As far as I know there isn't a func.utcnow() function in sqlalchemy. We can however in the migrations force the database server to use UTC as its default timezone with:
SET timezone = 'UTC';

alternatively, I think we can also might be able to set the time on using Python's datetime.now(timezone.utc)

modified_at = Column(
    DateTime(timezone=True),
    nullable=False,
    default=lambda: datetime.now(timezone.utc),
    onupdate=lambda: datetime.now(timezone.utc)
)

However this likely will be evaluated in Python instead of the database.

@ammar92 you have any particular thoughts about this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Since this a model definition that translates to a SQL migration the func.now() function generates the SQL NOW(), which retrieves the current timestamp from the database server, if the database server is configured to use UTC , NOW() will return the current time in UTC. As far as I know there isn't a func.utcnow() function in sqlalchemy.

As far as I know the internally stored value for timestamp with time zone is always in UTC. So even if there was function like a UTCNOW(), it wouldn't matter for this non-naive date time type.

However this likely will be evaluated in Python instead of the database.

It's indeed evaluated in Python land, but the performance impact is negligible. While it's a good alternative, using SQLAlchemy's function is generally preferred and more common for this use case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hash: str | None = Field(None, max_length=32)

data: dict = Field(default_factory=dict)

created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also use func.utcnow() here.

# There should be an OOI in value
ooi = mutation.value
if ooi is None:
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep the try except for the ValidationError smaller and only around the actual validation code.

list(range(boefje.scan_level, 5)), # type: ignore
)

# Filter OOIs based on permission
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in the remote API / query. Lets add a #TODO for this.

boefje: Boefje to run.
ooi: OOI to run Boefje on.
caller: The name of the function that called this function, used for logging.
# Boefje allowed to scan ooi?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in the remote API / query. Lets add a #TODO for this.

scheduler_id=self.scheduler_id,
)
return True
def has_schedule_permission_to_run(self, schedule: models.Schedule) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we somehow move this to the original query that pops our schedules, instead of evaluating this in python time?

Comment on lines +50 to +51
filters.Filter(column="deadline_at", operator="lt", value=datetime.now(timezone.utc)),
filters.Filter(column="enabled", operator="eq", value=True),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the correct joins/filters here to limit this result set to only schedules that are allowed to produce a task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These filters should do that. Other evaluations of whether a task should be executed or not depend on the scheduler and subsequent cross-references checks to other services. E.g. for a boefje scheduler, does the ooi still exist, is the plugin still enabled, are we still allowed to scan, etc. These factors can't be evaluated by a database query.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can they be set on the update events of those changes though? Eg, disabling / removing tasks for boefjes that are being disabled seems like the correct way forward instead of keeping them around and checking them for an enabled boefje every time they could be executed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is clearly out of scope for this PR, but still worth investigating.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
mula Issues related to the scheduler
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Combine all schedulers for all organisations
5 participants