Background Thread Not Exiting on Pub/Sub Subscriber Shutdown, Causing ValueError: 'Cannot invoke RPC: Channel closed!' #1316

Open
HassanA01 opened this issue Jan 3, 2025 · 0 comments
Labels
api: pubsub Issues related to the googleapis/python-pubsub API.

@HassanA01

Environment details

  • OS type and version: macOS 15.1.1
  • Python version: 3.11.6
  • pip version: pip 24.0
  • google-cloud-pubsub: 2.27.1

Description

I'm encountering an issue where my FastAPI application does not shut down gracefully when it stops the Pub/Sub subscriber. Shutdown leaves background threads lingering and produces the following error:

ValueError('Cannot invoke RPC: Channel closed!')

This behavior looks similar to GitHub issue #747, which concerns incomplete shutdown handling in the google-cloud-pubsub library.

Steps to Reproduce

  1. Set Up a FastAPI Application with Pub/Sub Subscriber:

    • Initialize a FastAPI application.
    • Integrate a Pub/Sub subscriber using the google-cloud-pubsub library.
    • Implement startup and shutdown logic using an async context manager to manage the subscriber lifecycle.
  2. Run the Application:

    • Start the FastAPI server.
    • Ensure the Pub/Sub subscriber is actively listening to messages.
  3. Initiate Shutdown:

    • Terminate the application using Ctrl+C or another shutdown signal.
    • Observe the shutdown process and logs.
  4. Verify Shutdown Behavior:

    • Notice that the application does not terminate cleanly.
    • Check for lingering background threads and error logs indicating issues with shutting down the Pub/Sub subscriber.
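To make step 4 more concrete, one option is to list the threads that are still alive once the lifespan shutdown has finished. The sketch below uses only the standard library; the helper name `report_lingering_threads` is hypothetical, and it simply logs every live thread other than the main thread together with its daemon flag (a non-daemon thread keeps the interpreter from exiting, a daemon thread does not).

```python
import logging
import threading

logger = logging.getLogger(__name__)


def report_lingering_threads() -> list[str]:
    """Log and return the names of live threads other than the main thread.

    Hypothetical diagnostic helper: call it at the very end of the shutdown
    path, for example right after the lifespan context manager exits.
    """
    lingering = []
    for thread in threading.enumerate():
        if thread is threading.main_thread():
            continue
        # Non-daemon threads block interpreter exit; daemon threads do not.
        logger.warning(
            "Thread still alive: name=%r daemon=%s", thread.name, thread.daemon
        )
        lingering.append(thread.name)
    return lingering


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    stop = threading.Event()
    worker = threading.Thread(target=stop.wait, name="demo-worker", daemon=True)
    worker.start()
    print(report_lingering_threads())  # includes "demo-worker" while it waits
    stop.set()
    worker.join()
```

Running it with the subscriber example would show whether any gRPC or Pub/Sub worker threads are still alive after "Pub/Sub subscriber shutdown complete." is logged.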

Code Example

Below is a minimal example that reproduces the issue. It initializes a FastAPI application with a Pub/Sub subscriber and attempts to shut it down gracefully.

# main.py
from fastapi import FastAPI
import uvicorn
from contextlib import asynccontextmanager
import asyncio
from google.cloud import pubsub_v1
import concurrent.futures
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PubSubSubscriber:
    def __init__(self, project_id: str, subscription_name: str):
        self.project_id = project_id
        self.subscription_name = subscription_name
        self.subscriber = pubsub_v1.SubscriberClient()
        self.streaming_pull_future = None

    def _get_subscription_path(self) -> str:
        return self.subscriber.subscription_path(self.project_id, self.subscription_name)

    def _callback(self, message):
        """Processes a single message.

        The client library invokes this synchronously on a worker thread from
        its thread pool, so it must be a plain function. An `async def`
        callback would only return a coroutine object that is never awaited,
        and the message would never be acked.
        """
        try:
            data = message.data.decode('utf-8')
            logger.info(f"Received message: {data}")
            message.ack()
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            message.nack()

    async def start(self):
        """Starts the Pub/Sub subscriber."""
        subscription_path = self._get_subscription_path()
        self.streaming_pull_future = self.subscriber.subscribe(
            subscription_path, callback=self._callback
        )
        logger.info("Pub/Sub subscriber started.")
        await self._monitor_streaming_future()

    async def _monitor_streaming_future(self):
        """Monitors the streaming pull for errors."""
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, self.streaming_pull_future.result)
        except Exception as e:
            logger.error(f"Streaming pull future threw an exception: {e}")
            self.stop()

    def stop(self):
        """Stops the Pub/Sub subscriber."""
        logger.info("Stopping Pub/Sub subscriber...")
        if self.streaming_pull_future:
            self.streaming_pull_future.cancel()
            try:
                self.streaming_pull_future.result(timeout=5.0)
            except concurrent.futures.TimeoutError:
                logger.warning("Timed out waiting for streaming pull future to terminate.")
            except Exception as e:
                logger.debug(f"Ignoring exception after cancel: {e}")
        self.subscriber.close()
        logger.info("Pub/Sub subscriber stopped.")

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Defines startup and shutdown events for the FastAPI application."""
    project_id = "your-google-project-id"
    subscription_name = "your-subscription-name"

    # Startup logic
    subscriber = PubSubSubscriber(project_id, subscription_name)
    app.state.pubsub_task = asyncio.create_task(subscriber.start())
    app.state.pubsub_instance = subscriber
    logger.info("Application startup complete.")
    
    yield  # Application is running

    # Shutdown logic
    logger.info("Shutting down Pub/Sub subscriber.")
    pubsub_instance = getattr(app.state, "pubsub_instance", None)
    pubsub_task = getattr(app.state, "pubsub_task", None)

    if pubsub_instance:
        try:
            pubsub_instance.stop()
        except Exception as e:
            logger.error(f"Error while stopping Pub/Sub subscriber: {e}")

    if pubsub_task:
        pubsub_task.cancel()
        try:
            await pubsub_task
        except asyncio.CancelledError:
            pass

    logger.info("Pub/Sub subscriber shutdown complete.")

app = FastAPI(title="MailFlowAI", lifespan=lifespan)

@app.get("/")
async def read_root():
    return {"Hello": "World"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000)
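The subscriber invokes its callback synchronously on a worker thread, so an `async def` callback only creates a coroutine object that nothing awaits (Python emits a "coroutine ... was never awaited" RuntimeWarning and the message is never acked). If message handling really needs to run on the application's event loop, one possible approach is to keep the callback synchronous and hand the coroutine to the loop with `asyncio.run_coroutine_threadsafe`, waiting on the returned future before acking. This is a sketch under assumptions: `FakeMessage` is a hypothetical stand-in for a Pub/Sub message (only `data`, `ack()`, and `nack()` are modeled), a thread started by `asyncio.to_thread` stands in for the library's thread pool, and `handle` and `make_callback` are illustrative names.

```python
import asyncio


class FakeMessage:
    """Hypothetical stand-in for a Pub/Sub message (data, ack, nack only)."""

    def __init__(self, data: bytes):
        self.data = data
        self.acked = False
        self.nacked = False

    def ack(self) -> None:
        self.acked = True

    def nack(self) -> None:
        self.nacked = True


async def handle(data: str) -> None:
    """Async processing that must run on the application's event loop."""
    await asyncio.sleep(0)  # placeholder for real async work
    if not data:
        raise ValueError("empty message")


def make_callback(loop: asyncio.AbstractEventLoop, timeout: float = 30.0):
    """Build a synchronous callback that runs `handle` on `loop`."""

    def callback(message) -> None:
        # Runs on a worker thread: schedule the coroutine on the loop and
        # block this worker (not the loop) until it finishes.
        future = asyncio.run_coroutine_threadsafe(
            handle(message.data.decode("utf-8")), loop
        )
        try:
            future.result(timeout=timeout)
            message.ack()
        except Exception:
            future.cancel()  # no-op if the coroutine already finished
            message.nack()

    return callback


async def main() -> list[FakeMessage]:
    callback = make_callback(asyncio.get_running_loop())
    messages = [FakeMessage(b"hello"), FakeMessage(b"")]

    def deliver_all() -> None:
        # Simulates the library delivering messages on a worker thread.
        for m in messages:
            callback(m)

    await asyncio.to_thread(deliver_all)
    return messages


if __name__ == "__main__":
    for m in asyncio.run(main()):
        print(m.data, "acked" if m.acked else "nacked")
```

If the event loop is itself blocked (for example by a synchronous `stop()` call during shutdown), such a callback waits until its timeout expires, which is one more reason to keep blocking work off the loop.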

Stack Trace

Upon initiating shutdown (e.g., pressing Ctrl+C), the following logs and errors are produced:

^CINFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:main:Shutting down Pub/Sub subscriber.
INFO:main:Stopping Pub/Sub subscriber...
WARNING:google.api_core.bidi:Background thread did not exit.
ERROR:root:Exception in callback <bound method ResumableBidiRpc._on_call_done of <google.api_core.bidi.ResumableBidiRpc object at 0x105a5af50>>: ValueError('Cannot invoke RPC: Channel closed!')
INFO:main:Pub/Sub subscriber stopped.
INFO:main:Pub/Sub subscriber shutdown complete.
INFO:     Application shutdown complete.
INFO:     Finished server process [7040]
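The warning "Background thread did not exit." suggests that the library signals its consumer thread to stop, waits for it with a timeout, and logs when the thread is still alive afterwards. The sketch below reproduces that general pattern with the standard library only; it is an illustration inferred from the log text, not google-api-core's actual code, and `stop_worker`, the worker functions, and the timeout values are assumptions.

```python
import logging
import threading
import time

logger = logging.getLogger(__name__)


def stop_worker(thread: threading.Thread, stop_event: threading.Event,
                timeout: float = 1.0) -> bool:
    """Signal a worker to stop and wait up to `timeout` seconds.

    Returns True if the thread exited in time, False otherwise.
    """
    stop_event.set()
    thread.join(timeout)
    if thread.is_alive():
        # Same shape as the warning seen in the shutdown log.
        logger.warning("Background thread did not exit.")
        return False
    return True


def cooperative_worker(stop_event: threading.Event) -> None:
    # Checks the stop flag often, so it exits promptly.
    while not stop_event.wait(0.01):
        pass


def stuck_worker(stop_event: threading.Event) -> None:
    # Ignores the stop flag while "blocked", e.g. in a long network call.
    time.sleep(2.0)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    for target in (cooperative_worker, stuck_worker):
        event = threading.Event()
        worker = threading.Thread(target=target, args=(event,), daemon=True)
        worker.start()
        print(target.__name__, stop_worker(worker, event, timeout=0.2))
```

A thread that is still blocked when the join times out keeps running after the warning, which matches the "lingering thread" symptom; if it later wakes up and touches an already closed channel, an error like the one above is plausible.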

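Additional Context

The issue appears to be linked to how the google-cloud-pubsub library sequences its shutdown. In the log above, the gRPC bidi background thread does not exit in time ("Background thread did not exit."), and ResumableBidiRpc._on_call_done then tries to invoke an RPC on a channel that is already closed, raising ValueError('Cannot invoke RPC: Channel closed!'). The result is a lingering thread and an error logged during what should be a graceful shutdown.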
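One mitigation that may be worth trying while the library behavior is investigated is to run the blocking shutdown sequence (cancel the streaming pull, wait for it to finish, then close the client, the same order `stop()` already uses) off the event loop with `asyncio.to_thread`, and to make it idempotent so that the error path in `_monitor_streaming_future` and the lifespan shutdown cannot both close the client. This is a sketch under assumptions: `FakeStreamingPullFuture`, `FakeClient`, and `SubscriberShutdown` are hypothetical stand-ins modeling only `cancel()`, `result()`, and `close()`, and the pattern has not been verified to avoid the ValueError with the real client.

```python
import asyncio
import concurrent.futures
import threading


class FakeStreamingPullFuture(concurrent.futures.Future):
    """Hypothetical stand-in: cancel() stops pulling and resolves the future."""

    def cancel(self) -> bool:
        if not self.done():
            self.set_result(True)
        return True


class FakeClient:
    """Hypothetical stand-in for the subscriber client."""

    def __init__(self):
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1


class SubscriberShutdown:
    def __init__(self, client, future):
        self.client = client
        self.future = future
        self._stopped = False
        self._lock = threading.Lock()

    def stop(self) -> None:
        """Blocking, idempotent shutdown: cancel, wait, then close."""
        with self._lock:  # a concurrent second call waits, then no-ops
            if self._stopped:
                return
            self._stopped = True
            self.future.cancel()
            try:
                self.future.result(timeout=5.0)
            except Exception:
                pass  # the stream may end with an error after cancel()
            self.client.close()

    async def astop(self) -> None:
        # Keeps the event loop responsive while stop() blocks.
        await asyncio.to_thread(self.stop)


async def main() -> FakeClient:
    client = FakeClient()
    shutdown = SubscriberShutdown(client, FakeStreamingPullFuture())
    await asyncio.gather(shutdown.astop(), shutdown.astop())  # second is a no-op
    return client


if __name__ == "__main__":
    print(asyncio.run(main()).close_calls)
```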

Thank you for your time and assistance!


Notes for Maintainer

  • Replace Placeholder Values:
    • Replace "your-google-project-id" and "your-subscription-name" in the code example with your actual Google Cloud project ID and Pub/Sub subscription name.
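Instead of editing the literals, the values could also be read from the environment. In the sketch below, the variable names `GOOGLE_CLOUD_PROJECT` and `PUBSUB_SUBSCRIPTION` and the helper `load_pubsub_settings` are assumptions chosen for illustration; the example code above does not read them.

```python
import os
from collections.abc import Mapping


def load_pubsub_settings(env: Mapping[str, str] = os.environ) -> tuple[str, str]:
    """Return (project_id, subscription_name) from hypothetical env vars."""
    names = ("GOOGLE_CLOUD_PROJECT", "PUBSUB_SUBSCRIPTION")
    missing = [name for name in names if not env.get(name)]
    if missing:
        # Fail fast at startup instead of subscribing to a placeholder path.
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env["GOOGLE_CLOUD_PROJECT"], env["PUBSUB_SUBSCRIPTION"]
```

In `lifespan`, the two placeholder assignments would then become `project_id, subscription_name = load_pubsub_settings()`.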

Thanks!

product-auto-label bot added the api: pubsub label Jan 3, 2025