Skip to content

Commit

Permalink
add some queue for nodes operations (#10620)
Browse files: browse the repository at this point in the history
Co-authored-by: Sergey J <[email protected]>
  • Loading branch information
sourcecd and Sergey J authored Oct 18, 2024
1 parent 857b20e commit 07f2e60
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
9 changes: 8 additions & 1 deletion ydb/tools/ydbd_slice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def deduce_nodes_from_args(args, walle_provider, ssh_user):
sys.exit("unable to deduce hosts")

logger.info("use nodes '%s'", result)
return nodes.Nodes(result, args.dry_run, ssh_user=ssh_user)
return nodes.Nodes(result, args.dry_run, ssh_user=ssh_user, queue_size=args.cmd_queue_size)


def ya_build(arcadia_root, artifact, opts, dry_run):
Expand Down Expand Up @@ -1187,6 +1187,13 @@ def main(walle_provider=None):
default="ver-01gswscgce37hdbqyssjm3nd7x",
help=''
)
parser.add_argument(
"--cmd-queue-size",
metavar="SIZE",
type=int,
default=0,
help='the size of the command queue (for ssh commands), which limits their parallel execution on remote nodes'
)

modes = parser.add_subparsers()
walle_provider = walle_provider or NopHostsInformationProvider()
Expand Down
21 changes: 20 additions & 1 deletion ydb/tools/ydbd_slice/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@
import sys
import logging
import subprocess
import queue


logger = logging.getLogger(__name__)


class Nodes(object):
def __init__(self, nodes, dry_run=False, ssh_user=None):
def __init__(self, nodes, dry_run=False, ssh_user=None, queue_size=0):
assert isinstance(nodes, list)
assert len(nodes) > 0
assert isinstance(nodes[0], str)
self._nodes = nodes
self._dry_run = bool(dry_run)
self._ssh_user = ssh_user
self._logger = logger.getChild(self.__class__.__name__)
self._queue = queue.Queue(queue_size)
self._qsize = queue_size

@property
def nodes_list(self):
Expand Down Expand Up @@ -83,7 +86,23 @@ def execute_async_ret(self, cmd, check_retcode=True, nodes=None, results=None):

actual_cmd = self._get_ssh_command_prefix() + [host, cmd]
process = subprocess.Popen(actual_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

if self._qsize > 0:
self._queue.put((actual_cmd, process, host))
if not self._queue.full():
continue
if not self._queue.empty():
actual_cmd, process, host = self._queue.get()
process.wait()

running_jobs.append((actual_cmd, process, host))

if self._qsize > 0:
while not self._queue.empty():
actual_cmd, process, host = self._queue.get()
process.wait()
running_jobs.append((actual_cmd, process, host))

return running_jobs

def execute_async(self, cmd, check_retcode=True, nodes=None, results=None):
Expand Down

0 comments on commit 07f2e60

Please sign in to comment.