Skip to content

Commit

Permalink
wip: use zmq for storage write
Browse files Browse the repository at this point in the history
Signed-off-by: Cedric Hombourger <[email protected]>
  • Loading branch information
chombourger committed Jan 17, 2025
1 parent 997a89b commit cbcfda1
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 59 deletions.
44 changes: 24 additions & 20 deletions mtda/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import subprocess
import tempfile
import time
import zmq
import zstandard as zstd

from mtda.main import MultiTenantDeviceAccess
Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(self, host=None, session=None, config_files=None,
else:
self._impl = agent
self._agent = agent
self._data = None

if session is None:
HOST = socket.gethostname()
Expand Down Expand Up @@ -115,9 +117,15 @@ def storage_open(self):
while tries > 0:
tries = tries - 1
try:
self._impl.storage_open(self._session)
host = self.remote()
port = self._impl.storage_open(self._session)
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect(f'tcp://{host}:{port}')
self._data = socket
return
except Exception:
except Exception as e:
print(f'### exception from storage_open: {e}')
if tries > 0:
time.sleep(1)
pass
Expand Down Expand Up @@ -201,15 +209,16 @@ def storage_write_image(self, path, callback=None):

try:
# Prepare for download/copy
file.prepare(image_size)
file.prepare(self._data, image_size)

# Copy image to shared storage
file.copy()

# Wait for background writes to complete
file.flush()

except Exception:
except Exception as e:
print(f'#### exception {e}')
raise
finally:
# Storage may be closed now
Expand Down Expand Up @@ -324,8 +333,12 @@ def flush(self):
inputsize = self._inputsize
totalread = self._totalread
outputsize = self._outputsize

agent.storage_flush()
self._socket.close()

while True:
status, writing, written = agent.storage_status(self._session)
status, writing, written = agent.storage_status()
if callback is not None:
callback(imgname, totalread, inputsize, written, outputsize)
if writing is False:
Expand All @@ -335,10 +348,11 @@ def flush(self):
def path(self):
return self._path

def prepare(self, output_size=None, compression=None):
def prepare(self, socket, output_size=None, compression=None):
compr = self.compression() if compression is None else compression
self._inputsize = self.size()
self._outputsize = output_size
self._socket = socket
# if image is uncompressed, we compress on the fly
if compr == CONSTS.IMAGE.RAW.value:
compr = CONSTS.IMAGE.ZST.value
Expand All @@ -364,20 +378,10 @@ def size(self):
def _write_to_storage(self, data):
max_tries = int(CONSTS.STORAGE.TIMEOUT / CONSTS.STORAGE.RETRY_INTERVAL)

for _ in range(max_tries):
result = self._agent.storage_write(data, self._session)
if result != 0:
break
time.sleep(CONSTS.STORAGE.RETRY_INTERVAL)

if result > 0:
return result
elif result < 0:
exc = 'write or decompression error from shared storage'
raise IOError(exc)
else:
exc = 'timeout from shared storage'
raise IOError(exc)
try:
self._socket.send(data)
except Exception as e:
print(f'#### send exception {e}')


class ImageLocal(ImageFile):
Expand Down
2 changes: 0 additions & 2 deletions mtda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,5 @@ class STORAGE:


class WRITER:
QUEUE_SLOTS = 16
QUEUE_TIMEOUT = 5
READ_SIZE = 1*1024*1024
WRITE_SIZE = 1*1024*1024
23 changes: 20 additions & 3 deletions mtda/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,20 @@ def storage_close(self, session=None):
if self.storage is not None:
self.storage_locked()

self.mtda.debug(3, f"main.storage_close(): {str(result)}")
self.mtda.debug(3, f"main.storage_close(): {result}")
return result

@Pyro4.expose
def storage_flush(self, session=None):
self.mtda.debug(3, "main.storage_flush()")

self._session_check(session)
if self.storage is None:
result = False
else:
result = self._writer.flush()

self.mtda.debug(3, f"main.storage_flush(): {result}")
return result

@Pyro4.expose
Expand Down Expand Up @@ -790,6 +803,7 @@ def storage_open(self, session=None):

self._session_check(session)
owner = self._storage_owner
result = None
status, _, _ = self.storage_status()

if self.storage is None:
Expand All @@ -799,15 +813,18 @@ def storage_open(self, session=None):
elif owner is not None and owner != session:
raise RuntimeError('shared storage in use')
elif self._storage_opened is False:
self.mtda.debug(2, 'main.storage_open: calling driver')
self.storage.open()
self._storage_opened = True
self._storage_owner = session
self._writer.start()
self.mtda.debug(2, 'main.storage_open: starting writer')
result = self._writer.start()
self._storage_event(CONSTS.STORAGE.OPENED, session)
if self.storage is not None:
self.storage_locked()

self.mtda.debug(3, 'main.storage_open(): success')
self.mtda.debug(3, f'main.storage_open(): {result}')
return result

@Pyro4.expose
def storage_status(self, session=None):
Expand Down
72 changes: 38 additions & 34 deletions mtda/storage/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
# ---------------------------------------------------------------------------

import bz2
import queue
import threading
import mtda.constants as CONSTS
import zlib
import zstandard as zstd
import lzma
import zmq


class AsyncImageWriter(queue.Queue):
class AsyncImageWriter:

def __init__(self, mtda, storage, compression=CONSTS.IMAGE.RAW):
self.mtda = mtda
Expand All @@ -27,11 +27,12 @@ def __init__(self, mtda, storage, compression=CONSTS.IMAGE.RAW):
self._blksz = CONSTS.WRITER.WRITE_SIZE
self._exiting = False
self._failed = False
self._socket = None
self._thread = None
self._receiving = False
self._writing = False
self._written = 0
self._zdec = None
super().__init__(maxsize=CONSTS.WRITER.QUEUE_SLOTS)

@property
def compression(self):
Expand Down Expand Up @@ -68,44 +69,47 @@ def compression(self, compression):
def failed(self):
return self._failed

def put(self, chunk, block=True, timeout=None):
self.mtda.debug(3, "storage.writer.put()")

if self.storage is None:
self.mtda.debug(1, "storage.writer.put(): no storage!")
raise IOError("no storage!")
result = super().put(chunk, block, timeout)
# if thread is started and put data is not empty
if len(chunk) > 0 and self._exiting is False:
self._writing = True
self.mtda.debug(3, f"storage.writer.put(): {str(result)}")
def flush(self):
self.mtda.debug(3, "mtda.storage.writer.flush()")

result = None
self._receiving = False
if self._socket:
self._socket.close()
self._socket = None

self.mtda.debug(3, f"storage.writer.flush(): {result}")
return result

def start(self):
self.mtda.debug(3, "mtda.storage.writer.start()")

result = None
context = zmq.Context()
self._socket = context.socket(zmq.PULL)
self._socket.bind("tcp://*:0")

endpoint = self._socket.getsockopt_string(zmq.LAST_ENDPOINT)
result = int(endpoint.split(":")[-1])

self._thread = threading.Thread(target=self.worker,
daemon=True, name='writer')
self._thread.start()

self.mtda.debug(3, f"storage.writer.start(): {str(result)}")
self.mtda.debug(3, f"storage.writer.start(): {result}")
return result

def stop(self):
self.mtda.debug(3, "storage.writer.stop()")

result = None
self.mtda.debug(2, "storage.writer.stop(): waiting on queue...")
self.join()
result = self.flush()
self._exiting = True

if self._thread is not None:
self.mtda.debug(2, "storage.writer.stop(): waiting on thread...")
self._exiting = True
self.put(b'')
self._thread.join()

self.mtda.debug(2, "storage.writer.stop(): all done")

self._thread = None
self._zdec = None

Expand All @@ -118,25 +122,25 @@ def worker(self):
result = None
self._exiting = False
self._failed = False
self._receiving = True
self._written = 0
while self._exiting is False:
if self.empty():
self._writing = True
while self._exiting is False and self._receiving is True:
try:
chunk = self._socket.recv()
self._write(chunk)
except Exception as e:
self.mtda.debug(1, f"storage.writer.worker(): {e}")
self._failed = True
self._writing = False
chunk = self.get()
if self._exiting is False:
try:
self._write(chunk)
except Exception as e:
self.mtda.debug(1, f"storage.writer.worker(): {e}")
self._failed = True
self._writing = False
pass
self.task_done()
pass
if self._failed is True:
self.mtda.debug(1, "storage.writer.worker(): "
"write or decompression error!")

self.mtda.debug(3, f"storage.writer.worker(): {str(result)}")
self._receiving = False
self._writing = False
self.mtda.debug(3, f"storage.writer.worker(): {result}")
return result

def write_raw(self, data):
Expand Down

0 comments on commit cbcfda1

Please sign in to comment.