
Commit

Correctly close streams, better handle edge cases
sd2k committed Feb 14, 2025
1 parent f36666d commit 2ffc1de
Showing 2 changed files with 53 additions and 40 deletions.
15 changes: 6 additions & 9 deletions src/mcp_grafana/__init__.py
@@ -4,7 +4,6 @@
 import anyio
 import uvicorn
 from mcp.server import FastMCP
-from starlette.requests import Request
 
 from .tools import add_tools

@@ -18,14 +17,14 @@ class Transport(enum.StrEnum):
 class GrafanaMCP(FastMCP):
     async def run_http_async(self) -> None:
         from starlette.applications import Starlette
-        from starlette.routing import Route
+        from starlette.routing import Mount
 
         from .transports.http import handle_message
 
-        async def handle_http(request: Request):
-            async with handle_message(
-                request.scope, request.receive, request._send
-            ) as (
+        async def handle_http(scope, receive, send):
+            if scope["type"] != "http":
+                raise ValueError("Expected HTTP request")
+            async with handle_message(scope, receive, send) as (
                 read_stream,
                 write_stream,
             ):
@@ -37,9 +36,7 @@ async def handle_http(request: Request):
 
         starlette_app = Starlette(
             debug=self.settings.debug,
-            routes=[
-                Route("/mcp", endpoint=handle_http, methods=["POST"]),
-            ],
+            routes=[Mount("/", app=handle_http)],
         )
 
         config = uvicorn.Config(
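
For context, a minimal standalone sketch (illustrative only, not code from this commit) of the pattern the file now uses: Starlette's Mount passes the raw ASGI (scope, receive, send) triple straight to the handler, so the transport no longer needs Starlette's Request wrapper or the private request._send attribute. The handler body and the host/port are assumptions.

import uvicorn
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount


async def handle_http(scope, receive, send):
    # Mount forwards every request under its prefix to this raw ASGI callable.
    if scope["type"] != "http":
        raise ValueError("Expected HTTP request")
    response = JSONResponse({"ok": True})
    await response(scope, receive, send)


app = Starlette(routes=[Mount("/", app=handle_http)])

if __name__ == "__main__":
    # Illustrative host/port; the real server builds a uvicorn.Config from its settings.
    uvicorn.run(app, host="127.0.0.1", port=8000)

Because the app is now mounted at "/", the handler sees every path, which is why the method and path checks move into the transport itself in the second file below.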
78 changes: 47 additions & 31 deletions src/mcp_grafana/transports/http.py
@@ -113,39 +113,55 @@ async def handle_message(scope: Scope, receive: Receive, send: Send):
     read_stream, read_stream_writer, write_stream, write_stream_reader = make_streams()
 
     async def handle_post_message():
-        request = Request(scope, receive)
         try:
-            json = await request.json()
-        except JSONDecodeError as err:
-            logger.error(f"Failed to parse message: {err}")
-            response = Response("Could not parse message", status_code=400)
-            await response(scope, receive, send)
-            return
-        try:
-            client_message = types.JSONRPCMessage.model_validate(json)
-            logger.debug(f"Validated client message: {client_message}")
-        except ValidationError as err:
-            logger.error(f"Failed to parse message: {err}")
-            response = Response("Could not parse message", status_code=400)
+            request = Request(scope, receive)
+            if request.method != "POST":
+                response = Response("Method not allowed", status_code=405)
+                await response(scope, receive, send)
+                return
+            if scope["path"] != "/mcp":
+                response = Response("Not found", status_code=404)
+                await response(scope, receive, send)
+                return
+            try:
+                json = await request.json()
+            except JSONDecodeError as err:
+                logger.error(f"Failed to parse message: {err}")
+                response = Response("Could not parse message", status_code=400)
+                await response(scope, receive, send)
+                return
+
+            try:
+                client_message = types.JSONRPCMessage.model_validate(json)
+                logger.debug(f"Validated client message: {client_message}")
+            except ValidationError as err:
+                logger.error(f"Failed to parse message: {err}")
+                response = Response("Could not parse message", status_code=400)
+                await response(scope, receive, send)
+                return
+
+            # As part of the MCP spec we need to initialize first.
+            # In a stateful flow (e.g. stdio or sse transports) the client would
+            # send an initialize request to the server, and the server would send
+            # a response back to the client. In this case we're trying to be stateless,
+            # so we'll handle the initialization ourselves.
+            logger.debug("Initializing server")
+            await initialize(read_stream_writer, write_stream_reader)
+
+            # Alright, now we can send the client message.
+            logger.debug("Sending client message")
+            await read_stream_writer.send(client_message)
+
+            # Wait for the server's response, and forward it to the client.
+            server_message = await write_stream_reader.receive()
+            obj = server_message.model_dump(
+                by_alias=True, mode="json", exclude_none=True
+            )
+            response = JSONResponse(obj)
             await response(scope, receive, send)
             return
-
-        # As part of the MCP spec we need to initialize first.
-        # In a stateful flow (e.g. stdio or sse transports) the client would
-        # send an initialize request to the server, and the server would send
-        # a response back to the client. In this case we're trying to be stateless,
-        # so we'll handle the initialization ourselves.
-        logger.debug("Initializing server")
-        await initialize(read_stream_writer, write_stream_reader)
-
-        # Alright, now we can send the client message.
-        logger.debug("Sending client message")
-        await read_stream_writer.send(client_message)
-        # Wait for the server's response, and forward it to the client.
-        server_message = await write_stream_reader.receive()
-        obj = server_message.model_dump(by_alias=True, mode="json", exclude_none=True)
-        response = JSONResponse(obj)
-        await response(scope, receive, send)
+        finally:
+            await read_stream_writer.aclose()
+            await write_stream_reader.aclose()
 
     async with anyio.create_task_group() as tg:
         tg.start_soon(handle_post_message)
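
The finally block added above is the "correctly close streams" part of the commit: if the write end of an anyio memory object stream is never closed, a task blocked on receive() waits forever. A small self-contained sketch of that behaviour (an assumed illustration, not code from this repository):

import anyio


async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream(1)

    async def producer():
        try:
            await send_stream.send("only message")
        finally:
            # Without this aclose(), the consumer below would hang after
            # draining the single buffered item.
            await send_stream.aclose()

    async def consumer():
        try:
            while True:
                item = await receive_stream.receive()
                print("got:", item)
        except anyio.EndOfStream:
            # Raised once the sender has closed its end and the buffer is empty.
            print("stream closed, consumer exits")
        finally:
            await receive_stream.aclose()

    async with anyio.create_task_group() as tg:
        tg.start_soon(producer)
        tg.start_soon(consumer)


anyio.run(main)

Closing both ends in a finally, as handle_post_message now does, guarantees the cleanup runs even when an earlier step raises or returns early.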

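A hypothetical client call against the new transport, assuming a server built from this commit is listening on localhost:8000 (the port and the method name are assumptions): each POST to /mcp carries a single JSON-RPC message, and the transport performs the MCP initialization itself before replying, so no separate initialize round-trip is needed.

import httpx

# One JSON-RPC request per POST; "tools/list" is a standard MCP method.
message = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/list",
    "params": {},
}
response = httpx.post("http://localhost:8000/mcp", json=message)
print(response.status_code)  # 200 on success; 405, 404 or 400 for the edge cases handled above
print(response.json())       # the server's JSON-RPC response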