| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425 |
- """
- h2/stream
- ~~~~~~~~~
- An implementation of a HTTP/2 stream.
- """
- from __future__ import annotations
- from enum import Enum, IntEnum
- from typing import TYPE_CHECKING, Any, Union, cast
- from hpack import HeaderTuple
- from hyperframe.frame import AltSvcFrame, ContinuationFrame, DataFrame, Frame, HeadersFrame, PushPromiseFrame, RstStreamFrame, WindowUpdateFrame
- from .errors import ErrorCodes, _error_code_from_int
- from .events import (
- AlternativeServiceAvailable,
- DataReceived,
- Event,
- InformationalResponseReceived,
- PushedStreamReceived,
- RequestReceived,
- ResponseReceived,
- StreamEnded,
- StreamReset,
- TrailersReceived,
- WindowUpdated,
- _PushedRequestSent,
- _RequestSent,
- _ResponseSent,
- _TrailersSent,
- )
- from .exceptions import FlowControlError, InvalidBodyLengthError, ProtocolError, StreamClosedError
- from .utilities import (
- HeaderValidationFlags,
- authority_from_headers,
- extract_method_header,
- guard_increment_window,
- is_informational_response,
- normalize_inbound_headers,
- normalize_outbound_headers,
- utf8_encode_headers,
- validate_headers,
- validate_outbound_headers,
- )
- from .windows import WindowManager
- if TYPE_CHECKING: # pragma: no cover
- from collections.abc import Callable, Generator, Iterable
- from hpack.hpack import Encoder
- from hpack.struct import Header, HeaderWeaklyTyped
- from .config import H2Configuration
- class StreamState(IntEnum):
- IDLE = 0
- RESERVED_REMOTE = 1
- RESERVED_LOCAL = 2
- OPEN = 3
- HALF_CLOSED_REMOTE = 4
- HALF_CLOSED_LOCAL = 5
- CLOSED = 6
- class StreamInputs(Enum):
- SEND_HEADERS = 0
- SEND_PUSH_PROMISE = 1
- SEND_RST_STREAM = 2
- SEND_DATA = 3
- SEND_WINDOW_UPDATE = 4
- SEND_END_STREAM = 5
- RECV_HEADERS = 6
- RECV_PUSH_PROMISE = 7
- RECV_RST_STREAM = 8
- RECV_DATA = 9
- RECV_WINDOW_UPDATE = 10
- RECV_END_STREAM = 11
- RECV_CONTINUATION = 12 # Added in 2.0.0
- SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0
- RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0
- SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0
- RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0
- UPGRADE_CLIENT = 17 # Added 2.3.0
- UPGRADE_SERVER = 18 # Added 2.3.0
- class StreamClosedBy(Enum):
- SEND_END_STREAM = 0
- RECV_END_STREAM = 1
- SEND_RST_STREAM = 2
- RECV_RST_STREAM = 3
- # This array is initialized once, and is indexed by the stream states above.
- # It indicates whether a stream in the given state is open. The reason we do
- # this is that we potentially check whether a stream in a given state is open
- # quite frequently: given that we check so often, we should do so in the
- # fastest and most performant way possible.
- STREAM_OPEN = [False for _ in range(len(StreamState))]
- STREAM_OPEN[StreamState.OPEN] = True
- STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
- STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True
- class H2StreamStateMachine:
- """
- A single HTTP/2 stream state machine.
- This stream object implements basically the state machine described in
- RFC 7540 section 5.1.
- :param stream_id: The stream ID of this stream. This is stored primarily
- for logging purposes.
- """
- def __init__(self, stream_id: int) -> None:
- self.state = StreamState.IDLE
- self.stream_id = stream_id
- #: Whether this peer is the client side of this stream.
- self.client: bool | None = None
- # Whether trailers have been sent/received on this stream or not.
- self.headers_sent: bool | None = None
- self.trailers_sent: bool | None = None
- self.headers_received: bool | None = None
- self.trailers_received: bool | None = None
- # How the stream was closed. One of StreamClosedBy.
- self.stream_closed_by: StreamClosedBy | None = None
- def process_input(self, input_: StreamInputs) -> list[Event]:
- """
- Process a specific input in the state machine.
- """
- if not isinstance(input_, StreamInputs):
- msg = "Input must be an instance of StreamInputs"
- raise ValueError(msg) # noqa: TRY004
- try:
- func, target_state = _transitions[(self.state, input_)]
- except KeyError as err:
- old_state = self.state
- self.state = StreamState.CLOSED
- msg = f"Invalid input {input_} in state {old_state}"
- raise ProtocolError(msg) from err
- else:
- previous_state = self.state
- self.state = target_state
- if func is not None:
- try:
- return func(self, previous_state)
- except ProtocolError:
- self.state = StreamState.CLOSED
- raise
- except AssertionError as err: # pragma: no cover
- self.state = StreamState.CLOSED
- raise ProtocolError(err) from err
- return []
- def request_sent(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when a request is sent.
- """
- self.client = True
- self.headers_sent = True
- event = _RequestSent()
- return [event]
- def response_sent(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when something that should be a response is sent. This 'response'
- may actually be trailers.
- """
- if not self.headers_sent:
- if self.client is True or self.client is None:
- msg = "Client cannot send responses."
- raise ProtocolError(msg)
- self.headers_sent = True
- return [_ResponseSent()]
- assert not self.trailers_sent
- self.trailers_sent = True
- return [_TrailersSent()]
- def request_received(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when a request is received.
- """
- assert not self.headers_received
- assert not self.trailers_received
- self.client = False
- self.headers_received = True
- event = RequestReceived(stream_id=self.stream_id)
- return [event]
- def response_received(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when a response is received. Also disambiguates between responses
- and trailers.
- """
- event: ResponseReceived | TrailersReceived
- if not self.headers_received:
- assert self.client is True
- self.headers_received = True
- event = ResponseReceived(stream_id=self.stream_id)
- else:
- assert not self.trailers_received
- self.trailers_received = True
- event = TrailersReceived(stream_id=self.stream_id)
- event.stream_id = self.stream_id
- return [event]
- def data_received(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when data is received.
- """
- if not self.headers_received:
- msg = "cannot receive data before headers"
- raise ProtocolError(msg)
- event = DataReceived(stream_id=self.stream_id)
- return [event]
- def window_updated(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when a window update frame is received.
- """
- return [WindowUpdated(stream_id=self.stream_id)]
- def stream_half_closed(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when an END_STREAM flag is received in the OPEN state,
- transitioning this stream to a HALF_CLOSED_REMOTE state.
- """
- event = StreamEnded(stream_id=self.stream_id)
- return [event]
- def stream_ended(self, previous_state: StreamState) -> list[Event]:
- """
- Fires when a stream is cleanly ended.
- """
- self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
- event = StreamEnded(stream_id=self.stream_id)
- return [event]
- def stream_reset(self, previous_state: StreamState) -> list[Event]:
- """
- Fired when a stream is forcefully reset.
- """
- self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
- return [StreamReset(stream_id=self.stream_id)]
- def send_new_pushed_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Fires on the newly pushed stream, when pushed by the local peer.
- No event here, but definitionally this peer must be a server.
- """
- assert self.client is None
- self.client = False
- self.headers_received = True
- return []
- def recv_new_pushed_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Fires on the newly pushed stream, when pushed by the remote peer.
- No event here, but definitionally this peer must be a client.
- """
- assert self.client is None
- self.client = True
- self.headers_sent = True
- return []
- def send_push_promise(self, previous_state: StreamState) -> list[Event]:
- """
- Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
- We may only send PUSH_PROMISE frames if we're a server.
- """
- if self.client is True:
- msg = "Cannot push streams from client peers."
- raise ProtocolError(msg)
- event = _PushedRequestSent()
- return [event]
- def recv_push_promise(self, previous_state: StreamState) -> list[Event]:
- """
- Fires on the already-existing stream when a PUSH_PROMISE frame is
- received. We may only receive PUSH_PROMISE frames if we're a client.
- Fires a PushedStreamReceived event.
- """
- if not self.client:
- if self.client is None: # pragma: no cover
- msg = "Idle streams cannot receive pushes"
- else: # pragma: no cover
- msg = "Cannot receive pushed streams as a server"
- raise ProtocolError(msg)
- event = PushedStreamReceived()
- event.parent_stream_id = self.stream_id
- return [event]
- def send_end_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Called when an attempt is made to send END_STREAM in the
- HALF_CLOSED_REMOTE state.
- """
- self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
- return []
- def send_reset_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Called when an attempt is made to send RST_STREAM in a non-closed
- stream state.
- """
- self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
- return []
- def reset_stream_on_error(self, previous_state: StreamState) -> list[Event]:
- """
- Called when we need to forcefully emit another RST_STREAM frame on
- behalf of the state machine.
- If this is the first time we've done this, we should also hang an event
- off the StreamClosedError so that the user can be informed. We know
- it's the first time we've done this if the stream is currently in a
- state other than CLOSED.
- """
- self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
- error = StreamClosedError(self.stream_id)
- error._events = [
- StreamReset(
- stream_id=self.stream_id,
- error_code=ErrorCodes.STREAM_CLOSED,
- remote_reset=False,
- ),
- ]
- raise error
- def recv_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Called when an unexpected frame is received on an already-closed
- stream.
- An endpoint that receives an unexpected frame should treat it as
- a stream error or connection error with type STREAM_CLOSED, depending
- on the specific frame. The error handling is done at a higher level:
- this just raises the appropriate error.
- """
- raise StreamClosedError(self.stream_id)
- def send_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Called when an attempt is made to send data on an already-closed
- stream.
- This essentially overrides the standard logic by throwing a
- more-specific error: StreamClosedError. This is a ProtocolError, so it
- matches the standard API of the state machine, but provides more detail
- to the user.
- """
- raise StreamClosedError(self.stream_id)
- def recv_push_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Called when a PUSH_PROMISE frame is received on a full stop
- stream.
- If the stream was closed by us sending a RST_STREAM frame, then we
- presume that the PUSH_PROMISE was in flight when we reset the parent
- stream. Rathen than accept the new stream, we just reset it.
- Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
- naturally closed stream is a real problem because it creates a brand
- new stream that the remote peer now believes exists.
- """
- assert self.stream_closed_by is not None
- if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
- raise StreamClosedError(self.stream_id)
- msg = "Attempted to push on closed stream."
- raise ProtocolError(msg)
- def send_push_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
- """
- Called when an attempt is made to push on an already-closed stream.
- This essentially overrides the standard logic by providing a more
- useful error message. It's necessary because simply indicating that the
- stream is closed is not enough: there is now a new stream that is not
- allowed to be there. The only recourse is to tear the whole connection
- down.
- """
- msg = "Attempted to push on closed stream."
- raise ProtocolError(msg)
- def send_informational_response(self, previous_state: StreamState) -> list[Event]:
- """
- Called when an informational header block is sent (that is, a block
- where the :status header has a 1XX value).
- Only enforces that these are sent *before* final headers are sent.
- """
- if self.headers_sent:
- msg = "Information response after final response"
- raise ProtocolError(msg)
- event = _ResponseSent()
- return [event]
- def recv_informational_response(self, previous_state: StreamState) -> list[Event]:
- """
- Called when an informational header block is received (that is, a block
- where the :status header has a 1XX value).
- """
- if self.headers_received:
- msg = "Informational response after final response"
- raise ProtocolError(msg)
- return [InformationalResponseReceived(stream_id=self.stream_id)]
- def recv_alt_svc(self, previous_state: StreamState) -> list[Event]:
- """
- Called when receiving an ALTSVC frame.
- RFC 7838 allows us to receive ALTSVC frames at any stream state, which
- is really absurdly overzealous. For that reason, we want to limit the
- states in which we can actually receive it. It's really only sensible
- to receive it after we've sent our own headers and before the server
- has sent its header block: the server can't guarantee that we have any
- state around after it completes its header block, and the server
- doesn't know what origin we're talking about before we've sent ours.
- For that reason, this function applies a few extra checks on both state
- and some of the little state variables we keep around. If those suggest
- an unreasonable situation for the ALTSVC frame to have been sent in,
- we quietly ignore it (as RFC 7838 suggests).
- This function is also *not* always called by the state machine. In some
- states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
- because we know the frame cannot be valid in that state (IDLE because
- the server cannot know what origin the stream applies to, CLOSED
- because the server cannot assume we still have state around,
- RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
- state then *we* are the server).
- """
- # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
- # them.
- if self.client is False:
- return []
- # If we've received the response headers from the server they can't
- # guarantee we still have any state around. Other implementations
- # (like nghttp2) ignore ALTSVC in this state, so we will too.
- if self.headers_received:
- return []
- # Otherwise, this is a sensible enough frame to have received. Return
- # the event and let it get populated.
- return [AlternativeServiceAvailable()]
- def send_alt_svc(self, previous_state: StreamState) -> list[Event]:
- """
- Called when sending an ALTSVC frame on this stream.
- For consistency with the restrictions we apply on receiving ALTSVC
- frames in ``recv_alt_svc``, we want to restrict when users can send
- ALTSVC frames to the situations when we ourselves would accept them.
- That means: when we are a server, when we have received the request
- headers, and when we have not yet sent our own response headers.
- """
- # We should not send ALTSVC after we've sent response headers, as the
- # client may have disposed of its state.
- if self.headers_sent:
- msg = "Cannot send ALTSVC after sending response headers."
- raise ProtocolError(msg)
- return []
- # STATE MACHINE
- #
- # The stream state machine is defined here to avoid the need to allocate it
- # repeatedly for each stream. It cannot be defined in the stream class because
- # it needs to be able to reference the callbacks defined on the class, but
- # because Python's scoping rules are weird the class object is not actually in
- # scope during the body of the class object.
- #
- # For the sake of clarity, we reproduce the RFC 7540 state machine here:
- #
- # +--------+
- # send PP | | recv PP
- # ,--------| idle |--------.
- # / | | \
- # v +--------+ v
- # +----------+ | +----------+
- # | | | send H / | |
- # ,------| reserved | | recv H | reserved |------.
- # | | (local) | | | (remote) | |
- # | +----------+ v +----------+ |
- # | | +--------+ | |
- # | | recv ES | | send ES | |
- # | send H | ,-------| open |-------. | recv H |
- # | | / | | \ | |
- # | v v +--------+ v v |
- # | +----------+ | +----------+ |
- # | | half | | | half | |
- # | | closed | | send R / | closed | |
- # | | (remote) | | recv R | (local) | |
- # | +----------+ | +----------+ |
- # | | | | |
- # | | send ES / | recv ES / | |
- # | | send R / v send R / | |
- # | | recv R +--------+ recv R | |
- # | send R / `----------->| |<-----------' send R / |
- # | recv R | closed | recv R |
- # `----------------------->| |<----------------------'
- # +--------+
- #
- # send: endpoint sends this frame
- # recv: endpoint receives this frame
- #
- # H: HEADERS frame (with implied CONTINUATIONs)
- # PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
- # ES: END_STREAM flag
- # R: RST_STREAM frame
- #
- # For the purposes of this state machine we treat HEADERS and their
- # associated CONTINUATION frames as a single jumbo frame. The protocol
- # allows/requires this by preventing other frames from being interleved in
- # between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
- # received without a prior HEADERS frame, it *will* be passed to this state
- # machine. The state machine should always reject that frame, either as an
- # invalid transition or because the stream is closed.
- #
- # There is a confusing relationship around PUSH_PROMISE frames. The state
- # machine above considers them to be frames belonging to the new stream,
- # which is *somewhat* true. However, they are sent with the stream ID of
- # their related stream, and are only sendable in some cases.
- # For this reason, our state machine implementation below allows for
- # PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
- # in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
- # Essentially, for h2, PUSH_PROMISE frames are effectively sent on
- # two streams.
- #
- # The _transitions dictionary contains a mapping of tuples of
- # (state, input) to tuples of (side_effect_function, end_state). This
- # map contains all allowed transitions: anything not in this map is
- # invalid and immediately causes a transition to ``closed``.
- _transitions: dict[
- tuple[StreamState, StreamInputs],
- tuple[Callable[[H2StreamStateMachine, StreamState], list[Event]] | None, StreamState],
- ] = {
- # State: idle
- (StreamState.IDLE, StreamInputs.SEND_HEADERS):
- (H2StreamStateMachine.request_sent, StreamState.OPEN),
- (StreamState.IDLE, StreamInputs.RECV_HEADERS):
- (H2StreamStateMachine.request_received, StreamState.OPEN),
- (StreamState.IDLE, StreamInputs.RECV_DATA):
- (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
- (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
- (H2StreamStateMachine.send_new_pushed_stream,
- StreamState.RESERVED_LOCAL),
- (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
- (H2StreamStateMachine.recv_new_pushed_stream,
- StreamState.RESERVED_REMOTE),
- (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
- (None, StreamState.IDLE),
- (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
- (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
- (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
- (H2StreamStateMachine.request_received,
- StreamState.HALF_CLOSED_REMOTE),
- # State: reserved local
- (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
- (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
- (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
- (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
- (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
- (None, StreamState.RESERVED_LOCAL),
- (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
- (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
- (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
- (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
- (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
- (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
- (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
- (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
- (None, StreamState.RESERVED_LOCAL),
- # State: reserved remote
- (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
- (H2StreamStateMachine.response_received,
- StreamState.HALF_CLOSED_LOCAL),
- (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
- (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
- (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
- (None, StreamState.RESERVED_REMOTE),
- (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
- (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
- (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
- (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
- (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
- (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
- (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),
- # State: open
- (StreamState.OPEN, StreamInputs.SEND_HEADERS):
- (H2StreamStateMachine.response_sent, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.RECV_HEADERS):
- (H2StreamStateMachine.response_received, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.SEND_DATA):
- (None, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.RECV_DATA):
- (H2StreamStateMachine.data_received, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
- (None, StreamState.HALF_CLOSED_LOCAL),
- (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
- (H2StreamStateMachine.stream_half_closed,
- StreamState.HALF_CLOSED_REMOTE),
- (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
- (None, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
- (H2StreamStateMachine.window_updated, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
- (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
- (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
- (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
- (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
- (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
- (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
- (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
- (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
- (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),
- # State: half-closed remote
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
- (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
- (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
- (None, StreamState.HALF_CLOSED_REMOTE),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
- (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
- (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
- (None, StreamState.HALF_CLOSED_REMOTE),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
- (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
- (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
- (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
- (H2StreamStateMachine.send_push_promise,
- StreamState.HALF_CLOSED_REMOTE),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
- (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
- (H2StreamStateMachine.send_informational_response,
- StreamState.HALF_CLOSED_REMOTE),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
- (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),
- # State: half-closed local
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
- (H2StreamStateMachine.response_received,
- StreamState.HALF_CLOSED_LOCAL),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
- (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
- (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
- (None, StreamState.HALF_CLOSED_LOCAL),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
- (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
- (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
- (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
- (H2StreamStateMachine.recv_push_promise,
- StreamState.HALF_CLOSED_LOCAL),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
- (H2StreamStateMachine.recv_informational_response,
- StreamState.HALF_CLOSED_LOCAL),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
- (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
- (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),
- # State: closed
- (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
- (None, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
- (None, StreamState.CLOSED),
- # RFC 7540 Section 5.1 defines how the end point should react when
- # receiving a frame on a closed stream with the following statements:
- #
- # > An endpoint that receives any frame other than PRIORITY after receiving
- # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
- # > An endpoint that receives any frames after receiving a frame with the
- # > END_STREAM flag set MUST treat that as a connection error of type
- # > STREAM_CLOSED.
- (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
- (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.RECV_DATA):
- (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
- # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
- # > for a short period after a DATA or HEADERS frame containing a
- # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we
- # > don't have access to a clock so we just always allow it.
- (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
- (None, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
- (None, StreamState.CLOSED),
- # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
- # > neither "open" nor "half-closed (local)" as a connection error of type
- # > PROTOCOL_ERROR.
- (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
- (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),
- # Also, users should be forbidden from sending on closed streams.
- (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
- (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
- (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
- (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.SEND_DATA):
- (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
- (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
- (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
- (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
- }
- class H2Stream:
- """
- A low-level HTTP/2 stream object. This handles building and receiving
- frames and maintains per-stream state.
- This wraps a HTTP/2 Stream state machine implementation, ensuring that
- frames can only be sent/received when the stream is in a valid state.
- Attempts to create frames that cannot be sent will raise a
- ``ProtocolError``.
- """
- def __init__(self,
- stream_id: int,
- config: H2Configuration,
- inbound_window_size: int,
- outbound_window_size: int) -> None:
- self.state_machine = H2StreamStateMachine(stream_id)
- self.stream_id = stream_id
- self.max_outbound_frame_size: int | None = None
- self.request_method: bytes | None = None
- # The current value of the outbound stream flow control window
- self.outbound_flow_control_window = outbound_window_size
- # The flow control manager.
- self._inbound_window_manager = WindowManager(inbound_window_size)
- # The expected content length, if any.
- self._expected_content_length: int | None = None
- # The actual received content length. Always tracked.
- self._actual_content_length = 0
- # The authority we believe this stream belongs to.
- self._authority: bytes | None = None
- # The configuration for this stream.
- self.config = config
- def __repr__(self) -> str:
- return f"<{type(self).__name__} id:{self.stream_id} state:{self.state_machine.state!r}>"
- @property
- def inbound_flow_control_window(self) -> int:
- """
- The size of the inbound flow control window for the stream. This is
- rarely publicly useful: instead, use :meth:`remote_flow_control_window
- <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
- largely present to provide a shortcut to this data.
- """
- return self._inbound_window_manager.current_window_size
- @property
- def open(self) -> bool:
- """
- Whether the stream is 'open' in any sense: that is, whether it counts
- against the number of concurrent streams.
- """
- # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
- # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
- # this excludes the reserved states.
- # For more detail on why we're doing this in this slightly weird way,
- # see the comment on ``STREAM_OPEN`` at the top of the file.
- return STREAM_OPEN[self.state_machine.state]
- @property
- def closed(self) -> bool:
- """
- Whether the stream is closed.
- """
- return self.state_machine.state == StreamState.CLOSED
- @property
- def closed_by(self) -> StreamClosedBy | None:
- """
- Returns how the stream was closed, as one of StreamClosedBy.
- """
- return self.state_machine.stream_closed_by
- def upgrade(self, client_side: bool) -> None:
- """
- Called by the connection to indicate that this stream is the initial
- request/response of an upgraded connection. Places the stream into an
- appropriate state.
- """
- self.config.logger.debug("Upgrading %r", self)
- assert self.stream_id == 1
- input_ = (
- StreamInputs.UPGRADE_CLIENT if client_side
- else StreamInputs.UPGRADE_SERVER
- )
- # This may return events, we deliberately don't want them.
- self.state_machine.process_input(input_)
- def send_headers(self,
- headers: Iterable[HeaderWeaklyTyped],
- encoder: Encoder,
- end_stream: bool = False) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
- """
- Returns a list of HEADERS/CONTINUATION frames to emit as either headers
- or trailers.
- """
- self.config.logger.debug("Send headers %s on %r", headers, self)
- # Because encoding headers makes an irreversible change to the header
- # compression context, we make the state transition before we encode
- # them.
- # First, check if we're a client. If we are, no problem: if we aren't,
- # we need to scan the header block to see if this is an informational
- # response.
- input_ = StreamInputs.SEND_HEADERS
- bytes_headers = utf8_encode_headers(headers)
- if ((not self.state_machine.client) and
- is_informational_response(bytes_headers)):
- if end_stream:
- msg = "Cannot set END_STREAM on informational responses."
- raise ProtocolError(msg)
- input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
- events = self.state_machine.process_input(input_)
- hf = HeadersFrame(self.stream_id)
- hdr_validation_flags = self._build_hdr_validation_flags(events)
- frames = self._build_headers_frames(
- bytes_headers, encoder, hf, hdr_validation_flags,
- )
- if end_stream:
- # Not a bug: the END_STREAM flag is valid on the initial HEADERS
- # frame, not the CONTINUATION frames that follow.
- self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
- frames[0].flags.add("END_STREAM")
- if self.state_machine.trailers_sent and not end_stream:
- msg = "Trailers must have END_STREAM set."
- raise ProtocolError(msg)
- if self.state_machine.client and self._authority is None:
- self._authority = authority_from_headers(bytes_headers)
- # store request method for _initialize_content_length
- self.request_method = extract_method_header(bytes_headers)
- return frames
- def push_stream_in_band(self,
- related_stream_id: int,
- headers: Iterable[HeaderWeaklyTyped],
- encoder: Encoder) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
- """
- Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
- stream header. Called on the stream that has the PUSH_PROMISE frame
- sent on it.
- """
- self.config.logger.debug("Push stream %r", self)
- # Because encoding headers makes an irreversible change to the header
- # compression context, we make the state transition *first*.
- events = self.state_machine.process_input(
- StreamInputs.SEND_PUSH_PROMISE,
- )
- ppf = PushPromiseFrame(self.stream_id)
- ppf.promised_stream_id = related_stream_id
- hdr_validation_flags = self._build_hdr_validation_flags(events)
- bytes_headers = utf8_encode_headers(headers)
- return self._build_headers_frames(
- bytes_headers, encoder, ppf, hdr_validation_flags,
- )
- def locally_pushed(self) -> list[Frame]:
- """
- Mark this stream as one that was pushed by this peer. Must be called
- immediately after initialization. Sends no frames, simply updates the
- state machine.
- """
- # This does not trigger any events.
- events = self.state_machine.process_input(
- StreamInputs.SEND_PUSH_PROMISE,
- )
- assert not events
- return []
- def send_data(self,
- data: bytes | memoryview,
- end_stream: bool = False,
- pad_length: int | None = None) -> list[Frame]:
- """
- Prepare some data frames. Optionally end the stream.
- .. warning:: Does not perform flow control checks.
- """
- self.config.logger.debug(
- "Send data on %r with end stream set to %s", self, end_stream,
- )
- self.state_machine.process_input(StreamInputs.SEND_DATA)
- df = DataFrame(self.stream_id)
- df.data = data
- if end_stream:
- self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
- df.flags.add("END_STREAM")
- if pad_length is not None:
- df.flags.add("PADDED")
- df.pad_length = pad_length
- # Subtract flow_controlled_length to account for possible padding
- self.outbound_flow_control_window -= df.flow_controlled_length
- assert self.outbound_flow_control_window >= 0
- return [df]
- def end_stream(self) -> list[Frame]:
- """
- End a stream without sending data.
- """
- self.config.logger.debug("End stream %r", self)
- self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
- df = DataFrame(self.stream_id)
- df.flags.add("END_STREAM")
- return [df]
- def advertise_alternative_service(self, field_value: bytes) -> list[Frame]:
- """
- Advertise an RFC 7838 alternative service. The semantics of this are
- better documented in the ``H2Connection`` class.
- """
- self.config.logger.debug(
- "Advertise alternative service of %r for %r", field_value, self,
- )
- self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
- asf = AltSvcFrame(self.stream_id)
- asf.field = field_value
- return [asf]
- def increase_flow_control_window(self, increment: int) -> list[Frame]:
- """
- Increase the size of the flow control window for the remote side.
- """
- self.config.logger.debug(
- "Increase flow control window for %r by %d",
- self, increment,
- )
- self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
- self._inbound_window_manager.window_opened(increment)
- wuf = WindowUpdateFrame(self.stream_id)
- wuf.window_increment = increment
- return [wuf]
- def receive_push_promise_in_band(self,
- promised_stream_id: int,
- headers: Iterable[Header],
- header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]:
- """
- Receives a push promise frame sent on this stream, pushing a remote
- stream. This is called on the stream that has the PUSH_PROMISE sent
- on it.
- """
- self.config.logger.debug(
- "Receive Push Promise on %r for remote stream %d",
- self, promised_stream_id,
- )
- events = self.state_machine.process_input(
- StreamInputs.RECV_PUSH_PROMISE,
- )
- push_event = cast("PushedStreamReceived", events[0])
- push_event.pushed_stream_id = promised_stream_id
- hdr_validation_flags = self._build_hdr_validation_flags(events)
- push_event.headers = self._process_received_headers(
- headers, hdr_validation_flags, header_encoding,
- )
- return [], events
- def remotely_pushed(self, pushed_headers: Iterable[Header]) -> tuple[list[Frame], list[Event]]:
- """
- Mark this stream as one that was pushed by the remote peer. Must be
- called immediately after initialization. Sends no frames, simply
- updates the state machine.
- """
- self.config.logger.debug("%r pushed by remote peer", self)
- events = self.state_machine.process_input(
- StreamInputs.RECV_PUSH_PROMISE,
- )
- self._authority = authority_from_headers(pushed_headers)
- return [], events
- def receive_headers(self,
- headers: Iterable[Header],
- end_stream: bool,
- header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]:
- """
- Receive a set of headers (or trailers).
- """
- if is_informational_response(headers):
- if end_stream:
- msg = "Cannot set END_STREAM on informational responses"
- raise ProtocolError(msg)
- input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
- else:
- input_ = StreamInputs.RECV_HEADERS
- events = self.state_machine.process_input(input_)
- headers_event = cast(
- "Union[RequestReceived, ResponseReceived, TrailersReceived, InformationalResponseReceived]",
- events[0],
- )
- if end_stream:
- es_events = self.state_machine.process_input(
- StreamInputs.RECV_END_STREAM,
- )
- # We ensured it's not an information response at the beginning of the method.
- cast(
- "Union[RequestReceived, ResponseReceived, TrailersReceived]",
- headers_event,
- ).stream_ended = cast("StreamEnded", es_events[0])
- events += es_events
- self._initialize_content_length(headers)
- if isinstance(headers_event, TrailersReceived) and not end_stream:
- msg = "Trailers must have END_STREAM set"
- raise ProtocolError(msg)
- hdr_validation_flags = self._build_hdr_validation_flags(events)
- headers_event.headers = self._process_received_headers(
- headers, hdr_validation_flags, header_encoding,
- )
- return [], events
- def receive_data(self, data: bytes, end_stream: bool, flow_control_len: int) -> tuple[list[Frame], list[Event]]:
- """
- Receive some data.
- """
- self.config.logger.debug(
- "Receive data on %r with end stream %s and flow control length "
- "set to %d", self, end_stream, flow_control_len,
- )
- events = self.state_machine.process_input(StreamInputs.RECV_DATA)
- data_event = cast("DataReceived", events[0])
- self._inbound_window_manager.window_consumed(flow_control_len)
- self._track_content_length(len(data), end_stream)
- if end_stream:
- es_events = self.state_machine.process_input(
- StreamInputs.RECV_END_STREAM,
- )
- data_event.stream_ended = cast("StreamEnded", es_events[0])
- events.extend(es_events)
- data_event.data = data
- data_event.flow_controlled_length = flow_control_len
- return [], events
- def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event]]:
- """
- Handle a WINDOW_UPDATE increment.
- """
- self.config.logger.debug(
- "Receive Window Update on %r for increment of %d",
- self, increment,
- )
- events = self.state_machine.process_input(
- StreamInputs.RECV_WINDOW_UPDATE,
- )
- frames = []
- # If we encounter a problem with incrementing the flow control window,
- # this should be treated as a *stream* error, not a *connection* error.
- # That means we need to catch the error and forcibly close the stream.
- if events:
- cast("WindowUpdated", events[0]).delta = increment
- try:
- self.outbound_flow_control_window = guard_increment_window(
- self.outbound_flow_control_window,
- increment,
- )
- except FlowControlError:
- # Ok, this is bad. We're going to need to perform a local
- # reset.
- events = [
- StreamReset(
- stream_id=self.stream_id,
- error_code=ErrorCodes.FLOW_CONTROL_ERROR,
- remote_reset=False,
- ),
- ]
- frames = self.reset_stream(ErrorCodes.FLOW_CONTROL_ERROR)
- return frames, events
- def receive_continuation(self) -> None:
- """
- A naked CONTINUATION frame has been received. This is always an error,
- but the type of error it is depends on the state of the stream and must
- transition the state of the stream, so we need to handle it.
- """
- self.config.logger.debug("Receive Continuation frame on %r", self)
- self.state_machine.process_input(
- StreamInputs.RECV_CONTINUATION,
- )
- msg = "Should not be reachable" # pragma: no cover
- raise AssertionError(msg) # pragma: no cover
- def receive_alt_svc(self, frame: AltSvcFrame) -> tuple[list[Frame], list[Event]]:
- """
- An Alternative Service frame was received on the stream. This frame
- inherits the origin associated with this stream.
- """
- self.config.logger.debug(
- "Receive Alternative Service frame on stream %r", self,
- )
- # If the origin is present, RFC 7838 says we have to ignore it.
- if frame.origin:
- return [], []
- events = self.state_machine.process_input(
- StreamInputs.RECV_ALTERNATIVE_SERVICE,
- )
- # There are lots of situations where we want to ignore the ALTSVC
- # frame. If we need to pay attention, we'll have an event and should
- # fill it out.
- if events:
- assert isinstance(events[0], AlternativeServiceAvailable)
- events[0].origin = self._authority
- events[0].field_value = frame.field
- return [], events
- def reset_stream(self, error_code: ErrorCodes | int = 0) -> list[Frame]:
- """
- Close the stream locally. Reset the stream with an error code.
- """
- self.config.logger.debug(
- "Local reset %r with error code: %d", self, error_code,
- )
- self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
- rsf = RstStreamFrame(self.stream_id)
- rsf.error_code = error_code
- return [rsf]
- def stream_reset(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]:
- """
- Handle a stream being reset remotely.
- """
- self.config.logger.debug(
- "Remote reset %r with error code: %d", self, frame.error_code,
- )
- events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
- if events:
- # We don't fire an event if this stream is already closed.
- cast("StreamReset", events[0]).error_code = _error_code_from_int(frame.error_code)
- return [], events
- def acknowledge_received_data(self, acknowledged_size: int) -> list[Frame]:
- """
- The user has informed us that they've processed some amount of data
- that was received on this stream. Pass that to the window manager and
- potentially return some WindowUpdate frames.
- """
- self.config.logger.debug(
- "Acknowledge received data with size %d on %r",
- acknowledged_size, self,
- )
- increment = self._inbound_window_manager.process_bytes(
- acknowledged_size,
- )
- if increment:
- f = WindowUpdateFrame(self.stream_id)
- f.window_increment = increment
- return [f]
- return []
- def _build_hdr_validation_flags(self, events: Any) -> HeaderValidationFlags:
- """
- Constructs a set of header validation flags for use when normalizing
- and validating header blocks.
- """
- is_trailer = isinstance(
- events[0], (_TrailersSent, TrailersReceived),
- )
- is_response_header = isinstance(
- events[0],
- (
- _ResponseSent,
- ResponseReceived,
- InformationalResponseReceived,
- ),
- )
- is_push_promise = isinstance(
- events[0], (PushedStreamReceived, _PushedRequestSent),
- )
- return HeaderValidationFlags(
- is_client=self.state_machine.client or False,
- is_trailer=is_trailer,
- is_response_header=is_response_header,
- is_push_promise=is_push_promise,
- )
- def _build_headers_frames(self,
- headers: Iterable[Header],
- encoder: Encoder,
- first_frame: HeadersFrame | PushPromiseFrame,
- hdr_validation_flags: HeaderValidationFlags) \
- -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
- """
- Helper method to build headers or push promise frames.
- """
- # We need to lowercase the header names, and to ensure that secure
- # header fields are kept out of compression contexts.
- if self.config.normalize_outbound_headers:
- # also we may want to split outbound cookies to improve
- # headers compression
- should_split_outbound_cookies = self.config.split_outbound_cookies
- headers = normalize_outbound_headers(
- headers, hdr_validation_flags, should_split_outbound_cookies,
- )
- if self.config.validate_outbound_headers:
- headers = validate_outbound_headers(
- headers, hdr_validation_flags,
- )
- encoded_headers = encoder.encode(headers)
- # Slice into blocks of max_outbound_frame_size. Be careful with this:
- # it only works right because we never send padded frames or priority
- # information on the frames. Revisit this if we do.
- header_blocks = [
- encoded_headers[i:i+(self.max_outbound_frame_size or 0)]
- for i in range(
- 0, len(encoded_headers), (self.max_outbound_frame_size or 0),
- )
- ]
- frames: list[HeadersFrame | ContinuationFrame | PushPromiseFrame] = []
- first_frame.data = header_blocks[0]
- frames.append(first_frame)
- for block in header_blocks[1:]:
- cf = ContinuationFrame(self.stream_id)
- cf.data = block
- frames.append(cf)
- frames[-1].flags.add("END_HEADERS")
- return frames
- def _process_received_headers(self,
- headers: Iterable[Header],
- header_validation_flags: HeaderValidationFlags,
- header_encoding: bool | str | None) -> list[Header]:
- """
- When headers have been received from the remote peer, run a processing
- pipeline on them to transform them into the appropriate form for
- attaching to an event.
- """
- if self.config.normalize_inbound_headers:
- headers = normalize_inbound_headers(
- headers, header_validation_flags,
- )
- if self.config.validate_inbound_headers:
- headers = validate_headers(headers, header_validation_flags)
- if isinstance(header_encoding, str):
- headers = _decode_headers(headers, header_encoding)
- # The above steps are all generators, so we need to concretize the
- # headers now.
- return list(headers)
- def _initialize_content_length(self, headers: Iterable[Header]) -> None:
- """
- Checks the headers for a content-length header and initializes the
- _expected_content_length field from it. It's not an error for no
- Content-Length header to be present.
- """
- if self.request_method == b"HEAD":
- self._expected_content_length = 0
- return
- for n, v in headers:
- if n == b"content-length":
- try:
- self._expected_content_length = int(v, 10)
- except ValueError as err:
- msg = f"Invalid content-length header: {v!r}"
- raise ProtocolError(msg) from err
- return
- def _track_content_length(self, length: int, end_stream: bool) -> None:
- """
- Update the expected content length in response to data being received.
- Validates that the appropriate amount of data is sent. Always updates
- the received data, but only validates the length against the
- content-length header if one was sent.
- :param length: The length of the body chunk received.
- :param end_stream: If this is the last body chunk received.
- """
- self._actual_content_length += length
- actual = self._actual_content_length
- expected = self._expected_content_length
- if expected is not None:
- if expected < actual:
- raise InvalidBodyLengthError(expected, actual)
- if end_stream and expected != actual:
- raise InvalidBodyLengthError(expected, actual)
- def _inbound_flow_control_change_from_settings(self, delta: int) -> None:
- """
- We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
- update the target window size for flow control. For our flow control
- strategy, this means we need to do two things: we need to adjust the
- current window size, but we also need to set the target maximum window
- size to the new value.
- """
- new_max_size = self._inbound_window_manager.max_window_size + delta
- self._inbound_window_manager.window_opened(delta)
- self._inbound_window_manager.max_window_size = new_max_size
- def _decode_headers(headers: Iterable[HeaderWeaklyTyped], encoding: str) -> Generator[HeaderTuple, None, None]:
- """
- Given an iterable of header two-tuples and an encoding, decodes those
- headers using that encoding while preserving the type of the header tuple.
- This ensures that the use of ``HeaderTuple`` is preserved.
- """
- for header in headers:
- # This function expects to work on decoded headers, which are always
- # HeaderTuple objects.
- assert isinstance(header, HeaderTuple)
- name, value = header
- assert isinstance(name, bytes)
- assert isinstance(value, bytes)
- n = name.decode(encoding)
- v = value.decode(encoding)
- yield header.__class__(n, v)
|