From a860ea9911fb39871e01499a3342e12c992276d6 Mon Sep 17 00:00:00 2001 From: David Eigen Date: Thu, 16 Jan 2025 02:16:11 +0000 Subject: [PATCH 01/22] make modelinfo for inference thread-safe --- clarifai/client/model.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/clarifai/client/model.py b/clarifai/client/model.py index 0cbef7dd..6502cfdf 100644 --- a/clarifai/client/model.py +++ b/clarifai/client/model.py @@ -424,14 +424,14 @@ def predict(self, raise UserError(f"Too many inputs. Max is {MAX_MODEL_PREDICT_INPUTS}." ) # TODO Use Chunker for inputs len > 128 - self._override_model_version(inference_params, output_config) + model_info = self._get_model_info_for_inference(inference_params, output_config) request = service_pb2.PostModelOutputsRequest( user_app_id=self.user_app_id, model_id=self.id, version_id=self.model_version.id, inputs=inputs, runner_selector=runner_selector, - model=self.model_info) + model=model_info) start_time = time.time() backoff_iterator = BackoffIterator(10) @@ -704,14 +704,16 @@ def generate(self, raise UserError(f"Too many inputs. Max is {MAX_MODEL_PREDICT_INPUTS}." ) # TODO Use Chunker for inputs len > 128 - self._override_model_version(inference_params, output_config) + model_info = self._get_model_info_for_inference(inference_params, output_config) request = service_pb2.PostModelOutputsRequest( user_app_id=self.user_app_id, model_id=self.id, version_id=self.model_version.id, inputs=inputs, runner_selector=runner_selector, - model=self.model_info) + model=model_info) + request.model.model_version.id = self.model_version.id + request.model.model_version.params start_time = time.time() backoff_iterator = BackoffIterator(10) @@ -922,7 +924,7 @@ def generate_by_url(self, inference_params=inference_params, output_config=output_config) - def _req_iterator(self, input_iterator: Iterator[List[Input]], runner_selector: RunnerSelector): + def _req_iterator(self, input_iterator: Iterator[List[Input]], runner_selector: RunnerSelector, model_info: resources_pb2.Model): for inputs in input_iterator: yield service_pb2.PostModelOutputsRequest( user_app_id=self.user_app_id, @@ -930,7 +932,7 @@ def _req_iterator(self, input_iterator: Iterator[List[Input]], runner_selector: version_id=self.model_version.id, inputs=inputs, runner_selector=runner_selector, - model=self.model_info) + model=model_info) def stream(self, inputs: Iterator[List[Input]], @@ -954,8 +956,8 @@ def stream(self, # if not isinstance(inputs, Iterator[List[Input]]): # raise UserError('Invalid inputs, inputs must be a iterator of list of Input objects.') - self._override_model_version(inference_params, output_config) - request = self._req_iterator(inputs, runner_selector) + model_info = self._get_model_info_for_inference(inference_params, output_config) + request = self._req_iterator(inputs, runner_selector, model_info) start_time = time.time() backoff_iterator = BackoffIterator(10) @@ -1168,7 +1170,7 @@ def input_generator(): inference_params=inference_params, output_config=output_config) - def _override_model_version(self, inference_params: Dict = {}, output_config: Dict = {}) -> None: + def _get_model_info_for_inference(self, inference_params: Dict = {}, output_config: Dict = {}) -> None: """Overrides the model version. Args: @@ -1179,13 +1181,14 @@ def _override_model_version(self, inference_params: Dict = {}, output_config: Di select_concepts (list[Concept]): The concepts to select. sample_ms (int): The number of milliseconds to sample. 
""" - params = Struct() - if inference_params is not None: - params.update(inference_params) - - self.model_info.model_version.output_info.CopyFrom( - resources_pb2.OutputInfo( - output_config=resources_pb2.OutputConfig(**output_config), params=params)) + if not inference_params and not output_config: + return self.model_info + + model_info = resources_pb2.Model() + model_info.CopyFrom(self.model_info) + model_info.model_version.output_info.params.update(inference_params) + model_info.model_version.output_info.output_config.update(output_config) + return model_info def _list_concepts(self) -> List[str]: """Lists all the concepts for the model type. From 477f2f0a92ec34a19e58c1c4c6012a5b9ac79e35 Mon Sep 17 00:00:00 2001 From: David Eigen Date: Tue, 21 Jan 2025 22:58:17 +0000 Subject: [PATCH 02/22] download in parallel to current stream --- clarifai/runners/models/model_servicer.py | 11 ++++------- clarifai/runners/utils/url_fetcher.py | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/clarifai/runners/models/model_servicer.py b/clarifai/runners/models/model_servicer.py index f241271c..f44d85b1 100644 --- a/clarifai/runners/models/model_servicer.py +++ b/clarifai/runners/models/model_servicer.py @@ -1,10 +1,9 @@ -from itertools import tee from typing import Iterator from clarifai_grpc.grpc.api import service_pb2, service_pb2_grpc from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2 -from ..utils.url_fetcher import ensure_urls_downloaded +from ..utils.url_fetcher import ensure_urls_downloaded, map_stream class ModelServicer(service_pb2_grpc.V2Servicer): @@ -66,15 +65,13 @@ def StreamModelOutputs(self, This is the method that will be called when the servicer is run. It takes in an input and returns an output. """ - # Duplicate the iterator - request, request_copy = tee(request) # Download any urls that are not already bytes. - for req in request: - ensure_urls_downloaded(req) + def _download_urls_stream(requests): + yield from map_stream(ensure_urls_downloaded, requests) try: - return self.model.stream_wrapper(request_copy) + return self.model_class.stream(_download_urls_stream(request)) except Exception as e: yield service_pb2.MultiOutputResponse(status=status_pb2.Status( code=status_code_pb2.MODEL_PREDICTION_FAILED, diff --git a/clarifai/runners/utils/url_fetcher.py b/clarifai/runners/utils/url_fetcher.py index 081d298b..fff55fe5 100644 --- a/clarifai/runners/utils/url_fetcher.py +++ b/clarifai/runners/utils/url_fetcher.py @@ -47,3 +47,23 @@ def ensure_urls_downloaded(request, max_threads=128): future.result() except Exception as e: logger.exception(f"Error downloading input: {e}") + return request + + +def map_stream(f, it, parallel=1): + ''' + Applies f to each element of it, yielding the results in order. + If parallel >= 1, uses a ThreadPoolExecutor to apply f in parallel to the current thread. 
+ ''' + if parallel < 1: + return map(f, it) + with ThreadPoolExecutor(max_workers=parallel) as executor: + futures = [] + for i in range(parallel): + futures.append(executor.submit(f, next(it))) + for r in it: + res = futures.pop(0).result() + futures.append(executor.submit(f, r)) # start computing next result before yielding this one + yield res + for f in futures: + yield f.result() From 33bac1b7ae75f66d938da0c1746865bc32fa380b Mon Sep 17 00:00:00 2001 From: David Eigen Date: Tue, 28 Jan 2025 13:02:34 -0500 Subject: [PATCH 03/22] update requirements --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index dec59390..c34b4db5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,5 @@ Pillow>=9.5.0 tabulate>=0.9.0 fsspec==2024.6.1 click==8.1.7 +platformdirs==4.3.6 +requests From 5e96b5ca9fd87ecee45020c75b1f97c62d75bd09 Mon Sep 17 00:00:00 2001 From: David Eigen Date: Tue, 28 Jan 2025 13:02:45 -0500 Subject: [PATCH 04/22] maintain venv in user cache dir and add pdb flags --- clarifai/cli/model.py | 10 ++- clarifai/runners/models/model_run_locally.py | 73 ++++++++++++-------- 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/clarifai/cli/model.py b/clarifai/cli/model.py index 884b9eaa..0cc5d750 100644 --- a/clarifai/cli/model.py +++ b/clarifai/cli/model.py @@ -68,9 +68,11 @@ def download_checkpoints(model_path, out_path): help= 'Specify how to test the model locally: "env" for virtual environment or "container" for Docker container. Defaults to "env".' ) +@click.option('--pdb', is_flag=True, help='Enable PDB debugging when testing the model locally.') @click.option( '--keep_env', is_flag=True, + default=True, help= 'Keep the virtual environment after testing the model locally (applicable for virtualenv mode). Defaults to False.' ) @@ -80,7 +82,7 @@ def download_checkpoints(model_path, out_path): help= 'Keep the Docker image after testing the model locally (applicable for container mode). Defaults to False.' ) -def test_locally(model_path, keep_env=False, keep_image=False, mode='env'): +def test_locally(model_path, pdb=False, keep_env=False, keep_image=False, mode='env'): """Test model locally.""" try: from clarifai.runners.models import model_run_locally @@ -89,9 +91,12 @@ def test_locally(model_path, keep_env=False, keep_image=False, mode='env'): if mode == 'container' and keep_env: raise ValueError("'keep_env' is applicable only for 'env' mode") + if pdb and mode == "container": + raise ValueError("PDB debugging is not supported in container mode.") + if mode == "env": click.echo("Testing model locally in a virtual environment...") - model_run_locally.main(model_path, run_model_server=False, keep_env=keep_env) + model_run_locally.main(model_path, run_model_server=False, keep_env=keep_env, use_pdb=pdb) elif mode == "container": click.echo("Testing model locally inside a container...") model_run_locally.main( @@ -125,6 +130,7 @@ def test_locally(model_path, keep_env=False, keep_image=False, mode='env'): @click.option( '--keep_env', is_flag=True, + default=True, help= 'Keep the virtual environment after testing the model locally (applicable for virtualenv mode). Defaults to False.' 
) diff --git a/clarifai/runners/models/model_run_locally.py b/clarifai/runners/models/model_run_locally.py index 08559f10..41364c29 100644 --- a/clarifai/runners/models/model_run_locally.py +++ b/clarifai/runners/models/model_run_locally.py @@ -10,6 +10,7 @@ import traceback import venv +import platformdirs from clarifai_grpc.grpc.api import resources_pb2, service_pb2 from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2 @@ -52,17 +53,14 @@ def _get_env_executable(self): def create_temp_venv(self): """Create a temporary virtual environment.""" - requirements_hash = self._requirements_hash() - - temp_dir = os.path.join(tempfile.gettempdir(), str(requirements_hash)) + venv_key = hashlib.md5(self.model_path.encode('utf-8')).hexdigest() + temp_dir = os.path.join(platformdirs.user_cache_dir('clarifai', 'clarifai'), 'venvs', venv_key) venv_dir = os.path.join(temp_dir, "venv") if os.path.exists(temp_dir): - logger.info(f"Using previous virtual environment at {temp_dir}") - use_existing_venv = True + logger.info(f"Using previous virtual environment at {venv_dir}") else: - logger.info("Creating temporary virtual environment...") - use_existing_venv = False + logger.info("Creating virtual environment...") venv.create(venv_dir, with_pip=True) logger.info(f"Created temporary virtual environment at {venv_dir}") @@ -70,19 +68,27 @@ def create_temp_venv(self): self.temp_dir = temp_dir self.python_executable, self.pip_executable = self._get_env_executable() - return use_existing_venv - def install_requirements(self): """Install the dependencies from requirements.txt and Clarifai.""" + if os.path.exists(os.path.join(self.temp_dir, "requirements.txt")): + requirements = open(self.requirements_file).read() + installed_requirements = open(os.path.join(self.temp_dir, "requirements.txt")).read() + if requirements == installed_requirements: + logger.info("Requirements already installed.") + return _, pip_executable = self._get_env_executable() try: logger.info( f"Installing requirements from {self.requirements_file}... in the virtual environment {self.venv_dir}" ) - subprocess.check_call([pip_executable, "install", "-r", self.requirements_file]) logger.info("Installing Clarifai package...") - subprocess.check_call([pip_executable, "install", "clarifai"]) + subprocess.check_call( + [pip_executable, "install", "clarifai==" + __import__("clarifai").__version__]) + logger.info("Installing model requirements...") + subprocess.check_call([pip_executable, "install", "-r", self.requirements_file]) logger.info("Requirements installed successfully!") + with open(os.path.join(self.temp_dir, "requirements.txt"), "w") as f: + f.write(open(self.requirements_file).read()) except subprocess.CalledProcessError as e: logger.error(f"Error installing requirements: {e}") self.clean_up() @@ -177,9 +183,12 @@ def _run_model_inference(self, model): )) if stream_response: - stream_first_res = next(stream_response) - if stream_first_res.outputs[0].status.code != status_code_pb2.SUCCESS: - logger.error(f"Moddel Prediction failed: {stream_first_res}") + try: + stream_first_res = next(stream_response) + except StopIteration: + stream_first_res = None + if stream_first_res is None or stream_first_res.outputs[0].status.code != status_code_pb2.SUCCESS: + logger.error(f"Model stream failed: {stream_first_res}") else: logger.info( f"Model Prediction succeeded for stream and first response: {stream_first_res}") @@ -191,7 +200,7 @@ def _run_test(self): # send an inference. 
self._run_model_inference(model) - def test_model(self): + def test_model(self, use_pdb=False): """Test the model by running it locally in the virtual environment.""" import_path = repr(os.path.dirname(os.path.abspath(__file__))) @@ -201,8 +210,12 @@ def test_model(self): f"sys.path.append({import_path}); " f"from model_run_locally import ModelRunLocally; " f"ModelRunLocally({model_path})._run_test()") + main_file = tempfile.NamedTemporaryFile(mode="w") + with open(main_file.name, "w") as f: + f.write(command_string) - command = [self.python_executable, "-c", command_string] + pdb_args = ["-m", "pdb"] if use_pdb else [] + command = [self.python_executable, *pdb_args, main_file.name] process = None try: logger.info("Testing the model locally...") @@ -232,12 +245,13 @@ def test_model(self): process.kill() # run the model server - def run_model_server(self, port=8080): + def run_model_server(self, port=8080, use_pdb=False): """Run the Clarifai Runners's model server.""" + pdb_args = ["-m", "pdb"] if use_pdb else [] command = [ - self.python_executable, "-m", "clarifai.runners.server", "--model_path", self.model_path, - "--grpc", "--port", + self.python_executable, *pdb_args, "-m", "clarifai.runners.server", "--model_path", + self.model_path, "--grpc", "--port", str(port) ] try: @@ -462,16 +476,19 @@ def remove_docker_image(self, image_name): def clean_up(self): """Clean up the temporary virtual environment.""" if os.path.exists(self.temp_dir): - logger.info("Cleaning up temporary virtual environment...") + logger.info("Cleaning up virtual environment...") shutil.rmtree(self.temp_dir) -def main(model_path, - run_model_server=False, - inside_container=False, - port=8080, - keep_env=False, - keep_image=False): +def main( + model_path, + run_model_server=False, + inside_container=False, + port=8080, + keep_env=True, + keep_image=False, + use_pdb=False, +): if not os.environ['CLARIFAI_PAT']: logger.error( @@ -513,9 +530,9 @@ def main(model_path, if not use_existing_env: manager.install_requirements() if run_model_server: - manager.run_model_server(port) + manager.run_model_server(port, use_pdb=use_pdb) else: - manager.test_model() + manager.test_model(use_pdb=use_pdb) finally: if not keep_env: manager.clean_up() From 74cd874d3d35f71af58ea3592b0b0b70123bc1e4 Mon Sep 17 00:00:00 2001 From: David Eigen Date: Thu, 30 Jan 2025 15:55:17 -0500 Subject: [PATCH 05/22] stream and video utils files --- clarifai/runners/utils/stream_utils.py | 199 +++++++++++++++++++++++++ clarifai/runners/utils/video_utils.py | 58 +++++++ 2 files changed, 257 insertions(+) create mode 100644 clarifai/runners/utils/stream_utils.py create mode 100644 clarifai/runners/utils/video_utils.py diff --git a/clarifai/runners/utils/stream_utils.py b/clarifai/runners/utils/stream_utils.py new file mode 100644 index 00000000..c0062457 --- /dev/null +++ b/clarifai/runners/utils/stream_utils.py @@ -0,0 +1,199 @@ +import io + +import requests + +MB = 1024 * 1024 + + +class BufferStream(io.RawIOBase): + ''' + A buffer that reads data from a chunked stream and provides a file-like interface for reading. 
+ + :param chunk_iterator: An iterator that yields chunks of data (bytes) + ''' + + def __init__(self, chunk_iterator): + self._chunk_iterator = chunk_iterator + self.response = None + self.buffer = b'' + self.file_pos = 0 + self.b_pos = 0 + self._eof = False + + #### read() methods + + def readable(self): + return True + + def readinto(self, output_buf): + if self._eof: + return 0 + + try: + # load next chunk if necessary + if self.b_pos == len(self.buffer): + self.buffer = next(self._chunk_iterator) + self.b_pos = 0 + + # copy data to output buffer + n = min(len(output_buf), len(self.buffer - self.b_pos)) + assert n > 0 + + output_buf[:n] = self.buffer[self.b_pos:self.b_pos + n] + + # advance positions + self.b_pos += n + assert self.b_pos <= len(self.buffer) + + return n + + except StopIteration: + self._eof = True + return 0 + + +class SeekableBufferStream(io.RawIOBase): + ''' + EXPERIMENTAL + A buffer that reads data from a chunked stream and provides a file-like interface for reading. + + :param chunk_iterator: An iterator that yields chunks of data (bytes) + :param buffer_size: The maximum size of the buffer in bytes + ''' + + def __init__(self, chunk_iterator, buffer_size=100 * MB): + self._chunk_iterator = chunk_iterator + self.buffer_size = buffer_size + self.buffer_vec = [] + self.file_pos = 0 + self.vec_pos = 0 + self.b_pos = 0 + self._eof = False + + #### read() methods + + def readable(self): + return True + + def readinto(self, output_buf): + if self._eof: + return 0 + + assert self.vec_pos <= len(self.buffer_vec) + + try: + # load next chunk if necessary + if self.vec_pos == len(self.buffer_vec): + self._load_next_chunk() + + # copy data from buffer_vec to output buffer + n = min(len(output_buf), len(self.buffer_vec[self.vec_pos]) - self.b_pos) + assert n > 0 + + output_buf[:n] = self.buffer_vec[self.vec_pos][self.b_pos:self.b_pos + n] + + # advance positions + self.file_pos += n + self.b_pos += n + assert self.b_pos <= len(self.buffer_vec[self.vec_pos]) + if self.b_pos == len(self.buffer_vec[self.vec_pos]): + self.vec_pos += 1 + self.b_pos = 0 + return n + except StopIteration: + self._eof = True + return 0 + + def _load_next_chunk(self, check_bounds=True): + self.buffer_vec.append(next(self._chunk_iterator)) + total = sum(len(chunk) for chunk in self.buffer_vec) + while total > self.buffer_size: + chunk = self.buffer_vec.pop(0) + total -= len(chunk) + self.vec_pos -= 1 + if check_bounds: + assert self.vec_pos >= 0, 'current position fell outside the buffer' + + #### seek() methods (experimental) + + def seekable(self): + return True + + def tell(self): + return self.file_pos + + def seek(self, offset, whence=io.SEEK_SET): + #printerr(f"seek(offset={offset}, whence={('SET', 'CUR', 'END')[whence]})") + # convert to offset from start of file stream + if whence == io.SEEK_SET: + seek_pos = offset + elif whence == io.SEEK_CUR: + seek_pos = self.file_pos + offset + elif whence == io.SEEK_END: + self._seek_to_end() + seek_pos = self.file_pos + offset + else: + raise ValueError(f"Invalid whence: {whence}") + + # set positions to start of buffer vec to begin seeking + self.file_pos -= self.b_pos + self.b_pos = 0 + while self.vec_pos > 0: + self.vec_pos -= 1 + self.file_pos -= len(self.buffer_vec[self.vec_pos]) + + # check if still seeking backwards off the start of the buffer + if seek_pos < self.file_pos: + raise IOError('seek before start of buffer') + + # seek forwards to desired position + while self.file_pos < seek_pos: + if self.vec_pos == len(self.buffer_vec): + 
self._load_next_chunk() + n = len(self.buffer_vec[self.vec_pos]) + if self.file_pos + n > seek_pos: + self.b_pos = seek_pos - self.file_pos + self.file_pos = seek_pos + break + self.file_pos += n + self.vec_pos += 1 + + # unset EOF flag + self._eof = False + + return self.file_pos + + def _seek_to_end(self): + try: + # skip positions to end of the current buffer vec + if self.b_pos > 0: + self.file_pos += len(self.buffer_vec[self.vec_pos]) - self.b_pos + self.vec_pos += 1 + self.b_pos = 0 + # keep loading chunks until EOF + while True: + while self.vec_pos < len(self.buffer_vec): + self.file_pos += len(self.buffer_vec[self.vec_pos]) + self.vec_pos += 1 + self._load_next_chunk(check_bounds=False) + except StopIteration: + pass + # advance to end of buffer vec + while self.vec_pos < len(self.buffer_vec): + self.file_pos += len(self.buffer_vec[self.vec_pos]) + self.vec_pos += 1 + + +class URLStream(BufferStream): + + def __init__(self, url, chunk_size=1 * MB, buffer_size=10 * MB, requests_kwargs={}): + self.url = url + self.chunk_size = chunk_size + self.response = requests.get(self.url, stream=True, **requests_kwargs) + self.response.raise_for_status() + super().__init__( + self.response.iter_content(chunk_size=self.chunk_size), buffer_size=buffer_size) + + def close(self): + super().close() + self.response.close() diff --git a/clarifai/runners/utils/video_utils.py b/clarifai/runners/utils/video_utils.py new file mode 100644 index 00000000..0f853d29 --- /dev/null +++ b/clarifai/runners/utils/video_utils.py @@ -0,0 +1,58 @@ +import io +import tempfile + +import av +import requests + +from clarifai.runners.utils import stream_utils + + +def stream_video_from_url(url, download_ok=True): + """ + Streams a video at the specified resolution using PyAV. + + :param url: The video URL + :param download_ok: Whether to download the video if the URL is not a stream + """ + protocol = url.split('://', 1)[0] + if protocol == 'rtsp': + # stream from RTSP and send to PyAV + container = av.open(url) + elif protocol in ('http', 'https'): + if not download_ok: + raise ValueError('Download not allowed for URL scheme') + # download the video to the temporary file + # TODO: download just enough to get the file header and stream to pyav if possible, + # otherwise download the whole file + # e.g. if linking to a streamable file format like mpegts (not mp4) + file = tempfile.NamedTemporaryFile(delete=True) + _download_video(url, file) + container = av.open(file.name) + else: + # TODO others: s3, etc. + raise ValueError('Unsupported URL scheme') + + # Decode video frames + yield from container.decode(video=0) + + +def _download_video(url, file): + response = requests.get(url, stream=True) + response.raise_for_status() + for chunk in response.iter_content(chunk_size=1024): + file.write(chunk) + file.flush() + return file + + +def stream_video_from_bytes(bytes_iterator): + """ + Streams a video from a sequence of chunked byte strings of a streamable video + container format. 
+ + :param bytes_iterator: An iterator that yields byte chunks with the video data + """ + buffer = stream_utils.BufferStream(bytes_iterator) + reader = io.BufferedReader(buffer) + container = av.open(reader) + yield from container.decode(video=0) From f49b1f9b4cddd2e7f40718a7e48cd39d4f8ffdca Mon Sep 17 00:00:00 2001 From: David Eigen Date: Thu, 30 Jan 2025 15:58:06 -0500 Subject: [PATCH 06/22] stream and video utils files --- clarifai/runners/utils/video_utils.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/clarifai/runners/utils/video_utils.py b/clarifai/runners/utils/video_utils.py index 0f853d29..17e0480c 100644 --- a/clarifai/runners/utils/video_utils.py +++ b/clarifai/runners/utils/video_utils.py @@ -26,7 +26,7 @@ def stream_video_from_url(url, download_ok=True): # otherwise download the whole file # e.g. if linking to a streamable file format like mpegts (not mp4) file = tempfile.NamedTemporaryFile(delete=True) - _download_video(url, file) + download_file(url, file.name) container = av.open(file.name) else: # TODO others: s3, etc. @@ -36,13 +36,12 @@ def stream_video_from_url(url, download_ok=True): yield from container.decode(video=0) -def _download_video(url, file): +def download_file(url, file_name): response = requests.get(url, stream=True) response.raise_for_status() - for chunk in response.iter_content(chunk_size=1024): - file.write(chunk) - file.flush() - return file + with open(file_name, 'wb') as f: + for chunk in response.iter_content(chunk_size=1024): + f.write(chunk) def stream_video_from_bytes(bytes_iterator): From 01398b7cb1f765bf73b4da026cedbe5a1ecea0c8 Mon Sep 17 00:00:00 2001 From: David Eigen Date: Thu, 30 Jan 2025 17:00:52 -0500 Subject: [PATCH 07/22] stream video file function --- clarifai/client/model.py | 55 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/clarifai/client/model.py b/clarifai/client/model.py index 6502cfdf..1eb29ef1 100644 --- a/clarifai/client/model.py +++ b/clarifai/client/model.py @@ -1,5 +1,7 @@ import json +import logging import os +import subprocess import time from typing import Any, Dict, Generator, Iterator, List, Tuple, Union @@ -924,7 +926,8 @@ def generate_by_url(self, inference_params=inference_params, output_config=output_config) - def _req_iterator(self, input_iterator: Iterator[List[Input]], runner_selector: RunnerSelector, model_info: resources_pb2.Model): + def _req_iterator(self, input_iterator: Iterator[List[Input]], runner_selector: RunnerSelector, + model_info: resources_pb2.Model): for inputs in input_iterator: yield service_pb2.PostModelOutputsRequest( user_app_id=self.user_app_id, @@ -1170,7 +1173,55 @@ def input_generator(): inference_params=inference_params, output_config=output_config) - def _get_model_info_for_inference(self, inference_params: Dict = {}, output_config: Dict = {}) -> None: + def stream_by_video_file(self, + filepath: str, + input_type: str = 'video', + compute_cluster_id: str = None, + nodepool_id: str = None, + deployment_id: str = None, + user_id: str = None, + inference_params: Dict = {}, + output_config: Dict = {}): + """ + Stream the model output based on the given video file. + + Converts the video file to a streamable format, streams as bytes to the model, + and streams back the model outputs. + + Args: + filepath (str): The filepath to predict. + input_type (str, optional): The type of input. Can be 'image', 'text', 'video' or 'audio. + compute_cluster_id (str): The compute cluster ID to use for the model. 
+      nodepool_id (str): The nodepool ID to use for the model.
+      deployment_id (str): The deployment ID to use for the model.
+      inference_params (dict): The inference params to override.
+      output_config (dict): The output config to override.
+    """
+
+    if not os.path.isfile(filepath):
+      raise UserError('Invalid filepath.')
+
+    # TODO check if the file is streamable already
+
+    # Convert the video file to a streamable format
+    # TODO this conversion can offset the start time by a little bit; we should account for this
+    # by getting the original start time ffprobe and either sending that to the model so it can adjust
+    # with the ts of the first frame (too fragile to do all of this adjustment in the client input stream)
+    command = 'ffmpeg -i FILEPATH -c copy -f mpegts -muxpreload 0 -muxdelay 0 pipe:'.split()
+    command[command.index('FILEPATH')] = filepath  # handles special characters in filepath
+    proc = subprocess.Popen(
+        command,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.DEVNULL if self.logger.level >= logging.INFO else None)
+
+    chunk_size = 1024 * 1024  # 1 MB
+    chunk_iterator = iter(lambda: proc.stdout.read(chunk_size), b'')
+
+    return self.stream_by_bytes(chunk_iterator, input_type, compute_cluster_id, nodepool_id,
+                                deployment_id, user_id, inference_params, output_config)
+
+  def _get_model_info_for_inference(self, inference_params: Dict = {},
+                                    output_config: Dict = {}) -> None:
     """Overrides the model version.
 
     Args:
From 917f0752e5a1423b9b7a255ee17e74ef1421f1fb Mon Sep 17 00:00:00 2001
From: David Eigen
Date: Thu, 30 Jan 2025 22:03:40 -0500
Subject: [PATCH 08/22] create error model when model fails to load

---
 clarifai/runners/models/model_builder.py | 24 ++++++++++++++++++++++++
 clarifai/runners/server.py               |  6 +++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/clarifai/runners/models/model_builder.py b/clarifai/runners/models/model_builder.py
index 8c9dff93..b26e14f7 100644
--- a/clarifai/runners/models/model_builder.py
+++ b/clarifai/runners/models/model_builder.py
@@ -104,6 +104,12 @@ def create_model_instance(self, load_model=True):
       model.load_model()
     return model
 
+  def create_error_model_instance(self, exception: Exception):
+    """
+    Create an instance of the model class that just raises the given exception.
+    """
+    return ErrorModel(exception)
+
   def _validate_folder(self, folder):
     if folder == ".":
       folder = ""  # will getcwd() next which ends with /
@@ -641,6 +647,24 @@ def monitor_model_build(self):
         return False
 
 
+class ErrorModel(ModelClass):
+
+  def __init__(self, exception):
+    self.exception = exception
+
+  def load_model(self):
+    pass
+
+  def predict(self, *args, **kwargs):
+    raise self.exception from self.exception
+
+  def generate(self, *args, **kwargs):
+    raise self.exception from self.exception
+
+  def stream(self, *args, **kwargs):
+    raise self.exception from self.exception
+
+
 def upload_model(folder, download_checkpoints, skip_dockerfile):
   builder = ModelBuilder(folder)
   if download_checkpoints:
diff --git a/clarifai/runners/server.py b/clarifai/runners/server.py
index b1c00b81..19e47045 100644
--- a/clarifai/runners/server.py
+++ b/clarifai/runners/server.py
@@ -70,7 +70,11 @@ def main():
 
   builder = ModelBuilder(parsed_args.model_path)
 
-  model = builder.create_model_instance()
+  try:
+    model = builder.create_model_instance()
+  except Exception as e:
+    logger.exception("Error creating model instance")
+    model = builder.create_error_model_instance(e)
 
   # Setup the grpc server for local development.
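
Usage sketch (not part of the patch series): the stream_by_video_file() method added in
PATCH 07 above can be driven from client code roughly as follows. This is a minimal sketch:
the model URL and deployment ID are hypothetical, and a CLARIFAI_PAT is assumed to be
configured in the environment.

    from clarifai.client.model import Model

    # hypothetical model URL and deployment ID
    model = Model(url="https://clarifai.com/my-user/my-app/models/my-video-model")
    for response in model.stream_by_video_file("movie.mp4", input_type='video',
                                               deployment_id="my-deployment"):
        # consume streamed model outputs as the transcoded MPEG-TS bytes are processed
        print(response.status.code)
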
if parsed_args.grpc: From 6fb0396b2c82176c6092b67bd92bb279d658038b Mon Sep 17 00:00:00 2001 From: David Eigen Date: Mon, 3 Feb 2025 14:58:04 -0500 Subject: [PATCH 09/22] write ffmpeg command in pyav --- clarifai/client/model.py | 14 ++++----- clarifai/runners/utils/video_utils.py | 41 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/clarifai/client/model.py b/clarifai/client/model.py index 1eb29ef1..81674396 100644 --- a/clarifai/client/model.py +++ b/clarifai/client/model.py @@ -1,7 +1,5 @@ import json -import logging import os -import subprocess import time from typing import Any, Dict, Generator, Iterator, List, Tuple, Union @@ -1207,15 +1205,13 @@ def stream_by_video_file(self, # TODO this conversion can offset the start time by a little bit; we should account for this # by getting the original start time ffprobe and either sending that to the model so it can adjust # with the ts of the first frame (too fragile to do all of this adjustment in the client input stream) - command = 'ffmpeg -i FILEPATH -c copy -f mpegts -muxpreload 0 -muxdelay 0 pipe:'.split() - command[command.index('FILEPATH')] = filepath # handles special characters in filepath - proc = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL if self.logger.level >= logging.INFO else None) + # or by adjusting the timestamps in the output stream + from clarifai.runners.utils import video_utils + stream = video_utils.recontain_as_streamable(filepath) + # TODO accumulate reads to fill the chunk size chunk_size = 1024 * 1024 # 1 MB - chunk_iterator = iter(lambda: proc.stdout.read(chunk_size), b'') + chunk_iterator = iter(lambda: stream.read(chunk_size), b'') return self.stream_by_bytes(chunk_iterator, input_type, compute_cluster_id, nodepool_id, deployment_id, user_id, inference_params, output_config) diff --git a/clarifai/runners/utils/video_utils.py b/clarifai/runners/utils/video_utils.py index 17e0480c..39bf53b7 100644 --- a/clarifai/runners/utils/video_utils.py +++ b/clarifai/runners/utils/video_utils.py @@ -1,5 +1,7 @@ import io +import os import tempfile +import threading import av import requests @@ -55,3 +57,42 @@ def stream_video_from_bytes(bytes_iterator): reader = io.BufferedReader(buffer) container = av.open(reader) yield from container.decode(video=0) + + +def recontain_as_streamable(filepath): + return recontain(filepath, "mpegts", {"muxpreload": "0", "muxdelay": "0"}) + + +def recontain(input, format, options={}): + # pyav-only implementation of "ffmpeg -i filepath -f mpegts -muxpreload 0 -muxdelay 0 pipe:" + read_pipe_fd, write_pipe_fd = os.pipe() + read_pipe = os.fdopen(read_pipe_fd, "rb") + write_pipe = os.fdopen(write_pipe_fd, "wb") + + def _run_av(): + input_container = output_container = None + try: + # open input and output containers, using mpegts as output format + input_container = av.open(input, options=options) + output_container = av.open(write_pipe, mode="w", format=format) + + # Copy streams directly without re-encoding + for stream in input_container.streams: + output_container.add_stream_from_template(stream) + + # Read packets from input and write them to output + for packet in input_container.demux(): + if not packet.size: + break + output_container.mux(packet) + + finally: + if output_container: + output_container.close() + if input_container: + input_container.close() + + t = threading.Thread(target=_run_av) + t.start() + + return read_pipe From 91f5aca88d6ccdbd18987d8a24561e0afda46641 Mon Sep 17 00:00:00 2001 From: David 
Eigen Date: Tue, 4 Feb 2025 12:34:41 -0500 Subject: [PATCH 10/22] remove pip install clarifai from model_run_locally --- clarifai/runners/models/model_run_locally.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/clarifai/runners/models/model_run_locally.py b/clarifai/runners/models/model_run_locally.py index 41364c29..be2f3aed 100644 --- a/clarifai/runners/models/model_run_locally.py +++ b/clarifai/runners/models/model_run_locally.py @@ -81,9 +81,6 @@ def install_requirements(self): logger.info( f"Installing requirements from {self.requirements_file}... in the virtual environment {self.venv_dir}" ) - logger.info("Installing Clarifai package...") - subprocess.check_call( - [pip_executable, "install", "clarifai==" + __import__("clarifai").__version__]) logger.info("Installing model requirements...") subprocess.check_call([pip_executable, "install", "-r", self.requirements_file]) logger.info("Requirements installed successfully!") From 9f18f9e7ed0cbd1ac0dc5cf4230b76c9c19b17c1 Mon Sep 17 00:00:00 2001 From: David Eigen Date: Tue, 4 Feb 2025 12:36:54 -0500 Subject: [PATCH 11/22] rename function --- clarifai/runners/utils/video_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clarifai/runners/utils/video_utils.py b/clarifai/runners/utils/video_utils.py index 39bf53b7..23227205 100644 --- a/clarifai/runners/utils/video_utils.py +++ b/clarifai/runners/utils/video_utils.py @@ -9,7 +9,7 @@ from clarifai.runners.utils import stream_utils -def stream_video_from_url(url, download_ok=True): +def stream_frames_from_url(url, download_ok=True): """ Streams a video at the specified resolution using PyAV. @@ -46,7 +46,7 @@ def download_file(url, file_name): f.write(chunk) -def stream_video_from_bytes(bytes_iterator): +def stream_frames_from_bytes(bytes_iterator): """ Streams a video from a sequence of chunked byte strings of a streamable video container format. From 7fbd78cd46ba27891b07a664df3412534082767c Mon Sep 17 00:00:00 2001 From: David Eigen Date: Tue, 4 Feb 2025 21:58:38 -0500 Subject: [PATCH 12/22] stream_util changes --- clarifai/runners/models/model_servicer.py | 5 +- clarifai/runners/utils/stream_utils.py | 142 ++++++++++++++-------- clarifai/runners/utils/url_fetcher.py | 26 ++-- 3 files changed, 103 insertions(+), 70 deletions(-) diff --git a/clarifai/runners/models/model_servicer.py b/clarifai/runners/models/model_servicer.py index f44d85b1..9e334fc5 100644 --- a/clarifai/runners/models/model_servicer.py +++ b/clarifai/runners/models/model_servicer.py @@ -3,7 +3,8 @@ from clarifai_grpc.grpc.api import service_pb2, service_pb2_grpc from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2 -from ..utils.url_fetcher import ensure_urls_downloaded, map_stream +from ..utils.stream_utils import readahead +from ..utils.url_fetcher import ensure_urls_downloaded class ModelServicer(service_pb2_grpc.V2Servicer): @@ -68,7 +69,7 @@ def StreamModelOutputs(self, # Download any urls that are not already bytes. 
     def _download_urls_stream(requests):
-      yield from map_stream(ensure_urls_downloaded, requests)
+      return readahead(map(ensure_urls_downloaded, requests))
 
     try:
       return self.model_class.stream(_download_urls_stream(request))
diff --git a/clarifai/runners/utils/stream_utils.py b/clarifai/runners/utils/stream_utils.py
index c0062457..6b51b582 100644
--- a/clarifai/runners/utils/stream_utils.py
+++ b/clarifai/runners/utils/stream_utils.py
@@ -1,27 +1,31 @@
 import io
+import queue
 
-import requests
+import threading
+from concurrent.futures import ThreadPoolExecutor
 
 MB = 1024 * 1024
 
 
-class BufferStream(io.RawIOBase):
+class StreamingChunksReader(io.RawIOBase):
   '''
-  A buffer that reads data from a chunked stream and provides a file-like interface for reading.
+  A buffered reader that reads data from an iterator yielding chunks of bytes, used
+  to provide file-like access to a streaming data source.
 
   :param chunk_iterator: An iterator that yields chunks of data (bytes)
   '''
 
   def __init__(self, chunk_iterator):
+    """
+    Args:
+      chunk_iterator (iterator): An iterator that yields chunks of bytes.
+    """
     self._chunk_iterator = chunk_iterator
     self.response = None
     self.buffer = b''
-    self.file_pos = 0
     self.b_pos = 0
     self._eof = False
 
-  #### read() methods
-
   def readable(self):
     return True
 
   def readinto(self, output_buf):
     if self._eof:
       return 0
 
     try:
       # load next chunk if necessary
       if self.b_pos == len(self.buffer):
         self.buffer = next(self._chunk_iterator)
         self.b_pos = 0
 
       # copy data to output buffer
-      n = min(len(output_buf), len(self.buffer - self.b_pos))
+      n = min(len(output_buf), len(self.buffer) - self.b_pos)
       assert n > 0
 
       output_buf[:n] = self.buffer[self.b_pos:self.b_pos + n]
 
       # advance positions
       self.b_pos += n
       assert self.b_pos <= len(self.buffer)
 
       return n
 
     except StopIteration:
       self._eof = True
       return 0
 
 
-class SeekableBufferStream(io.RawIOBase):
-  '''
-  EXPERIMENTAL
-  A buffer that reads data from a chunked stream and provides a file-like interface for reading.
-
-  :param chunk_iterator: An iterator that yields chunks of data (bytes)
-  :param buffer_size: The maximum size of the buffer in bytes
-  '''
+class SeekableStreamingChunksReader(io.RawIOBase):
+  """
+  A buffered reader that reads data from an iterator yielding chunks of bytes, used
+  to provide file-like access to a streaming data source.
+
+  This class supports limited seeking to positions within the stream, by buffering
+  chunks internally and supporting basic seek operations within the buffer.
+  """
 
   def __init__(self, chunk_iterator, buffer_size=100 * MB):
+    """
+    Args:
+      chunk_iterator (iterator): An iterator that yields chunks of bytes.
+      buffer_size (int): Maximum buffer size in bytes before old chunks are discarded.
+    """
    self._chunk_iterator = chunk_iterator
    self.buffer_size = buffer_size
    self.buffer_vec = []
    self.file_pos = 0
    self.vec_pos = 0
    self.b_pos = 0
    self._eof = False
 
   #### read() methods
 
   def readable(self):
     return True
 
   def readinto(self, output_buf):
+    """
+    Read data into the given buffer.
+
+    Args:
+      output_buf (bytearray): Buffer to read data into.
+
+    Returns:
+      int: Number of bytes read.
+ """ if self._eof: return 0 @@ -107,7 +125,7 @@ def readinto(self, output_buf): def _load_next_chunk(self, check_bounds=True): self.buffer_vec.append(next(self._chunk_iterator)) total = sum(len(chunk) for chunk in self.buffer_vec) - while total > self.buffer_size: + while total > self.buffer_size and len(self.buffer_vec) > 1: # keep at least the last chunk chunk = self.buffer_vec.pop(0) total -= len(chunk) self.vec_pos -= 1 @@ -123,15 +141,27 @@ def tell(self): return self.file_pos def seek(self, offset, whence=io.SEEK_SET): - #printerr(f"seek(offset={offset}, whence={('SET', 'CUR', 'END')[whence]})") - # convert to offset from start of file stream + """ + Seek to a new position in the buffered stream. + + Args: + offset (int): The offset to seek to. + whence (int): The reference position (SEEK_SET, SEEK_CUR). + SEEK_END is not supported. + + Returns: + int: The new file position. + + Raises: + ValueError: If an invalid `whence` value is provided. + IOError: If seeking before the start of the buffer. + """ if whence == io.SEEK_SET: seek_pos = offset elif whence == io.SEEK_CUR: seek_pos = self.file_pos + offset elif whence == io.SEEK_END: - self._seek_to_end() - seek_pos = self.file_pos + offset + raise ValueError('SEEK_END is not supported') else: raise ValueError(f"Invalid whence: {whence}") @@ -163,37 +193,47 @@ def seek(self, offset, whence=io.SEEK_SET): return self.file_pos - def _seek_to_end(self): - try: - # skip positions to end of the current buffer vec - if self.b_pos > 0: - self.file_pos += len(self.buffer_vec[self.vec_pos]) - self.b_pos - self.vec_pos += 1 - self.b_pos = 0 - # keep loading chunks until EOF - while True: - while self.vec_pos < len(self.buffer_vec): - self.file_pos += len(self.buffer_vec[self.vec_pos]) - self.vec_pos += 1 - self._load_next_chunk(check_bounds=False) - except StopIteration: - pass - # advance to end of buffer vec - while self.vec_pos < len(self.buffer_vec): - self.file_pos += len(self.buffer_vec[self.vec_pos]) - self.vec_pos += 1 +def readahead(iterator, n=1, daemon=True): + """ + Iterator wrapper that reads ahead from the underlying iterator, using a background thread. + + :Args: + iterator (iterator): The iterator to read from. + n (int): The maximum number of items to read ahead. + daemon (bool): Whether the background thread should be a daemon thread. + """ + q = queue.Queue(maxsize=n) + _sentinel = object() + + def _read(): + for x in iterator: + q.put(x) + q.put(_sentinel) -class URLStream(BufferStream): + t = threading.Thread(target=_read, daemon=daemon) + t.start() + while True: + x = q.get() + if x is _sentinel: + break + yield x - def __init__(self, url, chunk_size=1 * MB, buffer_size=10 * MB, requests_kwargs={}): - self.url = url - self.chunk_size = chunk_size - self.response = requests.get(self.url, stream=True, **requests_kwargs) - self.response.raise_for_status() - super().__init__( - self.response.iter_content(chunk_size=self.chunk_size), buffer_size=buffer_size) - def close(self): - super().close() - self.response.close() +def map(f, iterator, parallel=1): + ''' + Apply a function to each item in an iterator, optionally using multiple threads. + Similar to the built-in `map` function, but with support for parallel execution. 
+  '''
+  if parallel < 1:
+    yield from (f(x) for x in iterator)
+    return
+  with ThreadPoolExecutor(max_workers=parallel) as executor:
+    futures = []
+    for i in range(parallel):
+      futures.append(executor.submit(f, next(iterator)))
+    for r in iterator:
+      res = futures.pop(0).result()
+      futures.append(executor.submit(f, r))  # start computing next result before yielding this one
+      yield res
+    for fut in futures:
+      yield fut.result()
diff --git a/clarifai/runners/utils/url_fetcher.py b/clarifai/runners/utils/url_fetcher.py
index fff55fe5..7cbfa8a1 100644
--- a/clarifai/runners/utils/url_fetcher.py
+++ b/clarifai/runners/utils/url_fetcher.py
@@ -1,7 +1,9 @@
 import concurrent.futures
+from typing import Iterable
 
 import fsspec
 
+from clarifai.runners.utils import MB
 from clarifai.utils.logging import logger
 
 
@@ -50,20 +52,10 @@ def ensure_urls_downloaded(request, max_threads=128):
   return request
 
 
-def map_stream(f, it, parallel=1):
-  '''
-  Applies f to each element of it, yielding the results in order.
-  If parallel >= 1, uses a ThreadPoolExecutor to run f in parallel with the consuming thread.
-  '''
-  if parallel < 1:
-    return map(f, it)
-  with ThreadPoolExecutor(max_workers=parallel) as executor:
-    futures = []
-    for i in range(parallel):
-      futures.append(executor.submit(f, next(it)))
-    for r in it:
-      res = futures.pop(0).result()
-      futures.append(executor.submit(f, r))  # start computing next result before yielding this one
-      yield res
-    for f in futures:
-      yield f.result()
+def stream_url(url: str, chunk_size: int = 1 * MB) -> Iterable[bytes]:
+  """
+  Opens a stream of byte chunks from a URL.
+  """
+  # block_size=0 means that the file is streamed
+  with fsspec.open(url, 'rb', block_size=0) as f:
+    yield from iter(lambda: f.read(chunk_size), b'')
From 00ebae0e623a6356fe6d3900996914a8f2b75a58 Mon Sep 17 00:00:00 2001
From: David Eigen
Date: Tue, 4 Feb 2025 22:06:08 -0500
Subject: [PATCH 13/22] revert running-related changes

---
 clarifai/cli/model.py                        | 10 +--
 clarifai/runners/models/model_builder.py     | 24 ------
 clarifai/runners/models/model_run_locally.py | 70 ++++++------
 requirements.txt                             |  2 -
 4 files changed, 30 insertions(+), 76 deletions(-)

diff --git a/clarifai/cli/model.py b/clarifai/cli/model.py
index 0cc5d750..884b9eaa 100644
--- a/clarifai/cli/model.py
+++ b/clarifai/cli/model.py
@@ -68,11 +68,9 @@ def download_checkpoints(model_path, out_path):
     help=
     'Specify how to test the model locally: "env" for virtual environment or "container" for Docker container. Defaults to "env".'
 )
-@click.option('--pdb', is_flag=True, help='Enable PDB debugging when testing the model locally.')
 @click.option(
     '--keep_env',
     is_flag=True,
-    default=True,
     help=
     'Keep the virtual environment after testing the model locally (applicable for virtualenv mode). Defaults to False.'
 )
 @click.option(
     '--keep_image',
     is_flag=True,
     help=
     'Keep the Docker image after testing the model locally (applicable for container mode). Defaults to False.'
) -def test_locally(model_path, pdb=False, keep_env=False, keep_image=False, mode='env'): +def test_locally(model_path, keep_env=False, keep_image=False, mode='env'): """Test model locally.""" try: from clarifai.runners.models import model_run_locally @@ -91,12 +89,9 @@ def test_locally(model_path, pdb=False, keep_env=False, keep_image=False, mode=' if mode == 'container' and keep_env: raise ValueError("'keep_env' is applicable only for 'env' mode") - if pdb and mode == "container": - raise ValueError("PDB debugging is not supported in container mode.") - if mode == "env": click.echo("Testing model locally in a virtual environment...") - model_run_locally.main(model_path, run_model_server=False, keep_env=keep_env, use_pdb=pdb) + model_run_locally.main(model_path, run_model_server=False, keep_env=keep_env) elif mode == "container": click.echo("Testing model locally inside a container...") model_run_locally.main( @@ -130,7 +125,6 @@ def test_locally(model_path, pdb=False, keep_env=False, keep_image=False, mode=' @click.option( '--keep_env', is_flag=True, - default=True, help= 'Keep the virtual environment after testing the model locally (applicable for virtualenv mode). Defaults to False.' ) diff --git a/clarifai/runners/models/model_builder.py b/clarifai/runners/models/model_builder.py index b26e14f7..8c9dff93 100644 --- a/clarifai/runners/models/model_builder.py +++ b/clarifai/runners/models/model_builder.py @@ -104,12 +104,6 @@ def create_model_instance(self, load_model=True): model.load_model() return model - def create_error_model_instance(self, exception: Exception): - """ - Create an instance of the model class that just raises the given exception. - """ - return ErrorModel(exception) - def _validate_folder(self, folder): if folder == ".": folder = "" # will getcwd() next which ends with / @@ -647,24 +641,6 @@ def monitor_model_build(self): return False -class ErrorModel(ModelClass): - - def __init__(self, exception): - self.exception = exception - - def load_model(self): - pass - - def predict(self, *args, **kwargs): - raise self.exception from self.exception - - def generate(self, *args, **kwargs): - raise self.exception from self.exception - - def stream(self, *args, **kwargs): - raise self.exception from self.exception - - def upload_model(folder, download_checkpoints, skip_dockerfile): builder = ModelBuilder(folder) if download_checkpoints: diff --git a/clarifai/runners/models/model_run_locally.py b/clarifai/runners/models/model_run_locally.py index be2f3aed..08559f10 100644 --- a/clarifai/runners/models/model_run_locally.py +++ b/clarifai/runners/models/model_run_locally.py @@ -10,7 +10,6 @@ import traceback import venv -import platformdirs from clarifai_grpc.grpc.api import resources_pb2, service_pb2 from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2 @@ -53,14 +52,17 @@ def _get_env_executable(self): def create_temp_venv(self): """Create a temporary virtual environment.""" - venv_key = hashlib.md5(self.model_path.encode('utf-8')).hexdigest() - temp_dir = os.path.join(platformdirs.user_cache_dir('clarifai', 'clarifai'), 'venvs', venv_key) + requirements_hash = self._requirements_hash() + + temp_dir = os.path.join(tempfile.gettempdir(), str(requirements_hash)) venv_dir = os.path.join(temp_dir, "venv") if os.path.exists(temp_dir): - logger.info(f"Using previous virtual environment at {venv_dir}") + logger.info(f"Using previous virtual environment at {temp_dir}") + use_existing_venv = True else: - logger.info("Creating virtual environment...") + 
logger.info("Creating temporary virtual environment...") + use_existing_venv = False venv.create(venv_dir, with_pip=True) logger.info(f"Created temporary virtual environment at {venv_dir}") @@ -68,24 +70,19 @@ def create_temp_venv(self): self.temp_dir = temp_dir self.python_executable, self.pip_executable = self._get_env_executable() + return use_existing_venv + def install_requirements(self): """Install the dependencies from requirements.txt and Clarifai.""" - if os.path.exists(os.path.join(self.temp_dir, "requirements.txt")): - requirements = open(self.requirements_file).read() - installed_requirements = open(os.path.join(self.temp_dir, "requirements.txt")).read() - if requirements == installed_requirements: - logger.info("Requirements already installed.") - return _, pip_executable = self._get_env_executable() try: logger.info( f"Installing requirements from {self.requirements_file}... in the virtual environment {self.venv_dir}" ) - logger.info("Installing model requirements...") subprocess.check_call([pip_executable, "install", "-r", self.requirements_file]) + logger.info("Installing Clarifai package...") + subprocess.check_call([pip_executable, "install", "clarifai"]) logger.info("Requirements installed successfully!") - with open(os.path.join(self.temp_dir, "requirements.txt"), "w") as f: - f.write(open(self.requirements_file).read()) except subprocess.CalledProcessError as e: logger.error(f"Error installing requirements: {e}") self.clean_up() @@ -180,12 +177,9 @@ def _run_model_inference(self, model): )) if stream_response: - try: - stream_first_res = next(stream_response) - except StopIteration: - stream_first_res = None - if stream_first_res is None or stream_first_res.outputs[0].status.code != status_code_pb2.SUCCESS: - logger.error(f"Model stream failed: {stream_first_res}") + stream_first_res = next(stream_response) + if stream_first_res.outputs[0].status.code != status_code_pb2.SUCCESS: + logger.error(f"Moddel Prediction failed: {stream_first_res}") else: logger.info( f"Model Prediction succeeded for stream and first response: {stream_first_res}") @@ -197,7 +191,7 @@ def _run_test(self): # send an inference. 
self._run_model_inference(model) - def test_model(self, use_pdb=False): + def test_model(self): """Test the model by running it locally in the virtual environment.""" import_path = repr(os.path.dirname(os.path.abspath(__file__))) @@ -207,12 +201,8 @@ def test_model(self, use_pdb=False): f"sys.path.append({import_path}); " f"from model_run_locally import ModelRunLocally; " f"ModelRunLocally({model_path})._run_test()") - main_file = tempfile.NamedTemporaryFile(mode="w") - with open(main_file.name, "w") as f: - f.write(command_string) - pdb_args = ["-m", "pdb"] if use_pdb else [] - command = [self.python_executable, *pdb_args, main_file.name] + command = [self.python_executable, "-c", command_string] process = None try: logger.info("Testing the model locally...") @@ -242,13 +232,12 @@ def test_model(self, use_pdb=False): process.kill() # run the model server - def run_model_server(self, port=8080, use_pdb=False): + def run_model_server(self, port=8080): """Run the Clarifai Runners's model server.""" - pdb_args = ["-m", "pdb"] if use_pdb else [] command = [ - self.python_executable, *pdb_args, "-m", "clarifai.runners.server", "--model_path", - self.model_path, "--grpc", "--port", + self.python_executable, "-m", "clarifai.runners.server", "--model_path", self.model_path, + "--grpc", "--port", str(port) ] try: @@ -473,19 +462,16 @@ def remove_docker_image(self, image_name): def clean_up(self): """Clean up the temporary virtual environment.""" if os.path.exists(self.temp_dir): - logger.info("Cleaning up virtual environment...") + logger.info("Cleaning up temporary virtual environment...") shutil.rmtree(self.temp_dir) -def main( - model_path, - run_model_server=False, - inside_container=False, - port=8080, - keep_env=True, - keep_image=False, - use_pdb=False, -): +def main(model_path, + run_model_server=False, + inside_container=False, + port=8080, + keep_env=False, + keep_image=False): if not os.environ['CLARIFAI_PAT']: logger.error( @@ -527,9 +513,9 @@ def main( if not use_existing_env: manager.install_requirements() if run_model_server: - manager.run_model_server(port, use_pdb=use_pdb) + manager.run_model_server(port) else: - manager.test_model(use_pdb=use_pdb) + manager.test_model() finally: if not keep_env: manager.clean_up() diff --git a/requirements.txt b/requirements.txt index c34b4db5..dec59390 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,5 +9,3 @@ Pillow>=9.5.0 tabulate>=0.9.0 fsspec==2024.6.1 click==8.1.7 -platformdirs==4.3.6 -requests From 8bac7974babf547089a31f3d9abfb50dab4225d2 Mon Sep 17 00:00:00 2001 From: David Eigen Date: Wed, 5 Feb 2025 16:25:13 -0500 Subject: [PATCH 14/22] move ensure_urls_downloaded into model class with enable flag --- clarifai/runners/models/base_typed_model.py | 9 +++++++++ clarifai/runners/models/model_class.py | 22 +++++++++++++++++++-- clarifai/runners/models/model_runner.py | 4 ---- clarifai/runners/models/model_servicer.py | 16 +-------------- 4 files changed, 30 insertions(+), 21 deletions(-) diff --git a/clarifai/runners/models/base_typed_model.py b/clarifai/runners/models/base_typed_model.py index 2809d0c0..a6ec3142 100644 --- a/clarifai/runners/models/base_typed_model.py +++ b/clarifai/runners/models/base_typed_model.py @@ -6,6 +6,9 @@ from clarifai_grpc.grpc.api.service_pb2 import PostModelOutputsRequest from google.protobuf import json_format +from clarifai.runners.utils.stream_utils import readahead +from clarifai.runners.utils.url_fetcher import ensure_urls_downloaded + from ..utils.data_handler import InputDataHandler, 
OutputDataHandler from .model_class import ModelClass @@ -46,6 +49,8 @@ def convert_output_to_proto(self, outputs: list): def predict_wrapper( self, request: service_pb2.PostModelOutputsRequest) -> service_pb2.MultiOutputResponse: + if self.download_request_urls: + ensure_urls_downloaded(request) list_dict_input, inference_params = self.parse_input_request(request) outputs = self.predict(list_dict_input, inference_parameters=inference_params) return self.convert_output_to_proto(outputs) @@ -53,6 +58,8 @@ def predict_wrapper( def generate_wrapper( self, request: PostModelOutputsRequest) -> Iterator[service_pb2.MultiOutputResponse]: list_dict_input, inference_params = self.parse_input_request(request) + if self.download_request_urls: + ensure_urls_downloaded(request) outputs = self.generate(list_dict_input, inference_parameters=inference_params) for output in outputs: yield self.convert_output_to_proto(output) @@ -69,6 +76,8 @@ def stream_wrapper(self, request: Iterator[PostModelOutputsRequest] first_request = next(request) _, inference_params = self.parse_input_request(first_request) request_iterator = itertools.chain([first_request], request) + if self.download_request_urls: + request_iterator = readahead(map(ensure_urls_downloaded, request_iterator)) outputs = self.stream(self._preprocess_stream(request_iterator), inference_params) for output in outputs: yield self.convert_output_to_proto(output) diff --git a/clarifai/runners/models/model_class.py b/clarifai/runners/models/model_class.py index 5b342ba2..71381764 100644 --- a/clarifai/runners/models/model_class.py +++ b/clarifai/runners/models/model_class.py @@ -3,23 +3,41 @@ from clarifai_grpc.grpc.api import service_pb2 +from clarifai.runners.utils.stream_utils import readahead +from clarifai.runners.utils.url_fetcher import ensure_urls_downloaded + class ModelClass(ABC): + download_request_urls = True + def predict_wrapper( self, request: service_pb2.PostModelOutputsRequest) -> service_pb2.MultiOutputResponse: """This method is used for input/output proto data conversion""" + # Download any urls that are not already bytes. + if self.download_request_urls: + ensure_urls_downloaded(request) + return self.predict(request) def generate_wrapper(self, request: service_pb2.PostModelOutputsRequest ) -> Iterator[service_pb2.MultiOutputResponse]: """This method is used for input/output proto data conversion and yield outcome""" + # Download any urls that are not already bytes. + if self.download_request_urls: + ensure_urls_downloaded(request) + return self.generate(request) - def stream_wrapper(self, request: service_pb2.PostModelOutputsRequest + def stream_wrapper(self, request_stream: Iterator[service_pb2.PostModelOutputsRequest] ) -> Iterator[service_pb2.MultiOutputResponse]: """This method is used for input/output proto data conversion and yield outcome""" - return self.stream(request) + + # Download any urls that are not already bytes. 
+    if self.download_request_urls:
+      request_stream = readahead(map(ensure_urls_downloaded, request_stream))
+
+    return self.stream(request_stream)
 
   @abstractmethod
   def load_model(self):
diff --git a/clarifai/runners/models/model_runner.py b/clarifai/runners/models/model_runner.py
index a24cba4d..9e8a495a 100644
--- a/clarifai/runners/models/model_runner.py
+++ b/clarifai/runners/models/model_runner.py
@@ -5,7 +5,6 @@
 from clarifai_protocol import BaseRunner
 from clarifai_protocol.utils.health import HealthProbeRequestHandler
 
-from ..utils.url_fetcher import ensure_urls_downloaded
 from .model_class import ModelClass
 
@@ -79,7 +78,6 @@ def runner_item_predict(self,
     if not runner_item.HasField('post_model_outputs_request'):
       raise Exception("Unexpected work item type: {}".format(runner_item))
     request = runner_item.post_model_outputs_request
-    ensure_urls_downloaded(request)
 
     resp = self.model.predict_wrapper(request)
     successes = [o.status.code == status_code_pb2.SUCCESS for o in resp.outputs]
@@ -109,7 +107,6 @@ def runner_item_generate(
     if not runner_item.HasField('post_model_outputs_request'):
       raise Exception("Unexpected work item type: {}".format(runner_item))
     request = runner_item.post_model_outputs_request
-    ensure_urls_downloaded(request)
 
     for resp in self.model.generate_wrapper(request):
       successes = []
@@ -169,5 +166,4 @@ def pmo_iterator(runner_item_iterator):
     for runner_item in runner_item_iterator:
       if not runner_item.HasField('post_model_outputs_request'):
         raise Exception("Unexpected work item type: {}".format(runner_item))
-      ensure_urls_downloaded(runner_item.post_model_outputs_request)
       yield runner_item.post_model_outputs_request
diff --git a/clarifai/runners/models/model_servicer.py b/clarifai/runners/models/model_servicer.py
index 9e334fc5..a32f6e3e 100644
--- a/clarifai/runners/models/model_servicer.py
+++ b/clarifai/runners/models/model_servicer.py
@@ -3,9 +3,6 @@
 from clarifai_grpc.grpc.api import service_pb2, service_pb2_grpc
 from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
 
-from ..utils.stream_utils import readahead
-from ..utils.url_fetcher import ensure_urls_downloaded
-
 
 class ModelServicer(service_pb2_grpc.V2Servicer):
   """
@@ -27,9 +24,6 @@ def PostModelOutputs(self, request: service_pb2.PostModelOutputsRequest,
     returns an output.
     """
-    # Download any urls that are not already bytes.
-    ensure_urls_downloaded(request)
-
     try:
       return self.model.predict_wrapper(request)
     except Exception as e:
@@ -46,9 +40,6 @@ def GenerateModelOutputs(self, request: service_pb2.PostModelOutputsRequest,
     This is the method that will be called when the servicer is run. It takes in an input and
     returns an output.
     """
-    # Download any urls that are not already bytes.
-    ensure_urls_downloaded(request)
-
     try:
       return self.model.generate_wrapper(request)
     except Exception as e:
@@ -66,13 +57,8 @@ def StreamModelOutputs(self,
     This is the method that will be called when the servicer is run. It takes in an input and
     returns an output.
     """
-
-    # Download any urls that are not already bytes.
-    def _download_urls_stream(requests):
-      return readahead(map(ensure_urls_downloaded, requests))
-
     try:
-      return self.model_class.stream(_download_urls_stream(request))
+      return self.model_class.stream(request)
     except Exception as e:
       yield service_pb2.MultiOutputResponse(status=status_pb2.Status(
           code=status_code_pb2.MODEL_PREDICTION_FAILED,

From 55e3f780d47904cb2da67fed0f52294933c77e28 Mon Sep 17 00:00:00 2001
From: David Eigen
Date: Wed, 5 Feb 2025 19:11:42 -0500
Subject: [PATCH 15/22] revert unrelated change

---
 clarifai/runners/server.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/clarifai/runners/server.py b/clarifai/runners/server.py
index 19e47045..b1c00b81 100644
--- a/clarifai/runners/server.py
+++ b/clarifai/runners/server.py
@@ -70,11 +70,7 @@ def main():
 
   builder = ModelBuilder(parsed_args.model_path)
 
-  try:
-    model = builder.create_model_instance()
-  except Exception as e:
-    logger.exception("Error creating model instance")
-    model = builder.create_error_model_instance(e)
+  model = builder.create_model_instance()
 
   # Setup the grpc server for local development.
   if parsed_args.grpc:

From 03897766a08b56a5e8f186bc91c3adfeec10f5c5 Mon Sep 17 00:00:00 2001
From: David Eigen
Date: Wed, 5 Feb 2025 19:16:16 -0500
Subject: [PATCH 16/22] fix import

---
 clarifai/runners/utils/url_fetcher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clarifai/runners/utils/url_fetcher.py b/clarifai/runners/utils/url_fetcher.py
index 7cbfa8a1..28b539d4 100644
--- a/clarifai/runners/utils/url_fetcher.py
+++ b/clarifai/runners/utils/url_fetcher.py
@@ -3,7 +3,7 @@
 
 import fsspec
 
-from clarifai.runners.utils import MB
+from clarifai.runners.utils.stream_utils import MB
 from clarifai.utils.logging import logger
 

From e002eb24729cd14537b8aa790c75d16eeb508042 Mon Sep 17 00:00:00 2001
From: David Eigen
Date: Wed, 5 Feb 2025 19:20:24 -0500
Subject: [PATCH 17/22] add test

---
 tests/runners/test_stream_utils.py | 78 ++++++++++++++++++++++++++++++
 1 file changed, 78 insertions(+)
 create mode 100644 tests/runners/test_stream_utils.py

diff --git a/tests/runners/test_stream_utils.py b/tests/runners/test_stream_utils.py
new file mode 100644
index 00000000..0aca7c3b
--- /dev/null
+++ b/tests/runners/test_stream_utils.py
@@ -0,0 +1,78 @@
+import io
+import unittest
+
+from clarifai.runners.utils.stream_utils import (SeekableStreamingChunksReader,
+                                                 StreamingChunksReader)
+
+
+class TestStreamingChunksReader(unittest.TestCase):
+
+  def setUp(self):
+    self.chunks = [b'hello', b'world', b'12345']
+    #self.reader = BufferStream(iter(self.chunks), buffer_size=10)
+    self.reader = StreamingChunksReader(iter(self.chunks))
+
+  def test_read(self):
+    buffer = bytearray(5)
+    self.assertEqual(self.reader.readinto(buffer), 5)
+    self.assertEqual(buffer, b'hello')
+
+  def test_read_file(self):
+    self.assertEqual(self.reader.read(5), b'hello')
+
+  def test_read_partial_chunk(self):
+    """Test reading fewer bytes than a chunk contains, across multiple reads."""
+    buffer = bytearray(3)
+    self.assertEqual(self.reader.readinto(buffer), 3)
+    self.assertEqual(buffer, b'hel')
+    self.assertEqual(self.reader.readinto(buffer), 2)
+    self.assertEqual(buffer[:2], b'lo')
+    self.assertEqual(self.reader.readinto(buffer), 3)
+    self.assertEqual(buffer, b'wor')
+
+  def test_large_chunk(self):
+    """Test handling a chunk larger than the buffer size."""
+    large_chunk = b'a' * 20
+    reader = StreamingChunksReader(iter([large_chunk]))
+    buffer = bytearray(10)
+    self.assertEqual(reader.readinto(buffer), 10)
+    self.assertEqual(buffer, b'a' * 10)
+    self.assertEqual(reader.readinto(buffer), 10)
+    self.assertEqual(buffer, b'a' * 10)
+
+
+class TestSeekableStreamingChunksReader(TestStreamingChunksReader):
+
+  def setUp(self):
+    self.chunks = [b'hello', b'world', b'12345']
+    self.reader = SeekableStreamingChunksReader(iter(self.chunks), buffer_size=10)
+
+  def test_interleaved_read_and_seek(self):
+    """Test alternating read and seek operations."""
+    buffer = bytearray(5)
+    self.reader.readinto(buffer)
+    self.assertEqual(buffer, b'hello')
+    buffer[:] = b'xxxxx'
+    self.reader.seek(0)
+    self.assertEqual(self.reader.readinto(buffer), 5)
+    self.assertEqual(buffer, b'hello')
+    self.reader.seek(7)
+    n = self.reader.readinto(buffer)
+    assert 1 <= n <= len(buffer)
+    self.assertEqual(buffer[:n], b''.join(self.chunks)[7:7 + n])
+
+  def test_seek_and_tell(self):
+    """Test seeking to a position and confirming it with tell()."""
+    self.reader.seek(5)
+    self.assertEqual(self.reader.tell(), 5)
+    self.reader.seek(-2, io.SEEK_CUR)
+    self.assertEqual(self.reader.tell(), 3)
+
+  def test_seek_out_of_bounds(self):
+    """Test seeking to a negative position, which should raise an IOError."""
+    with self.assertRaises(IOError):
+      self.reader.seek(-1)
+
+
+if __name__ == '__main__':
+  unittest.main()

From 9ac9293065be1d1f82837b10c083d5d20f716dea Mon Sep 17 00:00:00 2001
From: David Eigen
Date: Wed, 5 Feb 2025 19:45:04 -0500
Subject: [PATCH 18/22] fix name

---
 clarifai/runners/utils/video_utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clarifai/runners/utils/video_utils.py b/clarifai/runners/utils/video_utils.py
index 23227205..de0294cf 100644
--- a/clarifai/runners/utils/video_utils.py
+++ b/clarifai/runners/utils/video_utils.py
@@ -53,7 +53,7 @@ def stream_frames_from_bytes(bytes_iterator):
 
   :param bytes_iterator: An iterator that yields byte chunks with the video data
   """
-  buffer = stream_utils.BufferStream(bytes_iterator)
+  buffer = stream_utils.StreamingChunksReader(bytes_iterator)
   reader = io.BufferedReader(buffer)
   container = av.open(reader)
   yield from container.decode(video=0)

From 6568c048975d99636f4d139c6cadb41ec758cd98 Mon Sep 17 00:00:00 2001
From: David Eigen
Date: Thu, 6 Feb 2025 12:46:44 -0500
Subject: [PATCH 19/22] fix model param updates

---
 clarifai/client/model.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/clarifai/client/model.py b/clarifai/client/model.py
index 81674396..80f36a7c 100644
--- a/clarifai/client/model.py
+++ b/clarifai/client/model.py
@@ -1218,7 +1218,7 @@ def stream_by_video_file(self,
 
   def _get_model_info_for_inference(self, inference_params: Dict = {},
                                     output_config: Dict = {}) -> None:
-    """Overrides the model version.
+    """Gets the model_info with modified inference params and output config.
 
     Args:
         inference_params (dict): The inference params to override.
         output_config (dict): The output config to override.
           min_value (float): The minimum value of the prediction confidence to filter.
           max_concepts (int): The maximum number of concepts to return.
           select_concepts (list[Concept]): The concepts to select.
           sample_ms (int): The number of milliseconds to sample.
""" - if not inference_params and not output_config: - return self.model_info - model_info = resources_pb2.Model() model_info.CopyFrom(self.model_info) - model_info.model_version.output_info.params.update(inference_params) - model_info.model_version.output_info.output_config.update(output_config) + model_info.model_version.output_info.params = inference_params + model_info.model_version.output_info.output_config.CopyFrom( + resources_pb2.OutputConfig(**output_config)) return model_info def _list_concepts(self) -> List[str]: From 87c60c7f5cf2914f7935ec5f7b063c8644f823ac Mon Sep 17 00:00:00 2001 From: David Eigen Date: Thu, 6 Feb 2025 13:00:51 -0500 Subject: [PATCH 20/22] fix unused line --- clarifai/client/model.py | 1 - 1 file changed, 1 deletion(-) diff --git a/clarifai/client/model.py b/clarifai/client/model.py index 80f36a7c..5e930737 100644 --- a/clarifai/client/model.py +++ b/clarifai/client/model.py @@ -713,7 +713,6 @@ def generate(self, runner_selector=runner_selector, model=model_info) request.model.model_version.id = self.model_version.id - request.model.model_version.params start_time = time.time() backoff_iterator = BackoffIterator(10) From ca6ab9ce72c1185be2c29335a1ed4534d5557fca Mon Sep 17 00:00:00 2001 From: David Eigen Date: Thu, 6 Feb 2025 13:27:03 -0500 Subject: [PATCH 21/22] fixes --- clarifai/client/model.py | 1 - clarifai/runners/models/model_servicer.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/clarifai/client/model.py b/clarifai/client/model.py index 5e930737..2413fe5b 100644 --- a/clarifai/client/model.py +++ b/clarifai/client/model.py @@ -712,7 +712,6 @@ def generate(self, inputs=inputs, runner_selector=runner_selector, model=model_info) - request.model.model_version.id = self.model_version.id start_time = time.time() backoff_iterator = BackoffIterator(10) diff --git a/clarifai/runners/models/model_servicer.py b/clarifai/runners/models/model_servicer.py index a32f6e3e..d6252517 100644 --- a/clarifai/runners/models/model_servicer.py +++ b/clarifai/runners/models/model_servicer.py @@ -58,7 +58,7 @@ def StreamModelOutputs(self, returns an output. 
""" try: - return self.model_class.stream(request) + return self.model_class.stream_wrapper(request) except Exception as e: yield service_pb2.MultiOutputResponse(status=status_pb2.Status( code=status_code_pb2.MODEL_PREDICTION_FAILED, From 1c5a334af23121b363e1c068330b05a9b44598ed Mon Sep 17 00:00:00 2001 From: David Eigen Date: Thu, 6 Feb 2025 13:35:36 -0500 Subject: [PATCH 22/22] fix order of ensure_urls_downloaded and parse_input_request --- clarifai/runners/models/base_typed_model.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/clarifai/runners/models/base_typed_model.py b/clarifai/runners/models/base_typed_model.py index a6ec3142..313d630a 100644 --- a/clarifai/runners/models/base_typed_model.py +++ b/clarifai/runners/models/base_typed_model.py @@ -57,9 +57,9 @@ def predict_wrapper( def generate_wrapper( self, request: PostModelOutputsRequest) -> Iterator[service_pb2.MultiOutputResponse]: - list_dict_input, inference_params = self.parse_input_request(request) if self.download_request_urls: ensure_urls_downloaded(request) + list_dict_input, inference_params = self.parse_input_request(request) outputs = self.generate(list_dict_input, inference_parameters=inference_params) for output in outputs: yield self.convert_output_to_proto(output) @@ -71,13 +71,13 @@ def _preprocess_stream( input_data, _ = self.parse_input_request(req) yield input_data - def stream_wrapper(self, request: Iterator[PostModelOutputsRequest] + def stream_wrapper(self, request_iterator: Iterator[PostModelOutputsRequest] ) -> Iterator[service_pb2.MultiOutputResponse]: - first_request = next(request) - _, inference_params = self.parse_input_request(first_request) - request_iterator = itertools.chain([first_request], request) if self.download_request_urls: request_iterator = readahead(map(ensure_urls_downloaded, request_iterator)) + first_request = next(request_iterator) + _, inference_params = self.parse_input_request(first_request) + request_iterator = itertools.chain([first_request], request_iterator) outputs = self.stream(self._preprocess_stream(request_iterator), inference_params) for output in outputs: yield self.convert_output_to_proto(output)