Cleaner control loop for worker.py possible with psycopg>=3.2.0 #6297

Open
AlanCoding opened this issue Feb 24, 2025 · 1 comment

@AlanCoding

Background

Because pulpcore uses pg_notify for "wakeup" messages in its tasking system, it shares a problem with AWX, which also triggers tasks via pg_notify. Until now, psycopg, the PostgreSQL client library that Django uses, has offered extremely poor options for the listen-or-timeout pattern in synchronous Python code.

Both projects used select.select. pulpcore additionally does some work with os.pipe that I can't claim to fully understand.
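For comparison, here is a minimal sketch of the select-based pattern. It assumes (this is not confirmed from pulpcore's source) that the os.pipe is the classic self-pipe trick: a signal handler writes a byte into the pipe so that a select() blocked on the database socket also wakes up on signals. All function names are hypothetical, this is not pulpcore's code, and it assumes a Unix platform.

```python
import os
import select
import signal


def make_sentinel():
    """Create a self-pipe (read_fd, write_fd) with both ends non-blocking."""
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)
    os.set_blocking(write_fd, False)
    return read_fd, write_fd


def install_wakeup_handler(write_fd, signum=signal.SIGTERM):
    """Make `signum` wake any select() that watches the sentinel's read end."""

    def handler(received_signum, frame):
        try:
            os.write(write_fd, b"\0")
        except BlockingIOError:
            pass  # pipe already full, so select() will wake anyway

    signal.signal(signum, handler)


def sleep_until_event(sentinel_fd, db_conn, timeout):
    """Wait until the DB connection or the sentinel is readable, or timeout.

    `db_conn` may be an int fd or any object with fileno(), such as a
    psycopg connection. Returns (db_ready, sentinel_fired).
    """
    readable, _writable, _exceptional = select.select(
        [sentinel_fd, db_conn], [], [], timeout
    )
    sentinel_fired = sentinel_fd in readable
    if sentinel_fired:
        # Drain the pipe so the next select() blocks again.
        os.read(sentinel_fd, 256)
    return db_conn in readable, sentinel_fired
```

Note that select() only reports that the socket is readable; the pending notifications still have to be consumed through psycopg afterwards, which is presumably what the SELECT 1 in the existing loop is for.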

What changed?

Working on this problem in ansible/dispatcher#89 (subtask involved in externalizing AWX tasking system), I wanted to solve it the right way, leading me to...

psycopg/psycopg@f88dcbc

Essentially, the difficulty was recognized upstream and fixed in psycopg 3.2.0, which added timeout and stop_after options to Connection.notifies().
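To make the new semantics concrete, here is a minimal sketch of the for/else idiom, wrapped in a hypothetical helper wait_for_wakeup(). It relies only on Connection.notifies(timeout=..., stop_after=...) (psycopg >= 3.2) and Connection.execute(); FakeConnection is a hypothetical test double so the example runs without a database.

```python
from typing import Iterator, List, Optional


def wait_for_wakeup(conn, timeout: float) -> bool:
    """Sleep until a NOTIFY arrives or `timeout` seconds pass.

    `conn` is expected to behave like a psycopg (>= 3.2) Connection in
    autocommit mode on which LISTEN has already been executed.
    Returns True if a notification arrived, False on timeout.
    """
    # notifies() stops after `stop_after` notifications or after `timeout`
    # seconds; returning from inside the loop handles the first notification.
    for _notify in conn.notifies(timeout=timeout, stop_after=1):
        return True
    # Timed out with no notification: ping the server, as the diff does.
    conn.execute("SELECT 1")
    return False


class FakeConnection:
    """Hypothetical test double standing in for psycopg.Connection."""

    def __init__(self, pending: Optional[List[str]] = None) -> None:
        self.pending = list(pending or [])
        self.queries: List[str] = []
        self.notifies_calls: List[tuple] = []

    def notifies(self, *, timeout: Optional[float] = None,
                 stop_after: Optional[int] = None) -> Iterator[str]:
        self.notifies_calls.append((timeout, stop_after))
        delivered = 0
        while self.pending and (stop_after is None or delivered < stop_after):
            delivered += 1
            yield self.pending.pop(0)
        # Unlike a real connection, this fake never waits for `timeout`.

    def execute(self, query: str) -> None:
        self.queries.append(query)
```

Against a real server, conn would come from something like psycopg.connect(dsn, autocommit=True) followed by conn.execute("LISTEN <channel>"), with an application-specific channel name. Autocommit matters because PostgreSQL holds notifications for a listening session until its current transaction ends.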

Proposal

Roughly how I think this would be used:

diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py
index d3829c3b5..8d343b984 100644
--- a/pulpcore/tasking/worker.py
+++ b/pulpcore/tasking/worker.py
@@ -367,14 +367,11 @@ class PulpcoreWorker:
 
         _logger.debug(_("Worker %s entering sleep state."), self.name)
         while not self.shutdown_requested and not self.wakeup:
-            r, w, x = select.select(
-                [self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
-            )
-            self.beat()
-            if connection.connection in r:
+            for _notify in connection.connection.notifies(timeout=self.heartbeat_period.seconds, stop_after=1):
+                break
+            else:
                 connection.connection.execute("SELECT 1")
-            if self.sentinel in r:
-                os.read(self.sentinel, 256)
+            self.beat()
         self.wakeup = False
 
     def supervise_task(self, task):

The full change would reach beyond this hunk, since self.sentinel and related plumbing may go away. It would clean up a lot, and it would bring us all closer to a single correct way of doing this: the usage psycopg itself intends.

@mdellweg
Member

Here's the point: we need to use select because the db connection is not the only thing we want to be notified about. This loop also listens for signals and for the progress of the subprocess while a task is being performed.
Since it can only become more complicated from here, we are thinking about rewriting the whole worker in asynchronous Python.
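As a hypothetical illustration of that direction (not pulpcore code; every name is illustrative), asyncio lets one coroutine wait on several independent sources with a single timeout. In a real worker the queue might be fed from psycopg's AsyncConnection.notifies(), the shutdown event set from loop.add_signal_handler(), and child_done set after awaiting an asyncio subprocess's wait(); those wirings are assumptions.

```python
import asyncio
from typing import List, Set


async def wait_for_any(notifications: asyncio.Queue, shutdown: asyncio.Event,
                       child_done: asyncio.Event, timeout: float) -> Set[str]:
    """Wait until any event source fires or `timeout` seconds pass.

    Returns the names of the sources that fired; an empty set means timeout.
    """
    waiters = {
        asyncio.create_task(notifications.get(), name="notification"),
        asyncio.create_task(shutdown.wait(), name="shutdown"),
        asyncio.create_task(child_done.wait(), name="child_done"),
    }
    done, pending = await asyncio.wait(
        waiters, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel the waiters that did not fire and let the cancellations finish.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return {task.get_name() for task in done}


async def demo() -> List[Set[str]]:
    """Exercise wait_for_any() with in-memory stand-ins for the real sources."""
    notifications: asyncio.Queue = asyncio.Queue()
    shutdown = asyncio.Event()
    child_done = asyncio.Event()

    results = [await wait_for_any(notifications, shutdown, child_done, 0.01)]
    notifications.put_nowait("wakeup")
    results.append(await wait_for_any(notifications, shutdown, child_done, 1.0))
    shutdown.set()
    results.append(await wait_for_any(notifications, shutdown, child_done, 1.0))
    return results
```

asyncio.run(demo()) returns [set(), {'notification'}, {'shutdown'}]: a timeout, then a wakeup, then a shutdown. Creating fresh waiter tasks on every call keeps the sketch simple; a long-running worker might keep them alive across iterations instead.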
