langchain[minor], community[minor], core[minor]: Async Cache support and AsyncRedisCache #15817
@@ -27,6 +27,7 @@
 import logging
 import uuid
 import warnings
+from abc import ABC
 from datetime import timedelta
 from functools import lru_cache
 from typing import (
@@ -349,21 +350,69 @@ def clear(self, **kwargs: Any) -> None:
         self.redis.flushdb(flush_type=asynchronous)


-class RedisCache(BaseCache):
-    """Cache that uses Redis as a backend."""
+class RedisCacheBase(BaseCache, ABC):
+    @staticmethod
+    def _key(prompt: str, llm_string: str) -> str:
+        """Compute key from prompt and llm_string"""
+        return _hash(prompt + llm_string)
+
+    @staticmethod
+    def _ensure_generation_type(return_val: RETURN_VAL_TYPE):
+        for gen in return_val:
+            if not isinstance(gen, Generation):
+                raise ValueError(
+                    "RedisCache only supports caching of normal LLM generations, "
+                    f"got {type(gen)}"
+                )
+
+    @staticmethod
+    def _get_generations(results: dict[str | bytes, str | bytes]) -> list[Generation]:
+        generations = []
+        if results:
+            for _, text in results.items():
+                try:
+                    generations.append(loads(text))
+                except Exception:
+                    logger.warning(
+                        "Retrieving a cache value that could not be deserialized "
+                        "properly. This is likely due to the cache being in an "
+                        "older format. Please recreate your cache to avoid this "
+                        "error."
+                    )
+                    # In a previous life we stored the raw text directly
+                    # in the table, so assume it's in that format.
+                    generations.append(Generation(text=text))
+        return generations if generations else None
+
+    @staticmethod
+    def _configure_pipeline_for_update(key, pipe, return_val, ttl=None):
+        pipe.hset(
+            key,
+            mapping={
+                str(idx): dumps(generation) for idx, generation in enumerate(return_val)
+            },
+        )
+        if ttl is not None:
+            pipe.expire(key, ttl)
+
+
+class RedisCache(RedisCacheBase):
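The `_configure_pipeline_for_update` helper above queues one HSET (mapping each generation's index to its serialized form) plus an optional EXPIRE on a pipeline. A rough sketch of that behavior against a hypothetical recording pipeline, with `json.dumps` standing in for langchain's `dumps` serializer:

```python
import json

class FakePipeline:
    """Hypothetical recording pipeline, standing in for a redis pipeline."""

    def __init__(self):
        self.commands = []

    def hset(self, key, mapping):
        self.commands.append(("HSET", key, mapping))

    def expire(self, key, ttl):
        self.commands.append(("EXPIRE", key, ttl))

def configure_pipeline_for_update(key, pipe, return_val, ttl=None):
    # Mirrors the static helper in the diff: one HSET mapping each
    # generation's index to its serialized form, plus an optional EXPIRE.
    pipe.hset(
        key,
        mapping={str(idx): json.dumps(gen) for idx, gen in enumerate(return_val)},
    )
    if ttl is not None:
        pipe.expire(key, ttl)

pipe = FakePipeline()
configure_pipeline_for_update("cache:key", pipe, [{"text": "hello"}], ttl=60)
print(pipe.commands)
```

Both commands are queued but not sent; the caller executes the pipeline afterwards, which is why the helper can be shared between the sync and async caches.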
Review thread on this class:

Comment: An alternative design is to make the initializer able to accept both async and sync versions of the redis client. If it's initialized with the async redis client, then it uses its async methods. If it's initialized with the sync client, it delegates all the async calls to the sync ones (using the trick in the abstract class). What do you think would be better? A single

Comment: @eyurtsev it was actually my original implementation: a single Cache class with the ability to work with a sync and an async client. However @cbornet explicitly asked to change it - to break it out into two separate Redis Caches. I can see pros and cons in either approach. I think at this point, having two different opinions which both result in a fine working implementation means we can go either way. The maintenance burden of such a split seems negligible.

Comment: I think even if such a split turns out to be a mistake, we should be able to work around it in the future by consolidating features in a single class and then using a type alias. But I think it's just fine as it is.

Comment: Sorry, didn't notice this comment -- should've responded earlier! I touched base with @baskaryan; I think we'll try to go with a convention where both sync and async implementations live as close as possible to one another -- the thought is to just use that as the design pattern unless there's a compelling reason to do otherwise.

The reason for the convention itself is:

The downside to this approach that I see is that the user needs to know that they can pass an async redis client, which is more implicit than the approach with ARedisCache.

cc @cbornet Apologies, should've checked in before modifying the PR. Let me know if you're OK with this change or not. If so, I can merge. If there's still a strong feeling that we should have two implementations, please let me know why, and I can revert the changes.

Comment: OK. I get the arguments for the single class. Sorry for the pull in the wrong direction.
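The single-class alternative discussed above — one cache that detects whether it was handed a sync or an async client and adapts at call time — could be sketched roughly as follows. The client classes here are toys, not the redis API:

```python
import asyncio
from typing import Any

class DualClientCache:
    """Hypothetical sketch of the single-class design: accept either a sync
    or an async client and adapt when the method is called."""

    def __init__(self, client: Any):
        self.client = client

    async def alookup(self, key: str) -> Any:
        result = self.client.get(key)
        # An async client's method returns a coroutine to await;
        # a sync client's method returns the value directly.
        if asyncio.iscoroutine(result):
            return await result
        return result

class SyncClient:
    def get(self, key):
        return f"sync:{key}"

class AsyncClient:
    async def get(self, key):
        return f"async:{key}"

print(asyncio.run(DualClientCache(SyncClient()).alookup("k")))   # sync:k
print(asyncio.run(DualClientCache(AsyncClient()).alookup("k")))  # async:k
```

As the thread notes, the trade-off is implicitness: the caller must know an async client is accepted, which two explicitly named classes make obvious.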
+    """
+    Cache that uses Redis as a backend. Allows to use a sync `redis.Redis` client.
+    """

     def __init__(self, redis_: Any, *, ttl: Optional[int] = None):
         """
         Initialize an instance of RedisCache.

         This method initializes an object with Redis caching capabilities.
         It takes a `redis_` parameter, which should be an instance of a Redis
-        client class, allowing the object to interact with a Redis
-        server for caching purposes.
+        client class (`redis.Redis`), allowing the object
+        to interact with a Redis server for caching purposes.

         Parameters:
             redis_ (Any): An instance of a Redis client class
-                (e.g., redis.Redis) used for caching.
+                (`redis.Redis`) to be used for caching.
                 This allows the object to communicate with a
                 Redis server for caching operations.
             ttl (int, optional): Time-to-live (TTL) for cached items in seconds.
@@ -375,61 +424,27 @@ def __init__(self, redis_: Any, *, ttl: Optional[int] = None):
             from redis import Redis
         except ImportError:
             raise ValueError(
-                "Could not import redis python package. "
+                "Could not import `redis` python package. "
                 "Please install it with `pip install redis`."
             )
         if not isinstance(redis_, Redis):
-            raise ValueError("Please pass in Redis object.")
+            raise ValueError("Please pass a valid `redis.Redis` client.")
         self.redis = redis_
         self.ttl = ttl

-    def _key(self, prompt: str, llm_string: str) -> str:
-        """Compute key from prompt and llm_string"""
-        return _hash(prompt + llm_string)
-
     def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
         """Look up based on prompt and llm_string."""
-        generations = []
         # Read from a Redis HASH
         results = self.redis.hgetall(self._key(prompt, llm_string))
-        if results:
-            for _, text in results.items():
-                try:
-                    generations.append(loads(text))
-                except Exception:
-                    logger.warning(
-                        "Retrieving a cache value that could not be deserialized "
-                        "properly. This is likely due to the cache being in an "
-                        "older format. Please recreate your cache to avoid this "
-                        "error."
-                    )
-                    # In a previous life we stored the raw text directly
-                    # in the table, so assume it's in that format.
-                    generations.append(Generation(text=text))
-        return generations if generations else None
+        return self._get_generations(results)

     def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
         """Update cache based on prompt and llm_string."""
-        for gen in return_val:
-            if not isinstance(gen, Generation):
-                raise ValueError(
-                    "RedisCache only supports caching of normal LLM generations, "
-                    f"got {type(gen)}"
-                )
         # Write to a Redis HASH
+        self._ensure_generation_type(return_val)
         key = self._key(prompt, llm_string)

         with self.redis.pipeline() as pipe:
-            pipe.hset(
-                key,
-                mapping={
-                    str(idx): dumps(generation)
-                    for idx, generation in enumerate(return_val)
-                },
-            )
-            if self.ttl is not None:
-                pipe.expire(key, self.ttl)
-
+            self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
             pipe.execute()

     def clear(self, **kwargs: Any) -> None:
@@ -438,6 +453,89 @@ def clear(self, **kwargs: Any) -> None:
         self.redis.flushdb(asynchronous=asynchronous, **kwargs)


+class AsyncRedisCache(RedisCacheBase):
+    """
+    Cache that uses Redis as a backend. Allows to use an
+    async `redis.asyncio.Redis` client.
+    """
+
+    def __init__(self, redis_: Any, *, ttl: Optional[int] = None):
+        """
+        Initialize an instance of AsyncRedisCache.
+
+        This method initializes an object with Redis caching capabilities.
+        It takes a `redis_` parameter, which should be an instance of a Redis
+        client class (`redis.asyncio.Redis`), allowing the object
+        to interact with a Redis server for caching purposes.
+
+        Parameters:
+            redis_ (Any): An instance of a Redis client class
+                (`redis.asyncio.Redis`) to be used for caching.
+                This allows the object to communicate with a
+                Redis server for caching operations.
+            ttl (int, optional): Time-to-live (TTL) for cached items in seconds.
+                If provided, it sets the time duration for how long cached
+                items will remain valid. If not provided, cached items will not
+                have an automatic expiration.
+        """
+        try:
+            from redis.asyncio import Redis
+        except ImportError:
+            raise ValueError(
+                "Could not import `redis.asyncio` python package. "
+                "Please install it with `pip install redis`."
+            )
+        if not isinstance(redis_, Redis):
+            raise ValueError("Please pass a valid `redis.asyncio.Redis` client.")
+        self.redis = redis_
+        self.ttl = ttl
+
+    def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
+        """Look up based on prompt and llm_string."""
+        raise NotImplementedError(
+            "This async Redis cache does not implement `lookup()` method. "
+            "Consider using the async `alookup()` version."
+        )
+
+    async def alookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
+        """Look up based on prompt and llm_string. Async version."""
+        results = await self.redis.hgetall(self._key(prompt, llm_string))
+        return self._get_generations(results)
+
+    def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
+        """Update cache based on prompt and llm_string."""
+        raise NotImplementedError(
+            "This async Redis cache does not implement `update()` method. "
+            "Consider using the async `aupdate()` version."
+        )
+
+    async def aupdate(
+        self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
+    ) -> None:
+        """Update cache based on prompt and llm_string. Async version."""
+        self._ensure_generation_type(return_val)
+        key = self._key(prompt, llm_string)
+
+        async with self.redis.pipeline() as pipe:
+            self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
+            await pipe.execute()
+
+    def clear(self, **kwargs: Any) -> None:
+        """Clear cache. If `asynchronous` is True, flush asynchronously."""
+        raise NotImplementedError(
+            "This async Redis cache does not implement `clear()` method. "
+            "Consider using the async `aclear()` version."
+        )
+
+    async def aclear(self, **kwargs: Any) -> None:
+        """
+        Clear cache. If `asynchronous` is True, flush asynchronously.
+        Async version.
+        """
+        asynchronous = kwargs.get("asynchronous", False)
+        await self.redis.flushdb(asynchronous=asynchronous, **kwargs)


 class RedisSemanticCache(BaseCache):
     """Cache that uses Redis as a vector-store backend."""
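The pattern above — sync entry points that raise `NotImplementedError` and direct callers to the `a`-prefixed methods — can be illustrated with a toy in-memory cache (a hypothetical class, not part of the PR, with a plain dict in place of Redis):

```python
import asyncio

class AsyncOnlyCache:
    """Toy in-memory cache mirroring AsyncRedisCache's shape: the sync
    entry points raise NotImplementedError and point at the async ones."""

    def __init__(self):
        self._store = {}

    def lookup(self, key):
        raise NotImplementedError("Use the async `alookup()` version.")

    async def alookup(self, key):
        return self._store.get(key)

    def update(self, key, value):
        raise NotImplementedError("Use the async `aupdate()` version.")

    async def aupdate(self, key, value):
        self._store[key] = value

cache = AsyncOnlyCache()
asyncio.run(cache.aupdate("prompt+llm", "generation"))
print(asyncio.run(cache.alookup("prompt+llm")))  # generation
```

Raising immediately from the sync methods fails fast and loudly, which is preferable to silently blocking an event loop with a sync Redis call.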
@@ -4,6 +4,7 @@
 from typing import Any, Optional, Sequence

 from langchain_core.outputs import Generation
+from langchain_core.runnables import run_in_executor

 RETURN_VAL_TYPE = Sequence[Generation]


@@ -22,3 +23,17 @@ def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> N
     @abstractmethod
     def clear(self, **kwargs: Any) -> None:
         """Clear cache that can take additional keyword arguments."""
+
+    async def alookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
Review thread on the new async methods:

Comment: An alternative would be to add these methods to the

Comment: @dzmitry-kankalovich We generally prefer keeping the async version of methods in the same class as the sync versions, and have them provide a default async implementation that uses

Would you mind merging into the existing abstractions? Ideally a single PR that just modifies the core interface first, and then separately we can do a PR for any implementations.

Comment: Done

Comment: As for the smaller PRs - yes, I can move slower and split this up into several PRs, if you are happy with the current direction.

Comment: Generally better to split PRs by package to minimize potential dependency conflicts since the packages may have different release schedules.
+        """Look up based on prompt and llm_string."""
+        return await run_in_executor(None, self.lookup, prompt, llm_string)
+
+    async def aupdate(
+        self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
+    ) -> None:
+        """Update cache based on prompt and llm_string."""
+        return await run_in_executor(None, self.update, prompt, llm_string, return_val)
+
+    async def aclear(self, **kwargs: Any) -> None:
+        """Clear cache that can take additional keyword arguments."""
+        return await run_in_executor(None, self.clear, **kwargs)
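The default implementations added here delegate to the sync methods via `run_in_executor`, so every existing `BaseCache` subclass gets a working async API for free. A self-contained approximation of that delegation, using the event loop's default thread-pool executor directly instead of langchain's helper:

```python
import asyncio
from functools import partial

class CacheSketch:
    """Sketch of the default async behavior: `alookup` runs the sync
    `lookup` in a thread-pool executor, so subclasses that only implement
    the sync method still work from async code."""

    def lookup(self, prompt, llm_string):
        # A stand-in sync implementation.
        return [f"cached:{prompt}:{llm_string}"]

    async def alookup(self, prompt, llm_string):
        loop = asyncio.get_running_loop()
        # Approximates run_in_executor(None, self.lookup, prompt, llm_string):
        # the sync call runs in a worker thread, keeping the loop responsive.
        return await loop.run_in_executor(
            None, partial(self.lookup, prompt, llm_string)
        )

print(asyncio.run(CacheSketch().alookup("hi", "llm-config")))
```

Subclasses like `AsyncRedisCache` can then override `alookup` with a natively async implementation and skip the executor hop entirely.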
Review thread on the langchain package re-export:

Comment: note: shouldn't import this in langchain, imports here are only for backwards compatibility

Comment: I see. Noted! Maybe it makes sense to drop a comment in that file next time you are working with it - future contributors will know.
Review thread on the base class name:

Comment: Could we mark this as private?

Comment: Sure, can you show me an example? Something like `__RedisCacheBase`?

Comment: Looked at other examples in the codebase and it seems like the pattern is to use a single underscore, so I pushed the `_RedisCacheBase` rename.
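On the naming question: a single leading underscore is Python's "private by convention" marker, while double leading underscores on attributes inside a class body trigger name mangling, which is why `_RedisCacheBase` fits the codebase pattern better than `__RedisCacheBase`. A quick demonstration:

```python
class Widget:
    def __init__(self):
        self._internal = 1   # single underscore: private by convention only
        self.__secret = 2    # double underscore: name-mangled to _Widget__secret

w = Widget()
print(w._internal)        # still accessible, just discouraged
print(w._Widget__secret)  # the mangled name the attribute actually gets
```

Mangling only applies to identifiers inside class bodies, so it mainly guards attributes against accidental subclass clashes; for marking a whole class private, the single-underscore convention is the idiomatic choice.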