From 081ccf06ad4ad8c0e4b00fddfd9d7717d0cadeec Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 05:59:27 -0500 Subject: [PATCH 01/15] fix: handle http2 goaway race conditions --- httpcore/__init__.py | 2 + httpcore/_async/connection_pool.py | 27 ++++++- httpcore/_async/http2.py | 124 +++++++++++++++++++++++++--- httpcore/_exceptions.py | 78 ++++++++++++++++++ httpcore/_sync/connection_pool.py | 27 ++++++- httpcore/_sync/http2.py | 125 ++++++++++++++++++++++++++--- 6 files changed, 357 insertions(+), 26 deletions(-) diff --git a/httpcore/__init__.py b/httpcore/__init__.py index 9a92dc4a..d45791e5 100644 --- a/httpcore/__init__.py +++ b/httpcore/__init__.py @@ -19,6 +19,7 @@ from ._backends.sync import SyncBackend from ._exceptions import ( ConnectError, + ConnectionGoingAway, ConnectionNotAvailable, ConnectTimeout, LocalProtocolError, @@ -114,6 +115,7 @@ def __init__(self, *args, **kwargs): # type: ignore "default_ssl_context", "SOCKET_OPTION", # exceptions + "ConnectionGoingAway", "ConnectionNotAvailable", "ProxyError", "ProtocolError", diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 5ef74e64..bd41868d 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -7,7 +7,12 @@ from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend -from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol +from .._exceptions import ( + ConnectionGoingAway, + ConnectionNotAvailable, + RemoteProtocolError, + UnsupportedProtocol, +) from .._models import Origin, Proxy, Request, Response from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock from .connection import AsyncHTTPConnection @@ -242,6 +247,26 @@ async def handle_async_request(self, request: Request) -> Response: # # In this case we clear the connection and try again. pool_request.clear_connection() + except ConnectionGoingAway as exc: + # GOAWAY frame recieved during request processing. + # Determine if we can safely retry based on RFC 7540 semantics. + pool_request.clear_connection() + + if exc.is_safe_to_retry: + # stream_id > last_stream_id: guaranteed unprocessed per RFC 7540 + # Safe to retry on a new connection + continue + elif exc.is_graceful_shutdown and not exc.may_have_side_effects: + # Graceful shutdown and headers weren't sent yet. + # Likely safe to retry. + continue + else: + # Request may have been processed. Propagate error with context so application can decide whether to retry. + msg = ( + "GOAWAY recieved: request may have been processed" + ) + # QUESTION: What is the best way to propagate the context for the applications? + raise RemoteProtocolError(msg) from exc else: break # pragma: nocover diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index dbd0beeb..7b989c60 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -14,6 +14,7 @@ from .._backends.base import AsyncNetworkStream from .._exceptions import ( + ConnectionGoingAway, ConnectionNotAvailable, LocalProtocolError, RemoteProtocolError, @@ -36,7 +37,8 @@ def has_body_headers(request: Request) -> bool: class HTTPConnectionState(enum.IntEnum): ACTIVE = 1 IDLE = 2 - CLOSED = 3 + DRAINING = 3 + CLOSED = 4 class AsyncHTTP2Connection(AsyncConnectionInterface): @@ -82,6 +84,11 @@ def __init__( self._read_exception: Exception | None = None self._write_exception: Exception | None = None + # Track request phases for GOAWAY retry safety determination. + # Maps stream_id -> {"headers_sent": bool, "body_sent": bool} + # TODO: Consider shifting this to a dataclass or typeddict + self._stream_requests: dict[int, dict[str, bool]] = {} + async def handle_async_request(self, request: Request) -> Response: if not self.can_handle_request(request.url.origin): # This cannot occur in normal operation, since the connection pool @@ -133,6 +140,8 @@ async def handle_async_request(self, request: Request) -> Response: try: stream_id = self._h2_state.get_next_available_stream_id() self._events[stream_id] = [] + # Initialize phase tracking for this stream + self._stream_requests[stream_id] = {"headers_sent": False, "body_sent": False} except h2.exceptions.NoAvailableStreamIDError: # pragma: nocover self._used_all_stream_ids = True self._request_count -= 1 @@ -142,8 +151,10 @@ async def handle_async_request(self, request: Request) -> Response: kwargs = {"request": request, "stream_id": stream_id} async with Trace("send_request_headers", logger, request, kwargs): await self._send_request_headers(request=request, stream_id=stream_id) + self._stream_requests[stream_id]["headers_sent"] = True async with Trace("send_request_body", logger, request, kwargs): await self._send_request_body(request=request, stream_id=stream_id) + self._stream_requests[stream_id]["body_sent"] = True async with Trace( "receive_response_headers", logger, request, kwargs ) as trace: @@ -177,9 +188,35 @@ async def handle_async_request(self, request: Request) -> Response: # a protocol error at any point they interact with the 'h2_state'. # # In this case we'll have stored the event, and should raise - # it as a RemoteProtocolError. + # it as a ConnectionGoingAway if applicable, or RemoteProtocolError. if self._connection_terminated: # pragma: nocover - raise RemoteProtocolError(self._connection_terminated) + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False}, + ) + raise ConnectionGoingAway( + self._connection_terminated, + last_stream_id=self._connection_terminated.last_stream_id, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + # Check if h2 is in CLOSED state due to GOAWAY. This can happen when + # GOAWAY was recieved but we haven't processed the event yet (race condition). + if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False}, + ) + msg = f"Connection closed: {exc}" + raise ConnectionGoingAway( + msg, + last_stream_id=stream_id, # Conservative: assume this stream may have been processed + error_code=0, # Assume graceful shutdown + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + # If h2 raises a protocol error in some other state then we # must somehow have made a protocol violation. raise LocalProtocolError(exc) # pragma: nocover @@ -349,10 +386,33 @@ async def _receive_events( async with self._read_lock: if self._connection_terminated is not None: last_stream_id = self._connection_terminated.last_stream_id - if stream_id and last_stream_id and stream_id > last_stream_id: - self._request_count -= 1 - raise ConnectionNotAvailable() - raise RemoteProtocolError(self._connection_terminated) + if stream_id is not None: + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False} + ) + if last_stream_id is not None and stream_id > last_stream_id: + # stream_id > last_stream_id: guaranteed unprocessed, safe to retry + self._request_count -= 1 + raise ConnectionGoingAway( + f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", + last_stream_id=last_stream_id, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + if self._state != HTTPConnectionState.DRAINING: + # stream_id <= last_stream_id: may have been processed + raise ConnectionGoingAway( + f"GOAWAY received: stream {stream_id} <= last_stream_id {last_stream_id}", + last_stream_id=last_stream_id if last_stream_id is not None else 0, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + else: + raise RemoteProtocolError(self._connection_terminated) # This conditional is a bit icky. We don't want to block reading if we've # actually got an event to return for a given stream. We need to do that @@ -361,7 +421,7 @@ async def _receive_events( # block until we've available flow control, event when we have events # pending for the stream ID we're attempting to send on. if stream_id is None or not self._events.get(stream_id): - events = await self._read_incoming_data(request) + events = await self._read_incoming_data(request, stream_id) for event in events: if isinstance(event, h2.events.RemoteSettingsChanged): async with Trace( @@ -384,6 +444,13 @@ async def _receive_events( elif isinstance(event, h2.events.ConnectionTerminated): self._connection_terminated = event + # Transition to DRAINING on graceful shutdown (NO_ERROR), + # allowing in-flight streams to complete. + # Non-graceful shutdown closes immediately. + if event.error_code == 0: + self._state = HTTPConnectionState.DRAINING + else: + self._state = HTTPConnectionState.CLOSED await self._write_outgoing_data(request) @@ -409,6 +476,7 @@ async def _receive_remote_settings_change( async def _response_closed(self, stream_id: int) -> None: await self._max_streams_semaphore.release() del self._events[stream_id] + self._stream_requests.pop(stream_id, None) # Clean up phase tracking async with self._state_lock: if self._connection_terminated and not self._events: await self.aclose() @@ -430,7 +498,9 @@ async def aclose(self) -> None: # Wrappers around network read/write operations... - async def _read_incoming_data(self, request: Request) -> list[h2.events.Event]: + async def _read_incoming_data( + self, request: Request, stream_id: int | None = None + ) -> list[h2.events.Event]: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("read", None) @@ -440,7 +510,32 @@ async def _read_incoming_data(self, request: Request) -> list[h2.events.Event]: try: data = await self._network_stream.read(self.READ_NUM_BYTES, timeout) if data == b"": - raise RemoteProtocolError("Server disconnected") + # Server disconnected. Check if this is related to GOAWAY. + if stream_id is not None: + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False} + ) + # If we have a GOAWAY recorded, this disconnect is GOAWAY-related + if self._connection_terminated is not None: + last_stream_id = self._connection_terminated.last_stream_id + raise ConnectionGoingAway( + "Server disconnected after GOAWAY", + last_stream_id=last_stream_id if last_stream_id else 0, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + # Check if h2 is in CLOSED state (GOAWAY received but not processed) + if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + raise ConnectionGoingAway( + "Server disconnected (connection closed)", + last_stream_id=stream_id, # Conservative + error_code=0, # Assume graceful + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) except Exception as exc: # If we get a network error we should: # @@ -510,7 +605,7 @@ def can_handle_request(self, origin: Origin) -> bool: def is_available(self) -> bool: return ( - self._state != HTTPConnectionState.CLOSED + self._state not in (HTTPConnectionState.DRAINING, HTTPConnectionState.CLOSED) and not self._connection_error and not self._used_all_stream_ids and not ( @@ -521,7 +616,12 @@ def is_available(self) -> bool: def has_expired(self) -> bool: now = time.monotonic() - return self._expire_at is not None and now > self._expire_at + keepalive_expired = self._expire_at is not None and now > self._expire_at + # Draining connections with no active streams are considered expired + draining_complete = ( + self._state == HTTPConnectionState.DRAINING and not self._events + ) + return keepalive_expired or draining_complete def is_idle(self) -> bool: return self._state == HTTPConnectionState.IDLE diff --git a/httpcore/_exceptions.py b/httpcore/_exceptions.py index bc28d44f..7d3fcf9d 100644 --- a/httpcore/_exceptions.py +++ b/httpcore/_exceptions.py @@ -19,6 +19,84 @@ class ConnectionNotAvailable(Exception): pass +class ConnectionGoingAway(ConnectionNotAvailable): + """ + Raised when a GOAWAY frame is received during HTTP/2 request processing. + + This exception provides context for determining whether a request is safe + to retry, based on RFC 7540 Section 6.8 semantics. + + Per RFC 7540: streams with IDs > last_stream_id are guaranteed unprocessed + and safe to retry. Streams with IDs <= last_stream_id may have been processed. + + Attributes: + last_stream_id: The highest stream ID the server may have processed. + error_code: The GOAWAY error code (0 = NO_ERROR for graceful shutdown). + request_stream_id: The stream ID assigned to this specific request. + headers_sent: Whether request headers were transmitted before GOAWAY. + body_sent: Whether request body was transmitted before GOAWAY. + """ + + def __init__( + self, + message: str, + *, + last_stream_id: int, + error_code: int, + request_stream_id: int, + headers_sent: bool = False, + body_sent: bool = False, + ) -> None: + super().__init__(message) + self.last_stream_id = last_stream_id + self.error_code = error_code + self.request_stream_id = request_stream_id + self.headers_sent = headers_sent + self.body_sent = body_sent + + @property + def is_safe_to_retry(self) -> bool: + """ + Returns True if the request is guaranteed unprocessed and safe to retry. + + Per RFC 7540 Section 6.8: any stream with ID > last_stream_id was never + seen by the server and can be safely retried. + """ + return self.request_stream_id > self.last_stream_id + + @property + def is_graceful_shutdown(self) -> bool: + """ + Returns True if this is a graceful shutdown (NO_ERROR). + + NO_ERROR (0x0) indicates administrative shutdown such as server restart, + connection limit reached, or idle timeout. + """ + return self.error_code == 0 + + @property + def may_have_side_effects(self) -> bool: + """ + Returns True if the request may have been processed by the server. + + If stream_id > last_stream_id: guaranteed no side effects (unprocessed). + If stream_id <= last_stream_id AND (headers or body sent): possibly processed. + """ + if self.request_stream_id > self.last_stream_id: + return False # Guaranteed unprocessed per RFC 7540 + return self.headers_sent or self.body_sent + + def __repr__(self) -> str: + return ( + f"ConnectionGoingAway(" + f"last_stream_id={self.last_stream_id}, " + f"error_code={self.error_code}, " + f"request_stream_id={self.request_stream_id}, " + f"is_safe_to_retry={self.is_safe_to_retry}, " + f"is_graceful_shutdown={self.is_graceful_shutdown})" + ) + + class ProxyError(Exception): pass diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 4b26f9c6..8217218a 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -7,7 +7,12 @@ from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend -from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol +from .._exceptions import ( + ConnectionGoingAway, + ConnectionNotAvailable, + RemoteProtocolError, + UnsupportedProtocol, +) from .._models import Origin, Proxy, Request, Response from .._synchronization import Event, ShieldCancellation, ThreadLock from .connection import HTTPConnection @@ -242,6 +247,26 @@ def handle_request(self, request: Request) -> Response: # # In this case we clear the connection and try again. pool_request.clear_connection() + except ConnectionGoingAway as exc: + # GOAWAY frame recieved during request processing. + # Determine if we can safely retry based on RFC 7540 semantics. + pool_request.clear_connection() + + if exc.is_safe_to_retry: + # stream_id > last_stream_id: guaranteed unprocessed per RFC 7540 + # Safe to retry on a new connection + continue + elif exc.is_graceful_shutdown and not exc.may_have_side_effects: + # Graceful shutdown and headers weren't sent yet. + # Likely safe to retry. + continue + else: + # Request may have been processed. Propagate error with context so application can decide whether to retry. + msg = ( + "GOAWAY recieved: request may have been processed" + ) + # QUESTION: What is the best way to propagate the context for the applications? + raise RemoteProtocolError(msg) from exc else: break # pragma: nocover diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index ddcc1890..e7408608 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -14,6 +14,7 @@ from .._backends.base import NetworkStream from .._exceptions import ( + ConnectionGoingAway, ConnectionNotAvailable, LocalProtocolError, RemoteProtocolError, @@ -36,7 +37,8 @@ def has_body_headers(request: Request) -> bool: class HTTPConnectionState(enum.IntEnum): ACTIVE = 1 IDLE = 2 - CLOSED = 3 + DRAINING = 3 + CLOSED = 4 class HTTP2Connection(ConnectionInterface): @@ -82,6 +84,12 @@ def __init__( self._read_exception: Exception | None = None self._write_exception: Exception | None = None + # Track request phases for GOAWAY retry safety determination. + # Maps stream_id -> {"headers_sent": bool, "body_sent": bool} + # TODO: Consider shifting this to a dataclass or typeddict + self._stream_requests: dict[int, dict[str, bool]] = {} + + def handle_request(self, request: Request) -> Response: if not self.can_handle_request(request.url.origin): # This cannot occur in normal operation, since the connection pool @@ -133,6 +141,8 @@ def handle_request(self, request: Request) -> Response: try: stream_id = self._h2_state.get_next_available_stream_id() self._events[stream_id] = [] + # Initialize phase tracking for this stream + self._stream_requests[stream_id] = {"headers_sent": False, "body_sent": False} except h2.exceptions.NoAvailableStreamIDError: # pragma: nocover self._used_all_stream_ids = True self._request_count -= 1 @@ -142,8 +152,10 @@ def handle_request(self, request: Request) -> Response: kwargs = {"request": request, "stream_id": stream_id} with Trace("send_request_headers", logger, request, kwargs): self._send_request_headers(request=request, stream_id=stream_id) + self._stream_requests[stream_id]["headers_sent"] = True with Trace("send_request_body", logger, request, kwargs): self._send_request_body(request=request, stream_id=stream_id) + self._stream_requests[stream_id]["body_sent"] = True with Trace( "receive_response_headers", logger, request, kwargs ) as trace: @@ -177,9 +189,35 @@ def handle_request(self, request: Request) -> Response: # a protocol error at any point they interact with the 'h2_state'. # # In this case we'll have stored the event, and should raise - # it as a RemoteProtocolError. + # it as a ConnectionGoingAway if applicable, or RemoteProtocolError. if self._connection_terminated: # pragma: nocover - raise RemoteProtocolError(self._connection_terminated) + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False}, + ) + raise ConnectionGoingAway( + self._connection_terminated, + last_stream_id=self._connection_terminated.last_stream_id, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + # Check if h2 is in CLOSED state due to GOAWAY. This can happen when + # GOAWAY was recieved but we haven't processed the event yet (race condition). + if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False}, + ) + msg = f"Connection closed: {exc}" + raise ConnectionGoingAway( + msg, + last_stream_id=stream_id, # Conservative: assume this stream may have been processed + error_code=0, # Assume graceful shutdown + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + # If h2 raises a protocol error in some other state then we # must somehow have made a protocol violation. raise LocalProtocolError(exc) # pragma: nocover @@ -349,10 +387,33 @@ def _receive_events( with self._read_lock: if self._connection_terminated is not None: last_stream_id = self._connection_terminated.last_stream_id - if stream_id and last_stream_id and stream_id > last_stream_id: - self._request_count -= 1 - raise ConnectionNotAvailable() - raise RemoteProtocolError(self._connection_terminated) + if stream_id is not None: + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False} + ) + if last_stream_id is not None and stream_id > last_stream_id: + # stream_id > last_stream_id: guaranteed unprocessed, safe to retry + self._request_count -= 1 + raise ConnectionGoingAway( + f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", + last_stream_id=last_stream_id, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + # stream_id <= last_stream_id: may have been processed + if self._state != HTTPConnectionState.DRAINING: + raise ConnectionGoingAway( + f"GOAWAY received: stream {stream_id} <= last_stream_id {last_stream_id}", + last_stream_id=last_stream_id if last_stream_id is not None else 0, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + else: + raise RemoteProtocolError(self._connection_terminated) # This conditional is a bit icky. We don't want to block reading if we've # actually got an event to return for a given stream. We need to do that @@ -361,7 +422,7 @@ def _receive_events( # block until we've available flow control, event when we have events # pending for the stream ID we're attempting to send on. if stream_id is None or not self._events.get(stream_id): - events = self._read_incoming_data(request) + events = self._read_incoming_data(request, stream_id) for event in events: if isinstance(event, h2.events.RemoteSettingsChanged): with Trace( @@ -384,6 +445,13 @@ def _receive_events( elif isinstance(event, h2.events.ConnectionTerminated): self._connection_terminated = event + # Transition to DRAINING on graceful shutdown (NO_ERROR), + # allowing in-flight streams to complete. + # Non-graceful shutdown closes immediately. + if event.error_code == 0: + self._state = HTTPConnectionState.DRAINING + else: + self._state = HTTPConnectionState.CLOSED self._write_outgoing_data(request) @@ -409,6 +477,7 @@ def _receive_remote_settings_change( def _response_closed(self, stream_id: int) -> None: self._max_streams_semaphore.release() del self._events[stream_id] + self._stream_requests.pop(stream_id, None) # Clean up phase tracking with self._state_lock: if self._connection_terminated and not self._events: self.close() @@ -430,7 +499,9 @@ def close(self) -> None: # Wrappers around network read/write operations... - def _read_incoming_data(self, request: Request) -> list[h2.events.Event]: + def _read_incoming_data( + self, request: Request, stream_id: int | None = None + ) -> list[h2.events.Event]: timeouts = request.extensions.get("timeout", {}) timeout = timeouts.get("read", None) @@ -440,7 +511,32 @@ def _read_incoming_data(self, request: Request) -> list[h2.events.Event]: try: data = self._network_stream.read(self.READ_NUM_BYTES, timeout) if data == b"": - raise RemoteProtocolError("Server disconnected") + # Server disconnected. Check if this is related to GOAWAY. + if stream_id is not None: + phase = self._stream_requests.get( + stream_id, {"headers_sent": False, "body_sent": False} + ) + # If we have a GOAWAY recorded, this disconnect is GOAWAY-related + if self._connection_terminated is not None: + last_stream_id = self._connection_terminated.last_stream_id + raise ConnectionGoingAway( + "Server disconnected after GOAWAY", + last_stream_id=last_stream_id if last_stream_id else 0, + error_code=self._connection_terminated.error_code, + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) + # Check if h2 is in CLOSED state (GOAWAY received but not processed) + if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + raise ConnectionGoingAway( + "Server disconnected (connection closed)", + last_stream_id=stream_id, # Conservative + error_code=0, # Assume graceful + request_stream_id=stream_id, + headers_sent=phase["headers_sent"], + body_sent=phase["body_sent"], + ) except Exception as exc: # If we get a network error we should: # @@ -510,7 +606,7 @@ def can_handle_request(self, origin: Origin) -> bool: def is_available(self) -> bool: return ( - self._state != HTTPConnectionState.CLOSED + self._state not in (HTTPConnectionState.DRAINING, HTTPConnectionState.CLOSED) and not self._connection_error and not self._used_all_stream_ids and not ( @@ -521,7 +617,12 @@ def is_available(self) -> bool: def has_expired(self) -> bool: now = time.monotonic() - return self._expire_at is not None and now > self._expire_at + keepalive_expired = self._expire_at is not None and now > self._expire_at + # Draining connections with no active streams are considered expired + draining_complete = ( + self._state == HTTPConnectionState.DRAINING and not self._events + ) + return keepalive_expired or draining_complete def is_idle(self) -> bool: return self._state == HTTPConnectionState.IDLE From 98b0dd8ef949ebe5c94f52a6619a73e7aa862b13 Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 07:58:01 -0500 Subject: [PATCH 02/15] fixup! fix: handle http2 goaway race conditions --- httpcore/_async/connection_pool.py | 4 +--- httpcore/_async/http2.py | 32 +++++++++++++++++++++--------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index bd41868d..31cb4bff 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -262,9 +262,7 @@ async def handle_async_request(self, request: Request) -> Response: continue else: # Request may have been processed. Propagate error with context so application can decide whether to retry. - msg = ( - "GOAWAY recieved: request may have been processed" - ) + msg = "GOAWAY recieved: request may have been processed" # QUESTION: What is the best way to propagate the context for the applications? raise RemoteProtocolError(msg) from exc else: diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 7b989c60..fd9a76d7 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -141,7 +141,10 @@ async def handle_async_request(self, request: Request) -> Response: stream_id = self._h2_state.get_next_available_stream_id() self._events[stream_id] = [] # Initialize phase tracking for this stream - self._stream_requests[stream_id] = {"headers_sent": False, "body_sent": False} + self._stream_requests[stream_id] = { + "headers_sent": False, + "body_sent": False, + } except h2.exceptions.NoAvailableStreamIDError: # pragma: nocover self._used_all_stream_ids = True self._request_count -= 1 @@ -191,7 +194,8 @@ async def handle_async_request(self, request: Request) -> Response: # it as a ConnectionGoingAway if applicable, or RemoteProtocolError. if self._connection_terminated: # pragma: nocover phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False}, + stream_id, + {"headers_sent": False, "body_sent": False}, ) raise ConnectionGoingAway( self._connection_terminated, @@ -203,15 +207,19 @@ async def handle_async_request(self, request: Request) -> Response: ) # Check if h2 is in CLOSED state due to GOAWAY. This can happen when # GOAWAY was recieved but we haven't processed the event yet (race condition). - if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + if ( + self._h2_state.state_machine.state + == h2.connection.ConnectionState.CLOSED + ): phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False}, + stream_id, + {"headers_sent": False, "body_sent": False}, ) msg = f"Connection closed: {exc}" raise ConnectionGoingAway( msg, - last_stream_id=stream_id, # Conservative: assume this stream may have been processed - error_code=0, # Assume graceful shutdown + last_stream_id=stream_id, # Conservative: assume this stream may have been processed + error_code=0, # Assume graceful shutdown request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -405,7 +413,9 @@ async def _receive_events( # stream_id <= last_stream_id: may have been processed raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} <= last_stream_id {last_stream_id}", - last_stream_id=last_stream_id if last_stream_id is not None else 0, + last_stream_id=last_stream_id + if last_stream_id is not None + else 0, error_code=self._connection_terminated.error_code, request_stream_id=stream_id, headers_sent=phase["headers_sent"], @@ -527,7 +537,10 @@ async def _read_incoming_data( body_sent=phase["body_sent"], ) # Check if h2 is in CLOSED state (GOAWAY received but not processed) - if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + if ( + self._h2_state.state_machine.state + == h2.connection.ConnectionState.CLOSED + ): raise ConnectionGoingAway( "Server disconnected (connection closed)", last_stream_id=stream_id, # Conservative @@ -605,7 +618,8 @@ def can_handle_request(self, origin: Origin) -> bool: def is_available(self) -> bool: return ( - self._state not in (HTTPConnectionState.DRAINING, HTTPConnectionState.CLOSED) + self._state + not in (HTTPConnectionState.DRAINING, HTTPConnectionState.CLOSED) and not self._connection_error and not self._used_all_stream_ids and not ( From 473d82e508f0415ae6a06ed652a03676582ae942 Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 08:07:08 -0500 Subject: [PATCH 03/15] fixup! fixup! fix: handle http2 goaway race conditions --- httpcore/_async/http2.py | 1 + httpcore/_sync/http2.py | 1 + tests/_async/test_http2.py | 2 +- tests/_async/test_http_proxy.py | 2 +- tests/_sync/test_http2.py | 2 +- 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index fd9a76d7..75d24d73 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -549,6 +549,7 @@ async def _read_incoming_data( headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], ) + raise RemoteProtocolError("Server disconnected") except Exception as exc: # If we get a network error we should: # diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index e7408608..96dac847 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -537,6 +537,7 @@ def _read_incoming_data( headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], ) + raise RemoteProtocolError("Server disconnected") except Exception as exc: # If we get a network error we should: # diff --git a/tests/_async/test_http2.py b/tests/_async/test_http2.py index b4ec6648..dd3e6a89 100644 --- a/tests/_async/test_http2.py +++ b/tests/_async/test_http2.py @@ -214,7 +214,7 @@ async def test_http2_connection_with_goaway(): ) async with httpcore.AsyncHTTP2Connection(origin=origin, stream=stream) as conn: # The initial request has been closed midway, with an unrecoverable error. - with pytest.raises(httpcore.RemoteProtocolError): + with pytest.raises(httpcore.ConnectionGoingAway): await conn.request("GET", "https://example.com/") # The second request can receive a graceful `ConnectionNotAvailable`, diff --git a/tests/_async/test_http_proxy.py b/tests/_async/test_http_proxy.py index 84a984b8..2fdcc775 100644 --- a/tests/_async/test_http_proxy.py +++ b/tests/_async/test_http_proxy.py @@ -224,7 +224,7 @@ async def test_proxy_tunneling_with_403(): """ network_backend = AsyncMockBackend( [ - b"HTTP/1.1 403 Permission Denied\r\n" b"\r\n", + b"HTTP/1.1 403 Permission Denied\r\n\r\n", ] ) diff --git a/tests/_sync/test_http2.py b/tests/_sync/test_http2.py index 695359bd..01e8207f 100644 --- a/tests/_sync/test_http2.py +++ b/tests/_sync/test_http2.py @@ -214,7 +214,7 @@ def test_http2_connection_with_goaway(): ) with httpcore.HTTP2Connection(origin=origin, stream=stream) as conn: # The initial request has been closed midway, with an unrecoverable error. - with pytest.raises(httpcore.RemoteProtocolError): + with pytest.raises(httpcore.ConnectionGoingAway): conn.request("GET", "https://example.com/") # The second request can receive a graceful `ConnectionNotAvailable`, From 995c04f8cce5646b5241951bd53662c44c2d8770 Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 08:15:13 -0500 Subject: [PATCH 04/15] fixup! fixup! fixup! fix: handle http2 goaway race conditions --- httpcore/_async/http2.py | 12 ++++++------ httpcore/_sync/http2.py | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 75d24d73..3705865c 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -198,9 +198,9 @@ async def handle_async_request(self, request: Request) -> Response: {"headers_sent": False, "body_sent": False}, ) raise ConnectionGoingAway( - self._connection_terminated, - last_stream_id=self._connection_terminated.last_stream_id, - error_code=self._connection_terminated.error_code, + self._connection_terminated, # type: ignore[arg-type] + last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -404,7 +404,7 @@ async def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", last_stream_id=last_stream_id, - error_code=self._connection_terminated.error_code, + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -416,7 +416,7 @@ async def _receive_events( last_stream_id=last_stream_id if last_stream_id is not None else 0, - error_code=self._connection_terminated.error_code, + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -531,7 +531,7 @@ async def _read_incoming_data( raise ConnectionGoingAway( "Server disconnected after GOAWAY", last_stream_id=last_stream_id if last_stream_id else 0, - error_code=self._connection_terminated.error_code, + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 96dac847..40c18afb 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -195,9 +195,9 @@ def handle_request(self, request: Request) -> Response: stream_id, {"headers_sent": False, "body_sent": False}, ) raise ConnectionGoingAway( - self._connection_terminated, - last_stream_id=self._connection_terminated.last_stream_id, - error_code=self._connection_terminated.error_code, + self._connection_terminated, # type: ignore[arg-type] + last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -397,7 +397,7 @@ def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", last_stream_id=last_stream_id, - error_code=self._connection_terminated.error_code, + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -407,7 +407,7 @@ def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} <= last_stream_id {last_stream_id}", last_stream_id=last_stream_id if last_stream_id is not None else 0, - error_code=self._connection_terminated.error_code, + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], From 5cf1d0c93a11ded6981264668652726a6de12aa5 Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 08:17:34 -0500 Subject: [PATCH 05/15] fixup! fixup! fixup! fixup! fix: handle http2 goaway race conditions --- httpcore/_async/http2.py | 12 ++++++------ httpcore/_sync/http2.py | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 3705865c..fadc7cfe 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -198,9 +198,9 @@ async def handle_async_request(self, request: Request) -> Response: {"headers_sent": False, "body_sent": False}, ) raise ConnectionGoingAway( - self._connection_terminated, # type: ignore[arg-type] - last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + self._connection_terminated, # type: ignore[arg-type] + last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -404,7 +404,7 @@ async def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", last_stream_id=last_stream_id, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -416,7 +416,7 @@ async def _receive_events( last_stream_id=last_stream_id if last_stream_id is not None else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -531,7 +531,7 @@ async def _read_incoming_data( raise ConnectionGoingAway( "Server disconnected after GOAWAY", last_stream_id=last_stream_id if last_stream_id else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 40c18afb..53b780bd 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -195,9 +195,9 @@ def handle_request(self, request: Request) -> Response: stream_id, {"headers_sent": False, "body_sent": False}, ) raise ConnectionGoingAway( - self._connection_terminated, # type: ignore[arg-type] - last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + self._connection_terminated, # type: ignore[arg-type] + last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -397,7 +397,7 @@ def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", last_stream_id=last_stream_id, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -407,7 +407,7 @@ def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} <= last_stream_id {last_stream_id}", last_stream_id=last_stream_id if last_stream_id is not None else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], From 61afe917a4d21222d3cc29a1685231d6482d2d7c Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 08:18:57 -0500 Subject: [PATCH 06/15] fixup! fixup! fixup! fixup! fixup! fix: handle http2 goaway race conditions --- httpcore/_sync/http2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 53b780bd..ac41ec09 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -522,7 +522,7 @@ def _read_incoming_data( raise ConnectionGoingAway( "Server disconnected after GOAWAY", last_stream_id=last_stream_id if last_stream_id else 0, - error_code=self._connection_terminated.error_code, + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], From 2fe76e784577467a210218e9f5594010ad887976 Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 08:21:12 -0500 Subject: [PATCH 07/15] fixup! fixup! fixup! fixup! fixup! fixup! fix: handle http2 goaway race conditions --- httpcore/_async/connection_pool.py | 3 ++- httpcore/_sync/connection_pool.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 31cb4bff..e3b5bee2 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -261,7 +261,8 @@ async def handle_async_request(self, request: Request) -> Response: # Likely safe to retry. continue else: - # Request may have been processed. Propagate error with context so application can decide whether to retry. + # Request may have been processed. Propagate error with context so application can decide + # whether to retry. msg = "GOAWAY recieved: request may have been processed" # QUESTION: What is the best way to propagate the context for the applications? raise RemoteProtocolError(msg) from exc diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 8217218a..aea2c645 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -261,7 +261,8 @@ def handle_request(self, request: Request) -> Response: # Likely safe to retry. continue else: - # Request may have been processed. Propagate error with context so application can decide whether to retry. + # Request may have been processed. Propagate error with context so application can decide + # whether to retry. msg = ( "GOAWAY recieved: request may have been processed" ) From 83056e3a3e8dc8773b6bc467f18f795eadcfb846 Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 08:32:13 -0500 Subject: [PATCH 08/15] fixup! fixup! fixup! fixup! fixup! fixup! fixup! fix: handle http2 goaway race conditions --- httpcore/_async/http2.py | 22 +++++++++--- httpcore/_sync/connection_pool.py | 4 +-- httpcore/_sync/http2.py | 57 +++++++++++++++++++++++-------- tests/_sync/test_http_proxy.py | 2 +- 4 files changed, 62 insertions(+), 23 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index fadc7cfe..9bd4af73 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -195,7 +195,10 @@ async def handle_async_request(self, request: Request) -> Response: if self._connection_terminated: # pragma: nocover phase = self._stream_requests.get( stream_id, - {"headers_sent": False, "body_sent": False}, + { + "headers_sent": False, + "body_sent": False, + }, ) raise ConnectionGoingAway( self._connection_terminated, # type: ignore[arg-type] @@ -213,7 +216,10 @@ async def handle_async_request(self, request: Request) -> Response: ): phase = self._stream_requests.get( stream_id, - {"headers_sent": False, "body_sent": False}, + { + "headers_sent": False, + "body_sent": False, + }, ) msg = f"Connection closed: {exc}" raise ConnectionGoingAway( @@ -396,7 +402,11 @@ async def _receive_events( last_stream_id = self._connection_terminated.last_stream_id if stream_id is not None: phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False} + stream_id, + { + "headers_sent": False, + "body_sent": False, + }, ) if last_stream_id is not None and stream_id > last_stream_id: # stream_id > last_stream_id: guaranteed unprocessed, safe to retry @@ -523,7 +533,11 @@ async def _read_incoming_data( # Server disconnected. Check if this is related to GOAWAY. if stream_id is not None: phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False} + stream_id, + { + "headers_sent": False, + "body_sent": False, + }, ) # If we have a GOAWAY recorded, this disconnect is GOAWAY-related if self._connection_terminated is not None: diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index aea2c645..ad3564b7 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -263,9 +263,7 @@ def handle_request(self, request: Request) -> Response: else: # Request may have been processed. Propagate error with context so application can decide # whether to retry. - msg = ( - "GOAWAY recieved: request may have been processed" - ) + msg = "GOAWAY recieved: request may have been processed" # QUESTION: What is the best way to propagate the context for the applications? raise RemoteProtocolError(msg) from exc else: diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index ac41ec09..456ce60b 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -89,7 +89,6 @@ def __init__( # TODO: Consider shifting this to a dataclass or typeddict self._stream_requests: dict[int, dict[str, bool]] = {} - def handle_request(self, request: Request) -> Response: if not self.can_handle_request(request.url.origin): # This cannot occur in normal operation, since the connection pool @@ -142,7 +141,10 @@ def handle_request(self, request: Request) -> Response: stream_id = self._h2_state.get_next_available_stream_id() self._events[stream_id] = [] # Initialize phase tracking for this stream - self._stream_requests[stream_id] = {"headers_sent": False, "body_sent": False} + self._stream_requests[stream_id] = { + "headers_sent": False, + "body_sent": False, + } except h2.exceptions.NoAvailableStreamIDError: # pragma: nocover self._used_all_stream_ids = True self._request_count -= 1 @@ -192,7 +194,11 @@ def handle_request(self, request: Request) -> Response: # it as a ConnectionGoingAway if applicable, or RemoteProtocolError. if self._connection_terminated: # pragma: nocover phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False}, + stream_id, + { + "headers_sent": False, + "body_sent": False, + }, ) raise ConnectionGoingAway( self._connection_terminated, # type: ignore[arg-type] @@ -204,15 +210,22 @@ def handle_request(self, request: Request) -> Response: ) # Check if h2 is in CLOSED state due to GOAWAY. This can happen when # GOAWAY was recieved but we haven't processed the event yet (race condition). - if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + if ( + self._h2_state.state_machine.state + == h2.connection.ConnectionState.CLOSED + ): phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False}, + stream_id, + { + "headers_sent": False, + "body_sent": False, + }, ) msg = f"Connection closed: {exc}" raise ConnectionGoingAway( msg, - last_stream_id=stream_id, # Conservative: assume this stream may have been processed - error_code=0, # Assume graceful shutdown + last_stream_id=stream_id, # Conservative: assume this stream may have been processed + error_code=0, # Assume graceful shutdown request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -389,7 +402,11 @@ def _receive_events( last_stream_id = self._connection_terminated.last_stream_id if stream_id is not None: phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False} + stream_id, + { + "headers_sent": False, + "body_sent": False, + }, ) if last_stream_id is not None and stream_id > last_stream_id: # stream_id > last_stream_id: guaranteed unprocessed, safe to retry @@ -402,11 +419,13 @@ def _receive_events( headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], ) - # stream_id <= last_stream_id: may have been processed if self._state != HTTPConnectionState.DRAINING: + # stream_id <= last_stream_id: may have been processed raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} <= last_stream_id {last_stream_id}", - last_stream_id=last_stream_id if last_stream_id is not None else 0, + last_stream_id=last_stream_id + if last_stream_id is not None + else 0, error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], @@ -477,7 +496,7 @@ def _receive_remote_settings_change( def _response_closed(self, stream_id: int) -> None: self._max_streams_semaphore.release() del self._events[stream_id] - self._stream_requests.pop(stream_id, None) # Clean up phase tracking + self._stream_requests.pop(stream_id, None) # Clean up phase tracking with self._state_lock: if self._connection_terminated and not self._events: self.close() @@ -514,7 +533,11 @@ def _read_incoming_data( # Server disconnected. Check if this is related to GOAWAY. if stream_id is not None: phase = self._stream_requests.get( - stream_id, {"headers_sent": False, "body_sent": False} + stream_id, + { + "headers_sent": False, + "body_sent": False, + }, ) # If we have a GOAWAY recorded, this disconnect is GOAWAY-related if self._connection_terminated is not None: @@ -522,13 +545,16 @@ def _read_incoming_data( raise ConnectionGoingAway( "Server disconnected after GOAWAY", last_stream_id=last_stream_id if last_stream_id else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], ) # Check if h2 is in CLOSED state (GOAWAY received but not processed) - if self._h2_state.state_machine.state == h2.connection.ConnectionState.CLOSED: + if ( + self._h2_state.state_machine.state + == h2.connection.ConnectionState.CLOSED + ): raise ConnectionGoingAway( "Server disconnected (connection closed)", last_stream_id=stream_id, # Conservative @@ -607,7 +633,8 @@ def can_handle_request(self, origin: Origin) -> bool: def is_available(self) -> bool: return ( - self._state not in (HTTPConnectionState.DRAINING, HTTPConnectionState.CLOSED) + self._state + not in (HTTPConnectionState.DRAINING, HTTPConnectionState.CLOSED) and not self._connection_error and not self._used_all_stream_ids and not ( diff --git a/tests/_sync/test_http_proxy.py b/tests/_sync/test_http_proxy.py index 966672dd..3b46d103 100644 --- a/tests/_sync/test_http_proxy.py +++ b/tests/_sync/test_http_proxy.py @@ -224,7 +224,7 @@ def test_proxy_tunneling_with_403(): """ network_backend = MockBackend( [ - b"HTTP/1.1 403 Permission Denied\r\n" b"\r\n", + b"HTTP/1.1 403 Permission Denied\r\n\r\n", ] ) From efac8b8e8a594ae728f1907af53ba5cba7feb691 Mon Sep 17 00:00:00 2001 From: Magd Bayoumi Date: Thu, 29 Jan 2026 08:47:50 -0500 Subject: [PATCH 09/15] fixup! fixup! fixup! fixup! fixup! fixup! fixup! fixup! fix: handle http2 goaway race conditions --- httpcore/_async/http2.py | 12 ++++++------ httpcore/_sync/http2.py | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 9bd4af73..53b852a0 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -201,9 +201,9 @@ async def handle_async_request(self, request: Request) -> Response: }, ) raise ConnectionGoingAway( - self._connection_terminated, # type: ignore[arg-type] - last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + self._connection_terminated, # type: ignore[arg-type,unused-ignore] + last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type,unused-ignore] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -414,7 +414,7 @@ async def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", last_stream_id=last_stream_id, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -426,7 +426,7 @@ async def _receive_events( last_stream_id=last_stream_id if last_stream_id is not None else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -545,7 +545,7 @@ async def _read_incoming_data( raise ConnectionGoingAway( "Server disconnected after GOAWAY", last_stream_id=last_stream_id if last_stream_id else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 456ce60b..28e1d10a 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -201,9 +201,9 @@ def handle_request(self, request: Request) -> Response: }, ) raise ConnectionGoingAway( - self._connection_terminated, # type: ignore[arg-type] - last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type] - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + self._connection_terminated, # type: ignore[arg-type,unused-ignore] + last_stream_id=self._connection_terminated.last_stream_id, # type: ignore[arg-type,unused-ignore] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -414,7 +414,7 @@ def _receive_events( raise ConnectionGoingAway( f"GOAWAY received: stream {stream_id} > last_stream_id {last_stream_id}", last_stream_id=last_stream_id, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -426,7 +426,7 @@ def _receive_events( last_stream_id=last_stream_id if last_stream_id is not None else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], @@ -545,7 +545,7 @@ def _read_incoming_data( raise ConnectionGoingAway( "Server disconnected after GOAWAY", last_stream_id=last_stream_id if last_stream_id else 0, - error_code=self._connection_terminated.error_code, # type: ignore[arg-type] + error_code=self._connection_terminated.error_code, # type: ignore[arg-type,unused-ignore] request_stream_id=stream_id, headers_sent=phase["headers_sent"], body_sent=phase["body_sent"], From 6d0511a591ca9db79f51548b29c3a116bd158204 Mon Sep 17 00:00:00 2001 From: bayoumi17m Date: Thu, 29 Jan 2026 17:35:13 -0500 Subject: [PATCH 10/15] test: add comprehensive tests for HTTP/2 GOAWAY handling (#1) --- httpcore/_async/connection_pool.py | 14 +- httpcore/_sync/connection_pool.py | 15 +- tests/_async/test_http2_goaway.py | 848 +++++++++++++++++++++++++++++ 3 files changed, 865 insertions(+), 12 deletions(-) create mode 100644 tests/_async/test_http2_goaway.py diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index e3b5bee2..03d2eb25 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -241,15 +241,11 @@ async def handle_async_request(self, request: Request) -> Response: response = await connection.handle_async_request( pool_request.request ) - except ConnectionNotAvailable: - # In some cases a connection may initially be available to - # handle a request, but then become unavailable. - # - # In this case we clear the connection and try again. - pool_request.clear_connection() except ConnectionGoingAway as exc: # GOAWAY frame recieved during request processing. # Determine if we can safely retry based on RFC 7540 semantics. + # NOTE: This must be caught before ConnectionNotAvailable since + # ConnectionGoingAway is a subclass of ConnectionNotAvailable. pool_request.clear_connection() if exc.is_safe_to_retry: @@ -266,6 +262,12 @@ async def handle_async_request(self, request: Request) -> Response: msg = "GOAWAY recieved: request may have been processed" # QUESTION: What is the best way to propagate the context for the applications? raise RemoteProtocolError(msg) from exc + except ConnectionNotAvailable: + # In some cases a connection may initially be available to + # handle a request, but then become unavailable. + # + # In this case we clear the connection and try again. + pool_request.clear_connection() else: break # pragma: nocover diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index ad3564b7..38d1c0b0 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -241,13 +241,10 @@ def handle_request(self, request: Request) -> Response: response = connection.handle_request( pool_request.request ) - except ConnectionNotAvailable: - # In some cases a connection may initially be available to - # handle a request, but then become unavailable. - # - # In this case we clear the connection and try again. - pool_request.clear_connection() except ConnectionGoingAway as exc: + # NOTE: This must be caught before ConnectionNotAvailable since + # ConnectionGoingAway is a subclass of ConnectionNotAvailable. + # # GOAWAY frame recieved during request processing. # Determine if we can safely retry based on RFC 7540 semantics. pool_request.clear_connection() @@ -266,6 +263,12 @@ def handle_request(self, request: Request) -> Response: msg = "GOAWAY recieved: request may have been processed" # QUESTION: What is the best way to propagate the context for the applications? raise RemoteProtocolError(msg) from exc + except ConnectionNotAvailable: + # In some cases a connection may initially be available to + # handle a request, but then become unavailable. + # + # In this case we clear the connection and try again. + pool_request.clear_connection() else: break # pragma: nocover diff --git a/tests/_async/test_http2_goaway.py b/tests/_async/test_http2_goaway.py new file mode 100644 index 00000000..d68fe4de --- /dev/null +++ b/tests/_async/test_http2_goaway.py @@ -0,0 +1,848 @@ +""" +Comprehensive tests for HTTP/2 GOAWAY handling. + +These tests cover the new GOAWAY functionality introduced to handle race conditions +when servers send GOAWAY frames during HTTP/2 connections. Key features tested: + +1. ConnectionGoingAway exception properties (is_safe_to_retry, is_graceful_shutdown, may_have_side_effects) +2. DRAINING connection state for graceful shutdowns +3. Request phase tracking (headers_sent, body_sent) +4. Pool-level retry logic based on GOAWAY context +5. Various race conditions between GOAWAY and request processing +""" +from __future__ import annotations + +import hpack +import hyperframe.frame +import pytest + +import httpcore + + +# ============================================================================= +# Tests for ConnectionGoingAway Exception Properties +# ============================================================================= + + +class TestConnectionGoingAwayException: + """Tests for the ConnectionGoingAway exception class and its properties.""" + + def test_is_safe_to_retry_when_stream_id_greater_than_last_stream_id(self): + """ + Per RFC 7540 Section 6.8: streams with IDs > last_stream_id are guaranteed + unprocessed and safe to retry. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=3, # > last_stream_id + headers_sent=True, + body_sent=True, + ) + assert exc.is_safe_to_retry is True + + def test_is_not_safe_to_retry_when_stream_id_equals_last_stream_id(self): + """ + Streams with IDs <= last_stream_id may have been processed by the server. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # == last_stream_id + headers_sent=True, + body_sent=True, + ) + assert exc.is_safe_to_retry is False + + def test_is_not_safe_to_retry_when_stream_id_less_than_last_stream_id(self): + """ + Streams with IDs <= last_stream_id may have been processed by the server. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=5, + error_code=0, + request_stream_id=1, # < last_stream_id + headers_sent=True, + body_sent=True, + ) + assert exc.is_safe_to_retry is False + + def test_is_graceful_shutdown_when_error_code_is_zero(self): + """ + NO_ERROR (0x0) indicates administrative shutdown such as server restart, + connection limit reached, or idle timeout. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, # NO_ERROR + request_stream_id=1, + ) + assert exc.is_graceful_shutdown is True + + def test_is_not_graceful_shutdown_when_error_code_is_nonzero(self): + """ + Non-zero error codes indicate an error condition. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=1, # PROTOCOL_ERROR + request_stream_id=1, + ) + assert exc.is_graceful_shutdown is False + + def test_may_have_side_effects_when_stream_id_greater_than_last_stream_id(self): + """ + If stream_id > last_stream_id, the request was guaranteed unprocessed, + so no side effects are possible. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=3, # > last_stream_id + headers_sent=True, # Even with headers sent, no side effects possible + body_sent=True, + ) + assert exc.may_have_side_effects is False + + def test_may_have_side_effects_when_headers_sent(self): + """ + If stream_id <= last_stream_id AND headers were sent, side effects are possible. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # <= last_stream_id + headers_sent=True, + body_sent=False, + ) + assert exc.may_have_side_effects is True + + def test_may_have_side_effects_when_body_sent(self): + """ + If stream_id <= last_stream_id AND body was sent, side effects are possible. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # <= last_stream_id + headers_sent=False, + body_sent=True, + ) + assert exc.may_have_side_effects is True + + def test_no_side_effects_when_nothing_sent(self): + """ + If stream_id <= last_stream_id but nothing was sent, no side effects. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # <= last_stream_id + headers_sent=False, + body_sent=False, + ) + assert exc.may_have_side_effects is False + + def test_repr(self): + """Test the __repr__ method provides useful debugging info.""" + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=3, + headers_sent=True, + body_sent=True, + ) + repr_str = repr(exc) + assert "ConnectionGoingAway" in repr_str + assert "last_stream_id=1" in repr_str + assert "error_code=0" in repr_str + assert "request_stream_id=3" in repr_str + assert "is_safe_to_retry=True" in repr_str + assert "is_graceful_shutdown=True" in repr_str + + def test_inheritance_from_connection_not_available(self): + """ConnectionGoingAway should be a subclass of ConnectionNotAvailable.""" + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, + ) + assert isinstance(exc, httpcore.ConnectionNotAvailable) + + +# ============================================================================= +# Tests for HTTP/2 Connection GOAWAY Handling +# ============================================================================= + + +@pytest.mark.anyio +async def test_http2_goaway_non_graceful_shutdown(): + """ + Non-graceful shutdown (error_code != 0) should raise ConnectionGoingAway + with is_graceful_shutdown=False. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # Non-graceful GOAWAY with PROTOCOL_ERROR (error_code=1) + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=1 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # First request should fail with ConnectionGoingAway due to non-graceful GOAWAY + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # Verify it's not a graceful shutdown + assert exc_info.value.is_graceful_shutdown is False + assert exc_info.value.error_code == 1 + + +@pytest.mark.anyio +async def test_http2_goaway_graceful_shutdown_properties(): + """ + When GOAWAY with NO_ERROR is received, the exception should have + is_graceful_shutdown=True. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # Graceful GOAWAY with NO_ERROR + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # Request should raise ConnectionGoingAway since GOAWAY is received + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # Should be a graceful shutdown + assert exc_info.value.is_graceful_shutdown is True + assert exc_info.value.error_code == 0 + + +@pytest.mark.anyio +async def test_http2_goaway_stream_id_greater_than_last_stream_id(): + """ + When stream_id > last_stream_id, the request is guaranteed unprocessed + and should raise ConnectionGoingAway with is_safe_to_retry=True. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY with last_stream_id=0 before any streams were processed + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # Stream 1 > last_stream_id (0), so safe to retry + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.last_stream_id == 0 + assert exc_info.value.is_safe_to_retry is True + + +@pytest.mark.anyio +async def test_http2_goaway_stream_id_less_than_or_equal_to_last_stream_id(): + """ + When stream_id <= last_stream_id and connection is not DRAINING, + the request may have been processed and should raise ConnectionGoingAway. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # GOAWAY with last_stream_id=1, so stream 1 may have been processed + # Using error_code=1 to trigger non-DRAINING state + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=1 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # Stream 1 <= last_stream_id (1), NOT safe to retry + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.last_stream_id == 1 + assert exc_info.value.is_safe_to_retry is False + + +@pytest.mark.anyio +async def test_http2_server_disconnect_after_goaway(): + """ + When server disconnects after sending GOAWAY, the exception should + include GOAWAY context. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY followed immediately by disconnect + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", # Server disconnect + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # Should include GOAWAY context + assert exc_info.value.last_stream_id == 0 + assert exc_info.value.is_graceful_shutdown is True + + +@pytest.mark.anyio +async def test_http2_tracks_request_phase_headers_sent(): + """ + The connection should track when headers have been sent for GOAWAY context. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY after headers would be sent but before response + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # Headers should have been sent + assert exc_info.value.headers_sent is True + + +@pytest.mark.anyio +async def test_http2_tracks_request_phase_body_sent(): + """ + The connection should track when body has been sent for GOAWAY context. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY after body would be sent but before response + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request( + "POST", + "https://example.com/", + headers={b"content-length": b"11"}, + content=b"Hello World", + ) + + # Body should have been sent + assert exc_info.value.body_sent is True + + +@pytest.mark.anyio +async def test_http2_draining_connection_goaway_after_complete_response(): + """ + When GOAWAY is sent after a complete response, the first request succeeds. + The GOAWAY is only discovered on the next request attempt, which then fails. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.DataFrame( + stream_id=1, data=b"Hello, world!", flags=["END_STREAM"] + ).serialize(), + # GOAWAY after the first response completes - discovered on next request + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", # Disconnect after GOAWAY + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # First request should complete successfully + response = await conn.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Hello, world!" + + # Connection appears available because GOAWAY hasn't been read yet + # (it comes after the complete response) + # Second request attempts to use the connection and discovers GOAWAY + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # The second request (stream 3) > last_stream_id (1), so safe to retry + assert exc_info.value.is_safe_to_retry is True + + + + +# ============================================================================= +# Custom Mock Backend for Retry Tests +# ============================================================================= + + +class AsyncMockStreamWithFailure(httpcore.AsyncMockStream): + """A mock stream that can simulate different behaviors for testing retries.""" + + def __init__(self, buffers_by_connection: list[list[bytes]], http2: bool = False): + self._all_buffers = buffers_by_connection + self._connection_index = 0 + super().__init__( + buffers_by_connection[0] if buffers_by_connection else [], http2=http2 + ) + + +class AsyncMockBackendWithRetry(httpcore.AsyncMockBackend): + """A mock backend that returns different data for each connection.""" + + def __init__(self, buffers_by_connection: list[list[bytes]], http2: bool = False): + self._all_buffers = buffers_by_connection + self._connection_index = 0 + self._http2 = http2 + super().__init__([], http2=http2) + + async def connect_tcp( + self, + host: str, + port: int, + timeout: float | None = None, + local_address: str | None = None, + socket_options=None, + ): + if self._connection_index < len(self._all_buffers): + buffer = list(self._all_buffers[self._connection_index]) + self._connection_index += 1 + else: + buffer = [] + return httpcore.AsyncMockStream(buffer, http2=self._http2) + + +# ============================================================================= +# Tests for Connection Pool GOAWAY Retry Logic +# ============================================================================= + + +@pytest.mark.anyio +async def test_connection_pool_retries_when_safe_to_retry(): + """ + Connection pool should automatically retry when is_safe_to_retry is True + (stream_id > last_stream_id, guaranteed unprocessed). + """ + network_backend = AsyncMockBackendWithRetry( + buffers_by_connection=[ + # First connection: GOAWAY with last_stream_id=0 (stream 1 > 0, safe to retry) + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ], + # Second connection: normal response + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.DataFrame( + stream_id=1, data=b"Hello, world!", flags=["END_STREAM"] + ).serialize(), + ], + ], + http2=True, + ) + + async with httpcore.AsyncConnectionPool( + network_backend=network_backend, + ) as pool: + # Request should succeed after automatic retry + response = await pool.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Hello, world!" + + +@pytest.mark.anyio +async def test_connection_pool_retries_graceful_no_side_effects(): + """ + Connection pool should retry when is_graceful_shutdown is True + AND may_have_side_effects is False (headers not sent yet, stream > last_stream). + """ + network_backend = AsyncMockBackendWithRetry( + buffers_by_connection=[ + # First connection: Graceful GOAWAY with last_stream_id=0 + # stream 1 > 0, so safe to retry + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ], + # Second connection: normal response + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.DataFrame( + stream_id=1, data=b"Success!", flags=["END_STREAM"] + ).serialize(), + ], + ], + http2=True, + ) + + async with httpcore.AsyncConnectionPool( + network_backend=network_backend, + ) as pool: + response = await pool.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Success!" + + +@pytest.mark.anyio +async def test_connection_pool_raises_when_not_safe_to_retry(): + """ + Connection pool should raise RemoteProtocolError when is_safe_to_retry is False + and the request may have been processed. + """ + network_backend = AsyncMockBackendWithRetry( + buffers_by_connection=[ + # First connection: GOAWAY with last_stream_id=1 (stream 1 <= 1, not safe) + # Non-graceful shutdown (error_code=1) + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=1 + ).serialize(), + b"", + ], + ], + http2=True, + ) + + async with httpcore.AsyncConnectionPool( + network_backend=network_backend, + ) as pool: + with pytest.raises(httpcore.RemoteProtocolError) as exc_info: + await pool.request("GET", "https://example.com/") + + # Verify the error message indicates GOAWAY was received + assert "GOAWAY" in str(exc_info.value) + + +@pytest.mark.anyio +async def test_connection_pool_raises_when_may_have_side_effects(): + """ + Connection pool should raise RemoteProtocolError when graceful shutdown + but request may have had side effects (headers sent, stream <= last_stream_id). + """ + network_backend = AsyncMockBackendWithRetry( + buffers_by_connection=[ + # First connection: Graceful GOAWAY with last_stream_id=1 + # Headers were sent, may have side effects + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ], + ], + http2=True, + ) + + async with httpcore.AsyncConnectionPool( + network_backend=network_backend, + ) as pool: + with pytest.raises(httpcore.RemoteProtocolError) as exc_info: + await pool.request("GET", "https://example.com/") + + # Verify the error message indicates GOAWAY was received + assert "GOAWAY" in str(exc_info.value) + + +# ============================================================================= +# Additional tests for specific code paths +# ============================================================================= + + +@pytest.mark.anyio +async def test_http2_goaway_receive_events_with_terminated_connection(): + """ + Test the _receive_events code path when connection is already terminated. + This covers the case where stream_id <= last_stream_id. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # GOAWAY with last_stream_id=5 (so stream 1 <= 5, may have been processed) + # Non-graceful shutdown so connection goes to CLOSED, not DRAINING + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=5 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.request("GET", "https://example.com/") + + # Stream 1 <= last_stream_id (5), NOT safe to retry + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.last_stream_id == 5 + assert exc_info.value.is_safe_to_retry is False + + +@pytest.mark.anyio +async def test_http2_goaway_connection_closed_after_graceful_goaway(): + """ + Test that connection is properly closed after handling graceful GOAWAY + when the request fails (due to server disconnect after GOAWAY). + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway): + await conn.request("GET", "https://example.com/") + + # After exception handling cleanup, connection should be closed + assert conn.is_closed() + + +# ============================================================================= +# Tests for pool retry edge cases using mock connections +# ============================================================================= + + +class MockConnectionGracefulGoaway: + """ + Mock connection that raises ConnectionGoingAway with specific properties + to test the graceful shutdown + no side effects retry path. + """ + + def __init__(self, origin: httpcore.Origin, succeed_on_retry: bool = True): + self._origin = origin + self._calls = 0 + self._succeed_on_retry = succeed_on_retry + + def can_handle_request(self, origin: httpcore.Origin) -> bool: + return origin == self._origin + + def is_available(self) -> bool: + return True + + def is_idle(self) -> bool: + return self._calls == 0 + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + async def handle_async_request(self, request: httpcore.Request) -> httpcore.Response: + self._calls += 1 + if self._calls == 1: + # First call: raise ConnectionGoingAway with: + # - stream_id <= last_stream_id (not safe to retry based on RFC) + # - error_code = 0 (graceful shutdown) + # - headers_sent = False, body_sent = False (no side effects) + # This triggers the "elif exc.is_graceful_shutdown and not exc.may_have_side_effects" branch + raise httpcore.ConnectionGoingAway( + "Graceful shutdown before headers sent", + last_stream_id=5, # >= stream_id (1) + error_code=0, # graceful + request_stream_id=1, # <= last_stream_id + headers_sent=False, # no side effects + body_sent=False, + ) + if self._succeed_on_retry: + return httpcore.Response(200, content=b"Success after retry!") + raise httpcore.ConnectionGoingAway( + "Another GOAWAY", + last_stream_id=5, + error_code=0, + request_stream_id=1, + headers_sent=False, + body_sent=False, + ) + + async def aclose(self) -> None: + pass + + def info(self) -> str: + return "MockConnectionGracefulGoaway" + + +@pytest.mark.anyio +async def test_connection_pool_retries_graceful_shutdown_no_headers_sent(): + """ + Test the pool retry path where: + - is_safe_to_retry = False (stream_id <= last_stream_id) + - is_graceful_shutdown = True (error_code = 0) + - may_have_side_effects = False (headers not sent yet) + + This tests line 258 in connection_pool.py. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + + # Create a custom pool that returns our mock connection + class TestPool(httpcore.AsyncConnectionPool): + def __init__(self): + super().__init__() + self._mock_connections: list = [] + + def create_connection(self, origin: httpcore.Origin): + conn = MockConnectionGracefulGoaway(origin, succeed_on_retry=True) + self._mock_connections.append(conn) + return conn + + async with TestPool() as pool: + response = await pool.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Success after retry!" + + # Verify the first connection was called twice (retry happened) + assert pool._mock_connections[0]._calls == 2 + + From f2e483efeca2a9daf524a89735fcaf7a6ecf49c7 Mon Sep 17 00:00:00 2001 From: bayoumi17m Date: Fri, 30 Jan 2026 01:59:53 -0500 Subject: [PATCH 11/15] fix: resolve linting issues and regenerate sync files for GOAWAY tests (#2) --- httpcore/_sync/connection_pool.py | 5 +- tests/_async/test_http2_goaway.py | 56 +- tests/_sync/test_http2_goaway.py | 828 ++++++++++++++++++++++++++++++ 3 files changed, 848 insertions(+), 41 deletions(-) create mode 100644 tests/_sync/test_http2_goaway.py diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 38d1c0b0..3229300f 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -242,11 +242,10 @@ def handle_request(self, request: Request) -> Response: pool_request.request ) except ConnectionGoingAway as exc: - # NOTE: This must be caught before ConnectionNotAvailable since - # ConnectionGoingAway is a subclass of ConnectionNotAvailable. - # # GOAWAY frame recieved during request processing. # Determine if we can safely retry based on RFC 7540 semantics. + # NOTE: This must be caught before ConnectionNotAvailable since + # ConnectionGoingAway is a subclass of ConnectionNotAvailable. pool_request.clear_connection() if exc.is_safe_to_retry: diff --git a/tests/_async/test_http2_goaway.py b/tests/_async/test_http2_goaway.py index d68fe4de..b478457e 100644 --- a/tests/_async/test_http2_goaway.py +++ b/tests/_async/test_http2_goaway.py @@ -10,15 +10,17 @@ 4. Pool-level retry logic based on GOAWAY context 5. Various race conditions between GOAWAY and request processing """ + from __future__ import annotations +from typing import Any + import hpack import hyperframe.frame import pytest import httpcore - # ============================================================================= # Tests for ConnectionGoingAway Exception Properties # ============================================================================= @@ -466,24 +468,11 @@ async def test_http2_draining_connection_goaway_after_complete_response(): assert exc_info.value.is_safe_to_retry is True - - # ============================================================================= # Custom Mock Backend for Retry Tests # ============================================================================= -class AsyncMockStreamWithFailure(httpcore.AsyncMockStream): - """A mock stream that can simulate different behaviors for testing retries.""" - - def __init__(self, buffers_by_connection: list[list[bytes]], http2: bool = False): - self._all_buffers = buffers_by_connection - self._connection_index = 0 - super().__init__( - buffers_by_connection[0] if buffers_by_connection else [], http2=http2 - ) - - class AsyncMockBackendWithRetry(httpcore.AsyncMockBackend): """A mock backend that returns different data for each connection.""" @@ -499,8 +488,8 @@ async def connect_tcp( port: int, timeout: float | None = None, local_address: str | None = None, - socket_options=None, - ): + socket_options: Any = None, + ) -> httpcore.AsyncMockStream: if self._connection_index < len(self._all_buffers): buffer = list(self._all_buffers[self._connection_index]) self._connection_index += 1 @@ -754,16 +743,15 @@ async def test_http2_goaway_connection_closed_after_graceful_goaway(): # ============================================================================= -class MockConnectionGracefulGoaway: +class MockConnectionGracefulGoaway(httpcore.AsyncConnectionInterface): """ Mock connection that raises ConnectionGoingAway with specific properties to test the graceful shutdown + no side effects retry path. """ - def __init__(self, origin: httpcore.Origin, succeed_on_retry: bool = True): + def __init__(self, origin: httpcore.Origin) -> None: self._origin = origin self._calls = 0 - self._succeed_on_retry = succeed_on_retry def can_handle_request(self, origin: httpcore.Origin) -> bool: return origin == self._origin @@ -780,7 +768,9 @@ def is_closed(self) -> bool: def has_expired(self) -> bool: return False - async def handle_async_request(self, request: httpcore.Request) -> httpcore.Response: + async def handle_async_request( + self, request: httpcore.Request + ) -> httpcore.Response: self._calls += 1 if self._calls == 1: # First call: raise ConnectionGoingAway with: @@ -796,16 +786,7 @@ async def handle_async_request(self, request: httpcore.Request) -> httpcore.Resp headers_sent=False, # no side effects body_sent=False, ) - if self._succeed_on_retry: - return httpcore.Response(200, content=b"Success after retry!") - raise httpcore.ConnectionGoingAway( - "Another GOAWAY", - last_stream_id=5, - error_code=0, - request_stream_id=1, - headers_sent=False, - body_sent=False, - ) + return httpcore.Response(200, content=b"Success after retry!") async def aclose(self) -> None: pass @@ -824,16 +805,17 @@ async def test_connection_pool_retries_graceful_shutdown_no_headers_sent(): This tests line 258 in connection_pool.py. """ - origin = httpcore.Origin(b"https", b"example.com", 443) # Create a custom pool that returns our mock connection class TestPool(httpcore.AsyncConnectionPool): - def __init__(self): + def __init__(self) -> None: super().__init__() - self._mock_connections: list = [] + self._mock_connections: list[MockConnectionGracefulGoaway] = [] - def create_connection(self, origin: httpcore.Origin): - conn = MockConnectionGracefulGoaway(origin, succeed_on_retry=True) + def create_connection( + self, origin: httpcore.Origin + ) -> MockConnectionGracefulGoaway: + conn = MockConnectionGracefulGoaway(origin) self._mock_connections.append(conn) return conn @@ -843,6 +825,4 @@ def create_connection(self, origin: httpcore.Origin): assert response.content == b"Success after retry!" # Verify the first connection was called twice (retry happened) - assert pool._mock_connections[0]._calls == 2 - - + assert pool._mock_connections[0]._calls == 2 # type: ignore[attr-defined] diff --git a/tests/_sync/test_http2_goaway.py b/tests/_sync/test_http2_goaway.py new file mode 100644 index 00000000..23710dde --- /dev/null +++ b/tests/_sync/test_http2_goaway.py @@ -0,0 +1,828 @@ +""" +Comprehensive tests for HTTP/2 GOAWAY handling. + +These tests cover the new GOAWAY functionality introduced to handle race conditions +when servers send GOAWAY frames during HTTP/2 connections. Key features tested: + +1. ConnectionGoingAway exception properties (is_safe_to_retry, is_graceful_shutdown, may_have_side_effects) +2. DRAINING connection state for graceful shutdowns +3. Request phase tracking (headers_sent, body_sent) +4. Pool-level retry logic based on GOAWAY context +5. Various race conditions between GOAWAY and request processing +""" + +from __future__ import annotations + +from typing import Any + +import hpack +import hyperframe.frame +import pytest + +import httpcore + +# ============================================================================= +# Tests for ConnectionGoingAway Exception Properties +# ============================================================================= + + +class TestConnectionGoingAwayException: + """Tests for the ConnectionGoingAway exception class and its properties.""" + + def test_is_safe_to_retry_when_stream_id_greater_than_last_stream_id(self): + """ + Per RFC 7540 Section 6.8: streams with IDs > last_stream_id are guaranteed + unprocessed and safe to retry. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=3, # > last_stream_id + headers_sent=True, + body_sent=True, + ) + assert exc.is_safe_to_retry is True + + def test_is_not_safe_to_retry_when_stream_id_equals_last_stream_id(self): + """ + Streams with IDs <= last_stream_id may have been processed by the server. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # == last_stream_id + headers_sent=True, + body_sent=True, + ) + assert exc.is_safe_to_retry is False + + def test_is_not_safe_to_retry_when_stream_id_less_than_last_stream_id(self): + """ + Streams with IDs <= last_stream_id may have been processed by the server. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=5, + error_code=0, + request_stream_id=1, # < last_stream_id + headers_sent=True, + body_sent=True, + ) + assert exc.is_safe_to_retry is False + + def test_is_graceful_shutdown_when_error_code_is_zero(self): + """ + NO_ERROR (0x0) indicates administrative shutdown such as server restart, + connection limit reached, or idle timeout. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, # NO_ERROR + request_stream_id=1, + ) + assert exc.is_graceful_shutdown is True + + def test_is_not_graceful_shutdown_when_error_code_is_nonzero(self): + """ + Non-zero error codes indicate an error condition. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=1, # PROTOCOL_ERROR + request_stream_id=1, + ) + assert exc.is_graceful_shutdown is False + + def test_may_have_side_effects_when_stream_id_greater_than_last_stream_id(self): + """ + If stream_id > last_stream_id, the request was guaranteed unprocessed, + so no side effects are possible. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=3, # > last_stream_id + headers_sent=True, # Even with headers sent, no side effects possible + body_sent=True, + ) + assert exc.may_have_side_effects is False + + def test_may_have_side_effects_when_headers_sent(self): + """ + If stream_id <= last_stream_id AND headers were sent, side effects are possible. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # <= last_stream_id + headers_sent=True, + body_sent=False, + ) + assert exc.may_have_side_effects is True + + def test_may_have_side_effects_when_body_sent(self): + """ + If stream_id <= last_stream_id AND body was sent, side effects are possible. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # <= last_stream_id + headers_sent=False, + body_sent=True, + ) + assert exc.may_have_side_effects is True + + def test_no_side_effects_when_nothing_sent(self): + """ + If stream_id <= last_stream_id but nothing was sent, no side effects. + """ + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, # <= last_stream_id + headers_sent=False, + body_sent=False, + ) + assert exc.may_have_side_effects is False + + def test_repr(self): + """Test the __repr__ method provides useful debugging info.""" + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=3, + headers_sent=True, + body_sent=True, + ) + repr_str = repr(exc) + assert "ConnectionGoingAway" in repr_str + assert "last_stream_id=1" in repr_str + assert "error_code=0" in repr_str + assert "request_stream_id=3" in repr_str + assert "is_safe_to_retry=True" in repr_str + assert "is_graceful_shutdown=True" in repr_str + + def test_inheritance_from_connection_not_available(self): + """ConnectionGoingAway should be a subclass of ConnectionNotAvailable.""" + exc = httpcore.ConnectionGoingAway( + "GOAWAY received", + last_stream_id=1, + error_code=0, + request_stream_id=1, + ) + assert isinstance(exc, httpcore.ConnectionNotAvailable) + + +# ============================================================================= +# Tests for HTTP/2 Connection GOAWAY Handling +# ============================================================================= + + + +def test_http2_goaway_non_graceful_shutdown(): + """ + Non-graceful shutdown (error_code != 0) should raise ConnectionGoingAway + with is_graceful_shutdown=False. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # Non-graceful GOAWAY with PROTOCOL_ERROR (error_code=1) + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=1 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # First request should fail with ConnectionGoingAway due to non-graceful GOAWAY + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # Verify it's not a graceful shutdown + assert exc_info.value.is_graceful_shutdown is False + assert exc_info.value.error_code == 1 + + + +def test_http2_goaway_graceful_shutdown_properties(): + """ + When GOAWAY with NO_ERROR is received, the exception should have + is_graceful_shutdown=True. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # Graceful GOAWAY with NO_ERROR + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # Request should raise ConnectionGoingAway since GOAWAY is received + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # Should be a graceful shutdown + assert exc_info.value.is_graceful_shutdown is True + assert exc_info.value.error_code == 0 + + + +def test_http2_goaway_stream_id_greater_than_last_stream_id(): + """ + When stream_id > last_stream_id, the request is guaranteed unprocessed + and should raise ConnectionGoingAway with is_safe_to_retry=True. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY with last_stream_id=0 before any streams were processed + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # Stream 1 > last_stream_id (0), so safe to retry + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.last_stream_id == 0 + assert exc_info.value.is_safe_to_retry is True + + + +def test_http2_goaway_stream_id_less_than_or_equal_to_last_stream_id(): + """ + When stream_id <= last_stream_id and connection is not DRAINING, + the request may have been processed and should raise ConnectionGoingAway. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # GOAWAY with last_stream_id=1, so stream 1 may have been processed + # Using error_code=1 to trigger non-DRAINING state + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=1 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # Stream 1 <= last_stream_id (1), NOT safe to retry + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.last_stream_id == 1 + assert exc_info.value.is_safe_to_retry is False + + + +def test_http2_server_disconnect_after_goaway(): + """ + When server disconnects after sending GOAWAY, the exception should + include GOAWAY context. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY followed immediately by disconnect + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", # Server disconnect + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # Should include GOAWAY context + assert exc_info.value.last_stream_id == 0 + assert exc_info.value.is_graceful_shutdown is True + + + +def test_http2_tracks_request_phase_headers_sent(): + """ + The connection should track when headers have been sent for GOAWAY context. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY after headers would be sent but before response + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # Headers should have been sent + assert exc_info.value.headers_sent is True + + + +def test_http2_tracks_request_phase_body_sent(): + """ + The connection should track when body has been sent for GOAWAY context. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # GOAWAY after body would be sent but before response + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request( + "POST", + "https://example.com/", + headers={b"content-length": b"11"}, + content=b"Hello World", + ) + + # Body should have been sent + assert exc_info.value.body_sent is True + + + +def test_http2_draining_connection_goaway_after_complete_response(): + """ + When GOAWAY is sent after a complete response, the first request succeeds. + The GOAWAY is only discovered on the next request attempt, which then fails. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.DataFrame( + stream_id=1, data=b"Hello, world!", flags=["END_STREAM"] + ).serialize(), + # GOAWAY after the first response completes - discovered on next request + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", # Disconnect after GOAWAY + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # First request should complete successfully + response = conn.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Hello, world!" + + # Connection appears available because GOAWAY hasn't been read yet + # (it comes after the complete response) + # Second request attempts to use the connection and discovers GOAWAY + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # The second request (stream 3) > last_stream_id (1), so safe to retry + assert exc_info.value.is_safe_to_retry is True + + +# ============================================================================= +# Custom Mock Backend for Retry Tests +# ============================================================================= + + +class MockBackendWithRetry(httpcore.MockBackend): + """A mock backend that returns different data for each connection.""" + + def __init__(self, buffers_by_connection: list[list[bytes]], http2: bool = False): + self._all_buffers = buffers_by_connection + self._connection_index = 0 + self._http2 = http2 + super().__init__([], http2=http2) + + def connect_tcp( + self, + host: str, + port: int, + timeout: float | None = None, + local_address: str | None = None, + socket_options: Any = None, + ) -> httpcore.MockStream: + if self._connection_index < len(self._all_buffers): + buffer = list(self._all_buffers[self._connection_index]) + self._connection_index += 1 + else: + buffer = [] + return httpcore.MockStream(buffer, http2=self._http2) + + +# ============================================================================= +# Tests for Connection Pool GOAWAY Retry Logic +# ============================================================================= + + + +def test_connection_pool_retries_when_safe_to_retry(): + """ + Connection pool should automatically retry when is_safe_to_retry is True + (stream_id > last_stream_id, guaranteed unprocessed). + """ + network_backend = MockBackendWithRetry( + buffers_by_connection=[ + # First connection: GOAWAY with last_stream_id=0 (stream 1 > 0, safe to retry) + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ], + # Second connection: normal response + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.DataFrame( + stream_id=1, data=b"Hello, world!", flags=["END_STREAM"] + ).serialize(), + ], + ], + http2=True, + ) + + with httpcore.ConnectionPool( + network_backend=network_backend, + ) as pool: + # Request should succeed after automatic retry + response = pool.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Hello, world!" + + + +def test_connection_pool_retries_graceful_no_side_effects(): + """ + Connection pool should retry when is_graceful_shutdown is True + AND may_have_side_effects is False (headers not sent yet, stream > last_stream). + """ + network_backend = MockBackendWithRetry( + buffers_by_connection=[ + # First connection: Graceful GOAWAY with last_stream_id=0 + # stream 1 > 0, so safe to retry + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ], + # Second connection: normal response + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.DataFrame( + stream_id=1, data=b"Success!", flags=["END_STREAM"] + ).serialize(), + ], + ], + http2=True, + ) + + with httpcore.ConnectionPool( + network_backend=network_backend, + ) as pool: + response = pool.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Success!" + + + +def test_connection_pool_raises_when_not_safe_to_retry(): + """ + Connection pool should raise RemoteProtocolError when is_safe_to_retry is False + and the request may have been processed. + """ + network_backend = MockBackendWithRetry( + buffers_by_connection=[ + # First connection: GOAWAY with last_stream_id=1 (stream 1 <= 1, not safe) + # Non-graceful shutdown (error_code=1) + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=1 + ).serialize(), + b"", + ], + ], + http2=True, + ) + + with httpcore.ConnectionPool( + network_backend=network_backend, + ) as pool: + with pytest.raises(httpcore.RemoteProtocolError) as exc_info: + pool.request("GET", "https://example.com/") + + # Verify the error message indicates GOAWAY was received + assert "GOAWAY" in str(exc_info.value) + + + +def test_connection_pool_raises_when_may_have_side_effects(): + """ + Connection pool should raise RemoteProtocolError when graceful shutdown + but request may have had side effects (headers sent, stream <= last_stream_id). + """ + network_backend = MockBackendWithRetry( + buffers_by_connection=[ + # First connection: Graceful GOAWAY with last_stream_id=1 + # Headers were sent, may have side effects + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ], + ], + http2=True, + ) + + with httpcore.ConnectionPool( + network_backend=network_backend, + ) as pool: + with pytest.raises(httpcore.RemoteProtocolError) as exc_info: + pool.request("GET", "https://example.com/") + + # Verify the error message indicates GOAWAY was received + assert "GOAWAY" in str(exc_info.value) + + +# ============================================================================= +# Additional tests for specific code paths +# ============================================================================= + + + +def test_http2_goaway_receive_events_with_terminated_connection(): + """ + Test the _receive_events code path when connection is already terminated. + This covers the case where stream_id <= last_stream_id. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + # GOAWAY with last_stream_id=5 (so stream 1 <= 5, may have been processed) + # Non-graceful shutdown so connection goes to CLOSED, not DRAINING + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=1, last_stream_id=5 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.request("GET", "https://example.com/") + + # Stream 1 <= last_stream_id (5), NOT safe to retry + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.last_stream_id == 5 + assert exc_info.value.is_safe_to_retry is False + + + +def test_http2_goaway_connection_closed_after_graceful_goaway(): + """ + Test that connection is properly closed after handling graceful GOAWAY + when the request fails (due to server disconnect after GOAWAY). + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.HeadersFrame( + stream_id=1, + data=hpack.Encoder().encode( + [ + (b":status", b"200"), + (b"content-type", b"plain/text"), + ] + ), + flags=["END_HEADERS"], + ).serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=1 + ).serialize(), + b"", + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + with pytest.raises(httpcore.ConnectionGoingAway): + conn.request("GET", "https://example.com/") + + # After exception handling cleanup, connection should be closed + assert conn.is_closed() + + +# ============================================================================= +# Tests for pool retry edge cases using mock connections +# ============================================================================= + + +class MockConnectionGracefulGoaway(httpcore.ConnectionInterface): + """ + Mock connection that raises ConnectionGoingAway with specific properties + to test the graceful shutdown + no side effects retry path. + """ + + def __init__(self, origin: httpcore.Origin) -> None: + self._origin = origin + self._calls = 0 + + def can_handle_request(self, origin: httpcore.Origin) -> bool: + return origin == self._origin + + def is_available(self) -> bool: + return True + + def is_idle(self) -> bool: + return self._calls == 0 + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + def handle_request( + self, request: httpcore.Request + ) -> httpcore.Response: + self._calls += 1 + if self._calls == 1: + # First call: raise ConnectionGoingAway with: + # - stream_id <= last_stream_id (not safe to retry based on RFC) + # - error_code = 0 (graceful shutdown) + # - headers_sent = False, body_sent = False (no side effects) + # This triggers the "elif exc.is_graceful_shutdown and not exc.may_have_side_effects" branch + raise httpcore.ConnectionGoingAway( + "Graceful shutdown before headers sent", + last_stream_id=5, # >= stream_id (1) + error_code=0, # graceful + request_stream_id=1, # <= last_stream_id + headers_sent=False, # no side effects + body_sent=False, + ) + return httpcore.Response(200, content=b"Success after retry!") + + def close(self) -> None: + pass + + def info(self) -> str: + return "MockConnectionGracefulGoaway" + + + +def test_connection_pool_retries_graceful_shutdown_no_headers_sent(): + """ + Test the pool retry path where: + - is_safe_to_retry = False (stream_id <= last_stream_id) + - is_graceful_shutdown = True (error_code = 0) + - may_have_side_effects = False (headers not sent yet) + + This tests line 258 in connection_pool.py. + """ + + # Create a custom pool that returns our mock connection + class TestPool(httpcore.ConnectionPool): + def __init__(self) -> None: + super().__init__() + self._mock_connections: list[MockConnectionGracefulGoaway] = [] + + def create_connection( + self, origin: httpcore.Origin + ) -> MockConnectionGracefulGoaway: + conn = MockConnectionGracefulGoaway(origin) + self._mock_connections.append(conn) + return conn + + with TestPool() as pool: + response = pool.request("GET", "https://example.com/") + assert response.status == 200 + assert response.content == b"Success after retry!" + + # Verify the first connection was called twice (retry happened) + assert pool._mock_connections[0]._calls == 2 # type: ignore[attr-defined] From e40186430b633ac3897e409b68b6091bf3f4c238 Mon Sep 17 00:00:00 2001 From: bayoumi17m Date: Fri, 30 Jan 2026 02:56:18 -0500 Subject: [PATCH 12/15] test: add edge case tests for HTTP/2 GOAWAY handling to achieve 100% coverage (#3) --- tests/_async/test_http2_goaway.py | 168 ++++++++++++++++++++++++++++++ tests/_sync/test_http2_goaway.py | 168 ++++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+) diff --git a/tests/_async/test_http2_goaway.py b/tests/_async/test_http2_goaway.py index b478457e..44bd3ffb 100644 --- a/tests/_async/test_http2_goaway.py +++ b/tests/_async/test_http2_goaway.py @@ -826,3 +826,171 @@ def create_connection( # Verify the first connection was called twice (retry happened) assert pool._mock_connections[0]._calls == 2 # type: ignore[attr-defined] + + +# ============================================================================= +# Tests for edge cases in HTTP/2 GOAWAY handling +# ============================================================================= + + +@pytest.mark.anyio +async def test_http2_receive_events_with_terminated_connection_no_stream_id(): + """ + Test _receive_events when connection is terminated and stream_id is None. + This covers line 435 in http2.py - the RemoteProtocolError path. + + This scenario occurs when _receive_events is called without a stream_id + (e.g., from _wait_for_outgoing_flow) after the connection has terminated. + We test this by directly manipulating the connection state. + """ + import h2.events + + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.AsyncMockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # We'll manipulate the connection state after initialization + ] + ) + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # Directly set _connection_terminated to simulate a terminated connection + # This mimics the state after receiving a GOAWAY but before cleanup + terminated = h2.events.ConnectionTerminated() + terminated.error_code = 0 + terminated.last_stream_id = 0 + terminated.additional_data = b"" + conn._connection_terminated = terminated + + # Create a mock request for the _receive_events call + request = httpcore.Request( + method=b"GET", + url=httpcore.URL("https://example.com/"), + headers=[(b"host", b"example.com")], + ) + + # Call _receive_events with stream_id=None to trigger line 435 + with pytest.raises(httpcore.RemoteProtocolError): + await conn._receive_events(request, stream_id=None) + + +@pytest.mark.anyio +async def test_http2_server_disconnect_with_h2_closed_state(): + """ + Test server disconnect when h2 state machine is CLOSED but _connection_terminated + is not yet set. This covers line 558 in http2.py. + + This simulates a race condition where h2 has processed GOAWAY internally + (transitioning to CLOSED state) but we haven't processed the event yet. + """ + import h2.connection + + origin = httpcore.Origin(b"https", b"example.com", 443) + + # Create a mock stream that sets state to CLOSED BEFORE returning empty data + # This simulates the race condition accurately + class MockStreamWithClosedStateOnDisconnect(httpcore.AsyncMockStream): + def __init__(self, conn_ref: list) -> None: + self._conn_ref = conn_ref + self._read_count = 0 + super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) + + async def read(self, max_bytes: int, timeout: float | None = None) -> bytes: + self._read_count += 1 + if self._read_count == 1: + # First read returns settings + return hyperframe.frame.SettingsFrame().serialize() + # Before returning empty (disconnect), set h2 state to CLOSED + # This simulates h2 having processed GOAWAY internally + if self._conn_ref and self._conn_ref[0]: + self._conn_ref[ + 0 + ]._h2_state.state_machine.state = h2.connection.ConnectionState.CLOSED + self._conn_ref[0]._connection_terminated = None + return b"" # Server disconnect + + conn_ref: list = [] + stream = MockStreamWithClosedStateOnDisconnect(conn_ref) + + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + conn_ref.append(conn) + + # Set up stream tracking for the request + conn._stream_requests[1] = {"headers_sent": True, "body_sent": False} + + # Create a mock request + request = httpcore.Request( + method=b"GET", + url=httpcore.URL("https://example.com/"), + headers=[(b"host", b"example.com")], + ) + + # First call consumes the initial settings frame + await conn._read_incoming_data(request, stream_id=1) + + # Second call should hit line 558 when mock returns empty data + # and sets h2 state to CLOSED + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn._read_incoming_data(request, stream_id=1) + + # Verify the exception has the expected properties + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.error_code == 0 # Assumed graceful + + +@pytest.mark.anyio +async def test_http2_protocol_error_with_h2_closed_state(): + """ + Test h2 ProtocolError when state machine is CLOSED. + This covers lines 213-225 in http2.py. + + This simulates the race condition where h2 raises a ProtocolError + and the state machine is in CLOSED state, but _connection_terminated + is not yet set. + """ + import h2.connection + + origin = httpcore.Origin(b"https", b"example.com", 443) + + # Create a mock stream that sets state to CLOSED during write + # This causes h2 to raise ProtocolError when trying to read the next frame + class MockStreamWithClosedOnWrite(httpcore.AsyncMockStream): + def __init__(self, conn_ref: list) -> None: + self._conn_ref = conn_ref + self._write_count = 0 + super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) + + async def write(self, data: bytes, timeout: float | None = None) -> None: + self._write_count += 1 + # After first write (settings ACK), set state to CLOSED + # to simulate race condition during request sending + if self._write_count > 1 and self._conn_ref and self._conn_ref[0]: + self._conn_ref[ + 0 + ]._h2_state.state_machine.state = h2.connection.ConnectionState.CLOSED + self._conn_ref[0]._connection_terminated = None + + conn_ref: list = [] + stream = MockStreamWithClosedOnWrite(conn_ref) + + async with httpcore.AsyncHTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + conn_ref.append(conn) + + # Use handle_async_request which has the try-except block + # The request will fail when h2 raises ProtocolError in CLOSED state + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + await conn.handle_async_request( + httpcore.Request( + method=b"GET", + url=httpcore.URL("https://example.com/"), + headers=[(b"host", b"example.com")], + ) + ) + + # Verify the exception properties + assert exc_info.value.error_code == 0 # Assumed graceful diff --git a/tests/_sync/test_http2_goaway.py b/tests/_sync/test_http2_goaway.py index 23710dde..6d106588 100644 --- a/tests/_sync/test_http2_goaway.py +++ b/tests/_sync/test_http2_goaway.py @@ -826,3 +826,171 @@ def create_connection( # Verify the first connection was called twice (retry happened) assert pool._mock_connections[0]._calls == 2 # type: ignore[attr-defined] + + +# ============================================================================= +# Tests for edge cases in HTTP/2 GOAWAY handling +# ============================================================================= + + + +def test_http2_receive_events_with_terminated_connection_no_stream_id(): + """ + Test _receive_events when connection is terminated and stream_id is None. + This covers line 435 in http2.py - the RemoteProtocolError path. + + This scenario occurs when _receive_events is called without a stream_id + (e.g., from _wait_for_outgoing_flow) after the connection has terminated. + We test this by directly manipulating the connection state. + """ + import h2.events + + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = httpcore.MockStream( + [ + hyperframe.frame.SettingsFrame().serialize(), + # We'll manipulate the connection state after initialization + ] + ) + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + # Directly set _connection_terminated to simulate a terminated connection + # This mimics the state after receiving a GOAWAY but before cleanup + terminated = h2.events.ConnectionTerminated() + terminated.error_code = 0 + terminated.last_stream_id = 0 + terminated.additional_data = b"" + conn._connection_terminated = terminated + + # Create a mock request for the _receive_events call + request = httpcore.Request( + method=b"GET", + url=httpcore.URL("https://example.com/"), + headers=[(b"host", b"example.com")], + ) + + # Call _receive_events with stream_id=None to trigger line 435 + with pytest.raises(httpcore.RemoteProtocolError): + conn._receive_events(request, stream_id=None) + + + +def test_http2_server_disconnect_with_h2_closed_state(): + """ + Test server disconnect when h2 state machine is CLOSED but _connection_terminated + is not yet set. This covers line 558 in http2.py. + + This simulates a race condition where h2 has processed GOAWAY internally + (transitioning to CLOSED state) but we haven't processed the event yet. + """ + import h2.connection + + origin = httpcore.Origin(b"https", b"example.com", 443) + + # Create a mock stream that sets state to CLOSED BEFORE returning empty data + # This simulates the race condition accurately + class MockStreamWithClosedStateOnDisconnect(httpcore.MockStream): + def __init__(self, conn_ref: list) -> None: + self._conn_ref = conn_ref + self._read_count = 0 + super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) + + def read(self, max_bytes: int, timeout: float | None = None) -> bytes: + self._read_count += 1 + if self._read_count == 1: + # First read returns settings + return hyperframe.frame.SettingsFrame().serialize() + # Before returning empty (disconnect), set h2 state to CLOSED + # This simulates h2 having processed GOAWAY internally + if self._conn_ref and self._conn_ref[0]: + self._conn_ref[ + 0 + ]._h2_state.state_machine.state = h2.connection.ConnectionState.CLOSED + self._conn_ref[0]._connection_terminated = None + return b"" # Server disconnect + + conn_ref: list = [] + stream = MockStreamWithClosedStateOnDisconnect(conn_ref) + + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + conn_ref.append(conn) + + # Set up stream tracking for the request + conn._stream_requests[1] = {"headers_sent": True, "body_sent": False} + + # Create a mock request + request = httpcore.Request( + method=b"GET", + url=httpcore.URL("https://example.com/"), + headers=[(b"host", b"example.com")], + ) + + # First call consumes the initial settings frame + conn._read_incoming_data(request, stream_id=1) + + # Second call should hit line 558 when mock returns empty data + # and sets h2 state to CLOSED + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn._read_incoming_data(request, stream_id=1) + + # Verify the exception has the expected properties + assert exc_info.value.request_stream_id == 1 + assert exc_info.value.error_code == 0 # Assumed graceful + + + +def test_http2_protocol_error_with_h2_closed_state(): + """ + Test h2 ProtocolError when state machine is CLOSED. + This covers lines 213-225 in http2.py. + + This simulates the race condition where h2 raises a ProtocolError + and the state machine is in CLOSED state, but _connection_terminated + is not yet set. + """ + import h2.connection + + origin = httpcore.Origin(b"https", b"example.com", 443) + + # Create a mock stream that sets state to CLOSED during write + # This causes h2 to raise ProtocolError when trying to read the next frame + class MockStreamWithClosedOnWrite(httpcore.MockStream): + def __init__(self, conn_ref: list) -> None: + self._conn_ref = conn_ref + self._write_count = 0 + super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) + + def write(self, data: bytes, timeout: float | None = None) -> None: + self._write_count += 1 + # After first write (settings ACK), set state to CLOSED + # to simulate race condition during request sending + if self._write_count > 1 and self._conn_ref and self._conn_ref[0]: + self._conn_ref[ + 0 + ]._h2_state.state_machine.state = h2.connection.ConnectionState.CLOSED + self._conn_ref[0]._connection_terminated = None + + conn_ref: list = [] + stream = MockStreamWithClosedOnWrite(conn_ref) + + with httpcore.HTTP2Connection( + origin=origin, stream=stream, keepalive_expiry=5.0 + ) as conn: + conn_ref.append(conn) + + # Use handle_request which has the try-except block + # The request will fail when h2 raises ProtocolError in CLOSED state + with pytest.raises(httpcore.ConnectionGoingAway) as exc_info: + conn.handle_request( + httpcore.Request( + method=b"GET", + url=httpcore.URL("https://example.com/"), + headers=[(b"host", b"example.com")], + ) + ) + + # Verify the exception properties + assert exc_info.value.error_code == 0 # Assumed graceful From a3615da91d5adfb35edbf6a735d30414d36ddde6 Mon Sep 17 00:00:00 2001 From: bayoumi17m Date: Fri, 30 Jan 2026 11:44:48 -0500 Subject: [PATCH 13/15] fix: handle http2 goaway race conditions (#4) --- tests/_async/test_http2_goaway.py | 10 +++++----- tests/_sync/test_http2_goaway.py | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/_async/test_http2_goaway.py b/tests/_async/test_http2_goaway.py index 44bd3ffb..686d62cc 100644 --- a/tests/_async/test_http2_goaway.py +++ b/tests/_async/test_http2_goaway.py @@ -891,7 +891,7 @@ async def test_http2_server_disconnect_with_h2_closed_state(): # Create a mock stream that sets state to CLOSED BEFORE returning empty data # This simulates the race condition accurately class MockStreamWithClosedStateOnDisconnect(httpcore.AsyncMockStream): - def __init__(self, conn_ref: list) -> None: + def __init__(self, conn_ref: list[httpcore.AsyncHTTP2Connection]) -> None: self._conn_ref = conn_ref self._read_count = 0 super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) @@ -900,7 +900,7 @@ async def read(self, max_bytes: int, timeout: float | None = None) -> bytes: self._read_count += 1 if self._read_count == 1: # First read returns settings - return hyperframe.frame.SettingsFrame().serialize() + return bytes(hyperframe.frame.SettingsFrame().serialize()) # Before returning empty (disconnect), set h2 state to CLOSED # This simulates h2 having processed GOAWAY internally if self._conn_ref and self._conn_ref[0]: @@ -910,7 +910,7 @@ async def read(self, max_bytes: int, timeout: float | None = None) -> bytes: self._conn_ref[0]._connection_terminated = None return b"" # Server disconnect - conn_ref: list = [] + conn_ref: list[httpcore.AsyncHTTP2Connection] = [] stream = MockStreamWithClosedStateOnDisconnect(conn_ref) async with httpcore.AsyncHTTP2Connection( @@ -958,7 +958,7 @@ async def test_http2_protocol_error_with_h2_closed_state(): # Create a mock stream that sets state to CLOSED during write # This causes h2 to raise ProtocolError when trying to read the next frame class MockStreamWithClosedOnWrite(httpcore.AsyncMockStream): - def __init__(self, conn_ref: list) -> None: + def __init__(self, conn_ref: list[httpcore.AsyncHTTP2Connection]) -> None: self._conn_ref = conn_ref self._write_count = 0 super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) @@ -973,7 +973,7 @@ async def write(self, data: bytes, timeout: float | None = None) -> None: ]._h2_state.state_machine.state = h2.connection.ConnectionState.CLOSED self._conn_ref[0]._connection_terminated = None - conn_ref: list = [] + conn_ref: list[httpcore.AsyncHTTP2Connection] = [] stream = MockStreamWithClosedOnWrite(conn_ref) async with httpcore.AsyncHTTP2Connection( diff --git a/tests/_sync/test_http2_goaway.py b/tests/_sync/test_http2_goaway.py index 6d106588..517529d0 100644 --- a/tests/_sync/test_http2_goaway.py +++ b/tests/_sync/test_http2_goaway.py @@ -891,7 +891,7 @@ def test_http2_server_disconnect_with_h2_closed_state(): # Create a mock stream that sets state to CLOSED BEFORE returning empty data # This simulates the race condition accurately class MockStreamWithClosedStateOnDisconnect(httpcore.MockStream): - def __init__(self, conn_ref: list) -> None: + def __init__(self, conn_ref: list[httpcore.HTTP2Connection]) -> None: self._conn_ref = conn_ref self._read_count = 0 super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) @@ -900,7 +900,7 @@ def read(self, max_bytes: int, timeout: float | None = None) -> bytes: self._read_count += 1 if self._read_count == 1: # First read returns settings - return hyperframe.frame.SettingsFrame().serialize() + return bytes(hyperframe.frame.SettingsFrame().serialize()) # Before returning empty (disconnect), set h2 state to CLOSED # This simulates h2 having processed GOAWAY internally if self._conn_ref and self._conn_ref[0]: @@ -910,7 +910,7 @@ def read(self, max_bytes: int, timeout: float | None = None) -> bytes: self._conn_ref[0]._connection_terminated = None return b"" # Server disconnect - conn_ref: list = [] + conn_ref: list[httpcore.HTTP2Connection] = [] stream = MockStreamWithClosedStateOnDisconnect(conn_ref) with httpcore.HTTP2Connection( @@ -958,7 +958,7 @@ def test_http2_protocol_error_with_h2_closed_state(): # Create a mock stream that sets state to CLOSED during write # This causes h2 to raise ProtocolError when trying to read the next frame class MockStreamWithClosedOnWrite(httpcore.MockStream): - def __init__(self, conn_ref: list) -> None: + def __init__(self, conn_ref: list[httpcore.HTTP2Connection]) -> None: self._conn_ref = conn_ref self._write_count = 0 super().__init__([hyperframe.frame.SettingsFrame().serialize()], http2=True) @@ -973,7 +973,7 @@ def write(self, data: bytes, timeout: float | None = None) -> None: ]._h2_state.state_machine.state = h2.connection.ConnectionState.CLOSED self._conn_ref[0]._connection_terminated = None - conn_ref: list = [] + conn_ref: list[httpcore.HTTP2Connection] = [] stream = MockStreamWithClosedOnWrite(conn_ref) with httpcore.HTTP2Connection( From 91706f9a61cbc9be70c63769738c5582c1550da9 Mon Sep 17 00:00:00 2001 From: bayoumi17m Date: Fri, 30 Jan 2026 14:14:36 -0500 Subject: [PATCH 14/15] test: add missing test coverage for GOAWAY test helper classes (#5) --- tests/_async/test_http2_goaway.py | 42 +++++++++++++++++++++++++++++++ tests/_sync/test_http2_goaway.py | 40 +++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/tests/_async/test_http2_goaway.py b/tests/_async/test_http2_goaway.py index 686d62cc..a9a6aeb4 100644 --- a/tests/_async/test_http2_goaway.py +++ b/tests/_async/test_http2_goaway.py @@ -994,3 +994,45 @@ async def write(self, data: bytes, timeout: float | None = None) -> None: # Verify the exception properties assert exc_info.value.error_code == 0 # Assumed graceful + + +@pytest.mark.anyio +async def test_mock_backend_with_retry_exhausted_buffers(): + """ + Test the AsyncMockBackendWithRetry when more connections are made + than buffers provided. This covers the else branch at line 497. + """ + network_backend = AsyncMockBackendWithRetry( + buffers_by_connection=[ + # Only one buffer set provided + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ], + ], + http2=True, + ) + + # First connection uses the buffer + stream1 = await network_backend.connect_tcp("example.com", 443) + assert stream1 is not None + + # Second connection should hit the else branch (empty buffer) + stream2 = await network_backend.connect_tcp("example.com", 443) + assert stream2 is not None + + +@pytest.mark.anyio +async def test_mock_connection_graceful_goaway_info(): + """ + Test the info() method of MockConnectionGracefulGoaway. + This covers line 795. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + mock_conn = MockConnectionGracefulGoaway(origin) + + # Verify the info method returns the expected string + assert mock_conn.info() == "MockConnectionGracefulGoaway" diff --git a/tests/_sync/test_http2_goaway.py b/tests/_sync/test_http2_goaway.py index 517529d0..b8154c32 100644 --- a/tests/_sync/test_http2_goaway.py +++ b/tests/_sync/test_http2_goaway.py @@ -994,3 +994,43 @@ def write(self, data: bytes, timeout: float | None = None) -> None: # Verify the exception properties assert exc_info.value.error_code == 0 # Assumed graceful + + +def test_mock_backend_with_retry_exhausted_buffers(): + """ + Test the MockBackendWithRetry when more connections are made + than buffers provided. This covers the else branch at line 497. + """ + network_backend = MockBackendWithRetry( + buffers_by_connection=[ + # Only one buffer set provided + [ + hyperframe.frame.SettingsFrame().serialize(), + hyperframe.frame.GoAwayFrame( + stream_id=0, error_code=0, last_stream_id=0 + ).serialize(), + b"", + ], + ], + http2=True, + ) + + # First connection uses the buffer + stream1 = network_backend.connect_tcp("example.com", 443) + assert stream1 is not None + + # Second connection should hit the else branch (empty buffer) + stream2 = network_backend.connect_tcp("example.com", 443) + assert stream2 is not None + + +def test_mock_connection_graceful_goaway_info(): + """ + Test the info() method of MockConnectionGracefulGoaway. + This covers line 795. + """ + origin = httpcore.Origin(b"https", b"example.com", 443) + mock_conn = MockConnectionGracefulGoaway(origin) + + # Verify the info method returns the expected string + assert mock_conn.info() == "MockConnectionGracefulGoaway" From 1d8e8b719a7203aceef94562ab635d681b48c1ba Mon Sep 17 00:00:00 2001 From: bayoumi17m Date: Fri, 30 Jan 2026 14:54:19 -0500 Subject: [PATCH 15/15] fix: regenerate sync files to fix unasync mismatch (#6) --- tests/_sync/test_http2_goaway.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/_sync/test_http2_goaway.py b/tests/_sync/test_http2_goaway.py index b8154c32..493674b4 100644 --- a/tests/_sync/test_http2_goaway.py +++ b/tests/_sync/test_http2_goaway.py @@ -996,6 +996,7 @@ def write(self, data: bytes, timeout: float | None = None) -> None: assert exc_info.value.error_code == 0 # Assumed graceful + def test_mock_backend_with_retry_exhausted_buffers(): """ Test the MockBackendWithRetry when more connections are made @@ -1024,6 +1025,7 @@ def test_mock_backend_with_retry_exhausted_buffers(): assert stream2 is not None + def test_mock_connection_graceful_goaway_info(): """ Test the info() method of MockConnectionGracefulGoaway.