
Ollama client (from adalflow.components.model_client.ollama_client) does not work with stream=True #299

Open
debasisdwivedy opened this issue Dec 13, 2024 · 2 comments · May be fixed by #309
Labels
bug Something isn't working, either in /adalflow, /tutorials, or /use cases...

Comments

debasisdwivedy commented Dec 13, 2024

Bug description

The application does not work with stream set to True. The OllamaClient class in adalflow.components.model_client.ollama_client handles streamed input with the methods below:

def parse_stream_response(completion: GeneratorType) -> Any:
    """Parse the completion to a str. We use the generate with prompt instead of chat with messages."""
    for chunk in completion:
        log.debug(f"Raw chunk: {chunk}")
        raw_response = chunk["response"] if "response" in chunk else None
        yield GeneratorOutput(data=None, raw_response=raw_response)


def parse_chat_completion(
        self, completion: Union[GenerateResponse, GeneratorType]
    ) -> GeneratorOutput:
        """Parse the completion to a str. We use the generate with prompt instead of chat with messages."""
        log.debug(f"completion: {completion}, {isinstance(completion, GeneratorType)}")
        if isinstance(completion, GeneratorType):  # streaming
            return parse_stream_response(completion)
        else:
            return parse_generate_response(completion)

Because parse_stream_response uses yield, the caller has to loop over the result to get all the tokens. Is there a reason to use yield instead of return?
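To make the failure mode concrete, here is a minimal, self-contained sketch (simplified stand-ins, plain dicts instead of the actual adalflow classes): because the function body contains yield, calling it returns a generator object, so downstream code that expects a single parsed object with a raw_response attribute fails until the generator is iterated.

# Minimal sketch with simplified stand-ins (not the actual adalflow code).
def parse_stream_response(chunks):
    for chunk in chunks:
        # mirror the real parser: pull the "response" field out of each chunk
        yield {"raw_response": chunk.get("response")}

completion = iter([{"response": "Par"}, {"response": "is"}])
parsed = parse_stream_response(completion)

print(type(parsed))  # <class 'generator'> -- not a single parsed object
# Accessing parsed.raw_response here raises: AttributeError: 'generator' object has no attribute 'raw_response'
print([c["raw_response"] for c in parsed])  # the tokens only appear once the generator is looped over
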

There are two ways to go about solving this:

SOLUTION 1
Change yield to return.

def parse_stream_response(completion: GeneratorType) -> Any:
    """Parse the completion to a str. We use the generate with prompt instead of chat with messages."""
    gen_output = GeneratorOutput(data=None, raw_response='')
    for chunk in completion:
        log.debug(f"Raw chunk: {chunk}")
        # accumulate the streamed tokens into a single raw_response string
        raw_response = chunk["response"] if "response" in chunk else None
        if raw_response:
            gen_output.raw_response += raw_response
    return gen_output

def parse_chat_completion(
        self, completion: Union[GenerateResponse, GeneratorType]
    ) -> GeneratorOutput:
        """Parse the completion to a str. We use the generate with prompt instead of chat with messages."""
        log.debug(f"completion: {completion}, {isinstance(completion, GeneratorType)}")
        if isinstance(completion, GeneratorType):  # streaming
            return parse_stream_response(completion)
        else:
            return parse_generate_response(completion)

SOLUTION 2

Change parse_chat_completion to collect all the tokens and then return a single GeneratorOutput.

def parse_stream_response(completion: GeneratorType) -> Any:
    """Parse the completion to a str. We use the generate with prompt instead of chat with messages."""
    for chunk in completion:
        log.debug(f"Raw chunk: {chunk}")
        raw_response = chunk["response"] if "response" in chunk else None
        yield raw_response

def parse_chat_completion(
        self, completion: Union[GenerateResponse, GeneratorType]
    ) -> GeneratorOutput:
        """Parse the completion to a str. We use the generate with prompt instead of chat with messages."""
        log.debug(f"completion: {completion}, {isinstance(completion, GeneratorType)}")
        if isinstance(completion, GeneratorType):  # streaming
            gen_output = GeneratorOutput(data=None, raw_response='')
            tokens = parse_stream_response(completion)

            for token in tokens:
                if token:  # skip chunks that had no "response" field
                    gen_output.raw_response += token
            return gen_output
        else:
            return parse_generate_response(completion)

One thing to remember: for the async implementation we would have to add an async_parse_chat_completion, since parse_chat_completion as written would not work for asynchronous calls.
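A rough, purely illustrative sketch of what that async side could look like (the async-generator handling, the inspect.isasyncgen dispatch, and the GeneratorOutput import path are my assumptions here, not the current adalflow API):

import inspect
from typing import Any, AsyncGenerator

from adalflow.core.types import GeneratorOutput  # import path assumed


async def async_parse_stream_response(completion: AsyncGenerator) -> Any:
    """Illustrative async counterpart of parse_stream_response: yield one GeneratorOutput per chunk."""
    async for chunk in completion:
        raw_response = chunk["response"] if "response" in chunk else None
        yield GeneratorOutput(data=None, raw_response=raw_response)


def async_parse_chat_completion(self, completion: Any) -> Any:
    """Illustrative dispatcher: route async generators to the async stream parser."""
    if inspect.isasyncgen(completion):  # async streaming
        return async_parse_stream_response(completion)
    # fall back to the existing sync parsing for non-streaming completions
    return self.parse_chat_completion(completion)
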

@liyin2015 Once this is reviewed and verified as an issue, I will go ahead and raise a PR for the implementation.

Regards,

What version are you seeing the problem on?

Installed via pip.

To get the version:

pip show adalflow

Name: adalflow
Version: 0.2.6
Summary: The Library to Build and Auto-optimize LLM Applications
Home-page: https://github.com/SylphAI-Inc/AdalFlow
Author: Li Yin
Author-email: li@sylphai.com
License: MIT
Location: <>
Requires: backoff, boto3, botocore, colorama, diskcache, jinja2, jsonlines, nest-asyncio, numpy, python-dotenv, pyyaml, tiktoken, tqdm
Required-by:

How to reproduce the bug

from adalflow.components.model_client.ollama_client import OllamaClient
from adalflow.core.generator import Generator

host = "127.0.0.1:11434"


ollama_ai = {
        "model_client": OllamaClient(host=host),
        "model_kwargs": {
            "model": "phi3:latest",
            "stream": True,
        },
    }

generator = Generator(**ollama_ai)
output = generator({"input_str": "What is the capital of France?"})
print(output)


Error messages and logs

Error processing the output: 'generator' object has no attribute 'raw_response'
GeneratorOutput(id=None, data=None, error="'generator' object has no attribute 'raw_response'", usage=None, raw_response='<generator object Client._request.<locals>.inner at 0x12aa131b0>', metadata=None)



Environment

- OS: macOS 15.1 (Apple M1 Pro)

More info

_No response_
@debasisdwivedy debasisdwivedy added the bug Something isn't working, either in /adalflow, /tutorials, or /use cases... label Dec 13, 2024
BalasubramanyamEvani commented Dec 13, 2024

I can work on this @liyin2015. I think changing yield to return, or changing parse_chat_completion to collect all tokens, would conflict with what stream is supposed to do.

After going through the code, I think the way adalflow processes the completion in _post_call in adalflow/core/generator.py might be the issue.

I made the following changes, which seem to solve it. Let me know if this looks good and I can open a PR.

def _post_call(self, completion: Any) -> GeneratorOutput:
        r"""Get string completion and process it with the output_processors."""
        # parse chat completion will only fill the raw_response
        log.debug("in post call")
        output: GeneratorOutput = self.model_client.parse_chat_completion(completion)

        if isinstance(output, types.GeneratorType):  # streaming; needs 'import types' at the top of generator.py if not already imported
            def processed_generator():
                """Process each chunk dynamically."""
                try:
                    for raw_output in output:
                        log.debug(f"Processing raw chunk: {raw_output.raw_response}")
                        processed_chunk = raw_output.raw_response
                        if self.output_processors and processed_chunk:
                            try:
                                processed_chunk = self.output_processors(processed_chunk)
                            except Exception as e:
                                log.error(f"Error processing the output processors: {e}")
                                yield GeneratorOutput(
                                    data=None,
                                    raw_response=raw_output.raw_response,
                                    error=str(e),
                                )
                                continue
                        yield GeneratorOutput(
                            data=processed_chunk,
                            raw_response=raw_output.raw_response,
                            error=None,
                        )
                except Exception as e:
                    log.error(f"Error while streaming processed chunks: {e}")
                    yield GeneratorOutput(error=str(e), raw_response=None)

            # Return a new GeneratorOutput with the processed generator
            return GeneratorOutput(data=processed_generator(), raw_response=output)

        # Now add the data field to the output
        data = output.raw_response
        if self.output_processors:
            if data:
                try:
                    data = self.output_processors(data)
                    output.data = data
                except Exception as e:
                    log.error(f"Error processing the output processors: {e}")
                    output.error = str(e)
        else:
            output.data = data
        return output

I tested it with the following

ollama_ai = {
        "model_client": OllamaClient(host=host),
        "model_kwargs": {
            "model": "phi3:latest",
            "stream": True,
        },
    }

generator = Generator(**ollama_ai)
output = generator({"input_str": "What is the capital of France?"})
for chunk in output.data:
  print(chunk.data, end="", flush=True)

# for stream: False
# print(output.data)

debasisdwivedy (Author) commented
Thanks @BalasubramanyamEvani.

I added an else clause to the if to make it clearer. Code below:

from typing import Any, Dict, Optional, Union, Callable, Tuple, List,Generator as GeneratorType

def _post_call(self, completion: Any) -> GeneratorOutput:
        r"""Get string completion and process it with the output_processors."""
        # parse chat completion will only fill the raw_response
        output: GeneratorOutput = self.model_client.parse_chat_completion(completion)
        if isinstance(output, GeneratorType):
            def processed_generator():
                """Process each chunk dynamically."""
                try:
                    for raw_output in output:
                        log.debug(f"Processing raw chunk: {raw_output.raw_response}")
                        processed_chunk = raw_output.raw_response
                        if self.output_processors and processed_chunk:
                            try:
                                processed_chunk = self.output_processors(processed_chunk)
                            except Exception as e:
                                log.error(f"Error processing the output processors: {e}")
                                yield GeneratorOutput(
                                    data=None,
                                    raw_response=raw_output.raw_response,
                                    error=str(e),
                                )
                                continue
                        yield GeneratorOutput(
                            data=processed_chunk,
                            raw_response=raw_output.raw_response,
                            error=None,
                        )
                except Exception as e:
                    log.error(f"Error while streaming processed chunks: {e}")
                    yield GeneratorOutput(error=str(e), raw_response=None)
            return GeneratorOutput(data=processed_generator(), raw_response=output)
        else:
            # Now add the data field to the output
            data = output.raw_response
            if self.output_processors:
                if data:
                    try:
                        data = self.output_processors(data)
                        output.data = data
                    except Exception as e:
                        log.error(f"Error processing the output processors: {e}")
                        output.error = str(e)

            else:
                output.data = data
                
            return output

I did a round of testing and it works.

Regards,

@BalasubramanyamEvani BalasubramanyamEvani linked a pull request Dec 18, 2024 that will close this issue