-
Notifications
You must be signed in to change notification settings - Fork 206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asyncio coroutines for common tasks #389
Comments
@mbrancato So essentially the request is to offer the same functionality the existing (sync) client offers, but in async/await flavor? I'll forward it, but I suspect this will require substantial amount of work to match the current client's features. |
That's basically it @plamut. It would prevent needing to wrap some of the pubsub calls in things like In case anyone needs a solution, a good example of what works now is here: https://github.com/allenporter/python-google-nest-sdm/blob/cb2dc1bb5c61284e6f489f9e8933dfa758645196/google_nest_sdm/google_nest_subscriber.py#L49 |
@mbrancato I brought this up at a meeting, and it's quite a common request in other client libraries, too. This feature is being considered, but it's currently in exploratory phase, e.g. how to approach not to duplicate the hand-written logic too much, etc., as the amount of work required can be substantial. As such, there's no ETA at the moment, but I'll keep this request open for visibility, and for any future updates. |
I understand that there is no ETA on this specific feature, but has there been any thoughts/plan about how approach the problem in general? |
We do want to support keeping up with new language features like async/await. However, we're also careful to keep the API surface similar across languages, so we need to co-ordinate between languages to do this. This is something we will look at for a 2.0 API. |
Hello, until a v2.0 API is implemented, is it possible to provide some examples on how to combine the library with projects that use other async libraries? For instance, in our FastAPI based project we need to implement consumers that store data on a mongo db database using an async mongo library (AsyncIOMotorClient). I believe that this is a common use case. Is this something that can be implemented in a thread safe way, using asyncio and functions like run_until_complete? We can find some workarounds but we don't feel confident that these workarounds are safe since they combine asyncio with the internal threading mechanism of the library. Do you have any recommendations or examples? |
+1 for this feature request |
As there is now a v2.0 version of this library, is an Async-flavour interface now implemented? The documentation on 2.0 is very thin on the ground... |
Almost end of 2023 but still no updates 😢 +999 for a feature |
@NickNaskida I am not a maintainer of this library anymore (since the start of 2022), and thus cannot say much about its current state or its roadmap, I'm afraid. |
@NickNaskida This may be helpful, but here is an example of using the async publisher. It may not be supported by Google to use these directly. import asyncio
import logging
from typing import MutableSequence
from google.pubsub_v1 import PubsubMessage
from google.pubsub_v1.services.publisher.async_client import PublisherAsyncClient
from google.pubsub_v1.services.publisher.transports import PublisherGrpcAsyncIOTransport
from grpc import ChannelConnectivity
async def run():
publisher_client = PublisherAsyncClient(transport="grpc_asyncio")
transport = publisher_client.transport
if isinstance(transport, PublisherGrpcAsyncIOTransport):
await transport.grpc_channel.channel_ready()
chan = transport.grpc_channel.get_state(try_to_connect=True)
if chan != ChannelConnectivity.READY:
logging.error("Channel is not ready")
return
messages = [
PubsubMessage(
{
"data": b"FOO",
"attributes": {
"message_type": "test",
"customer": "acme",
},
}
),
PubsubMessage(
{
"data": b"BAR",
"attributes": {
"message_type": "test",
"customer": "acme",
},
}
),
]
if await publish_pubsub(messages, publisher_client):
logging.info("Published messages")
await publisher_client.transport.close()
async def publish_pubsub(
messages: MutableSequence[PubsubMessage],
client: PublisherAsyncClient,
) -> bool:
topic = client.topic_path("my_project", "my_topic")
try:
resp = await client.publish(topic=topic, messages=messages)
if len(resp.message_ids) != len(messages):
logging.error("Failed to publish some messages to Pub/Sub")
return False
except asyncio.TimeoutError:
logging.warning("Timeout pushing event to Pub/Sub")
return False
else:
return True |
In case it's helpful to anyone else, I ultimately decided to use the synchronous PublisherClient in my FastAPI service and convert the future to an awaitable via |
The current status of asyncio has mostly been answered here already: #218 (comment)
This feature request is to ask for a formal exposure of high-level awaitable methods for interacting with the PubSub library and reducing the use of threads. Although, if the library still wants to manage threads, that seems fine as long as how we use the library would be awaitable and the callback are run in the same event loop. I don't want to be too prescriptive on the implementation in my request.
Some examples, assuming
SubscriberClient
andPublisherClient
were already setup...example subscribe:
example publish:
The text was updated successfully, but these errors were encountered: