Skip to content

Commit

Permalink
copyutil: closing the local end of pipes after processes starts
Browse files Browse the repository at this point in the history
CopyTask is is using ReceivingChannels and SendingChannels
which are a list of pipes created by CopyTask and those pipes
are being send to a list of subprocesses so the main task
can communicate with them.

in one dtest that delibratily make once of those child process
to break and exit, from time to time we see it getting stuck
forever.

the reason is the CopyTask process is hanging on `recv` call
on one of those pipes, since pipes are copy into the child processes
there's one fd open on CopyTask and one fd open on child process
when the child process closes the fd, `recv()` doesn't raise EOF
since there an open fd that might still send in data.

so we need to close the local pipes on CopyTask after all child processes
are started

Fixes: #37
  • Loading branch information
fruch committed Sep 21, 2023
1 parent e651e12 commit 0dc8708
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion pylib/cqlshlib/copyutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ def __init__(self, num_channels):
self.pipes = [OneWayPipe() for _ in range(num_channels)]
self.channels = [SendingChannel(p) for p in self.pipes]
self.num_channels = num_channels
self._readers = [p.reader for p in self.pipes]

def release_readers(self):
for reader in self._readers:
reader.close()

def close(self):
for ch in self.channels:
Expand All @@ -177,12 +182,17 @@ def __init__(self, num_channels):
self.pipes = [OneWayPipe() for _ in range(num_channels)]
self.channels = [ReceivingChannel(p) for p in self.pipes]
self._readers = [p.reader for p in self.pipes]
self._writers = [p.writer for p in self.pipes]
self._rlocks = [p.rlock for p in self.pipes]
self._rlocks_by_readers = dict([(p.reader, p.rlock) for p in self.pipes])
self.num_channels = num_channels

self.recv = self.recv_select if IS_LINUX else self.recv_polling

def release_writers(self):
for writer in self._writers:
writer.close()

def recv_select(self, timeout):
"""
Implementation of the recv method for Linux, where select is available. Receive an object from
Expand Down Expand Up @@ -465,7 +475,8 @@ def start_processes(self):
for i, process in enumerate(self.processes):
process.start()
self.trace_process(process.pid)

self.inmsg.release_writers()
self.outmsg.release_readers()
self.trace_process(self.get_pid())

def stop_processes(self):
Expand Down

0 comments on commit 0dc8708

Please sign in to comment.