stream.py 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425
  1. """
  2. h2/stream
  3. ~~~~~~~~~
  4. An implementation of a HTTP/2 stream.
  5. """
  6. from __future__ import annotations
  7. from enum import Enum, IntEnum
  8. from typing import TYPE_CHECKING, Any, Union, cast
  9. from hpack import HeaderTuple
  10. from hyperframe.frame import AltSvcFrame, ContinuationFrame, DataFrame, Frame, HeadersFrame, PushPromiseFrame, RstStreamFrame, WindowUpdateFrame
  11. from .errors import ErrorCodes, _error_code_from_int
  12. from .events import (
  13. AlternativeServiceAvailable,
  14. DataReceived,
  15. Event,
  16. InformationalResponseReceived,
  17. PushedStreamReceived,
  18. RequestReceived,
  19. ResponseReceived,
  20. StreamEnded,
  21. StreamReset,
  22. TrailersReceived,
  23. WindowUpdated,
  24. _PushedRequestSent,
  25. _RequestSent,
  26. _ResponseSent,
  27. _TrailersSent,
  28. )
  29. from .exceptions import FlowControlError, InvalidBodyLengthError, ProtocolError, StreamClosedError
  30. from .utilities import (
  31. HeaderValidationFlags,
  32. authority_from_headers,
  33. extract_method_header,
  34. guard_increment_window,
  35. is_informational_response,
  36. normalize_inbound_headers,
  37. normalize_outbound_headers,
  38. utf8_encode_headers,
  39. validate_headers,
  40. validate_outbound_headers,
  41. )
  42. from .windows import WindowManager
  43. if TYPE_CHECKING: # pragma: no cover
  44. from collections.abc import Callable, Generator, Iterable
  45. from hpack.hpack import Encoder
  46. from hpack.struct import Header, HeaderWeaklyTyped
  47. from .config import H2Configuration
  48. class StreamState(IntEnum):
  49. IDLE = 0
  50. RESERVED_REMOTE = 1
  51. RESERVED_LOCAL = 2
  52. OPEN = 3
  53. HALF_CLOSED_REMOTE = 4
  54. HALF_CLOSED_LOCAL = 5
  55. CLOSED = 6
  56. class StreamInputs(Enum):
  57. SEND_HEADERS = 0
  58. SEND_PUSH_PROMISE = 1
  59. SEND_RST_STREAM = 2
  60. SEND_DATA = 3
  61. SEND_WINDOW_UPDATE = 4
  62. SEND_END_STREAM = 5
  63. RECV_HEADERS = 6
  64. RECV_PUSH_PROMISE = 7
  65. RECV_RST_STREAM = 8
  66. RECV_DATA = 9
  67. RECV_WINDOW_UPDATE = 10
  68. RECV_END_STREAM = 11
  69. RECV_CONTINUATION = 12 # Added in 2.0.0
  70. SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0
  71. RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0
  72. SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0
  73. RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0
  74. UPGRADE_CLIENT = 17 # Added 2.3.0
  75. UPGRADE_SERVER = 18 # Added 2.3.0
  76. class StreamClosedBy(Enum):
  77. SEND_END_STREAM = 0
  78. RECV_END_STREAM = 1
  79. SEND_RST_STREAM = 2
  80. RECV_RST_STREAM = 3
  81. # This array is initialized once, and is indexed by the stream states above.
  82. # It indicates whether a stream in the given state is open. The reason we do
  83. # this is that we potentially check whether a stream in a given state is open
  84. # quite frequently: given that we check so often, we should do so in the
  85. # fastest and most performant way possible.
  86. STREAM_OPEN = [False for _ in range(len(StreamState))]
  87. STREAM_OPEN[StreamState.OPEN] = True
  88. STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
  89. STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True
  90. class H2StreamStateMachine:
  91. """
  92. A single HTTP/2 stream state machine.
  93. This stream object implements basically the state machine described in
  94. RFC 7540 section 5.1.
  95. :param stream_id: The stream ID of this stream. This is stored primarily
  96. for logging purposes.
  97. """
  98. def __init__(self, stream_id: int) -> None:
  99. self.state = StreamState.IDLE
  100. self.stream_id = stream_id
  101. #: Whether this peer is the client side of this stream.
  102. self.client: bool | None = None
  103. # Whether trailers have been sent/received on this stream or not.
  104. self.headers_sent: bool | None = None
  105. self.trailers_sent: bool | None = None
  106. self.headers_received: bool | None = None
  107. self.trailers_received: bool | None = None
  108. # How the stream was closed. One of StreamClosedBy.
  109. self.stream_closed_by: StreamClosedBy | None = None
  110. def process_input(self, input_: StreamInputs) -> list[Event]:
  111. """
  112. Process a specific input in the state machine.
  113. """
  114. if not isinstance(input_, StreamInputs):
  115. msg = "Input must be an instance of StreamInputs"
  116. raise ValueError(msg) # noqa: TRY004
  117. try:
  118. func, target_state = _transitions[(self.state, input_)]
  119. except KeyError as err:
  120. old_state = self.state
  121. self.state = StreamState.CLOSED
  122. msg = f"Invalid input {input_} in state {old_state}"
  123. raise ProtocolError(msg) from err
  124. else:
  125. previous_state = self.state
  126. self.state = target_state
  127. if func is not None:
  128. try:
  129. return func(self, previous_state)
  130. except ProtocolError:
  131. self.state = StreamState.CLOSED
  132. raise
  133. except AssertionError as err: # pragma: no cover
  134. self.state = StreamState.CLOSED
  135. raise ProtocolError(err) from err
  136. return []
  137. def request_sent(self, previous_state: StreamState) -> list[Event]:
  138. """
  139. Fires when a request is sent.
  140. """
  141. self.client = True
  142. self.headers_sent = True
  143. event = _RequestSent()
  144. return [event]
  145. def response_sent(self, previous_state: StreamState) -> list[Event]:
  146. """
  147. Fires when something that should be a response is sent. This 'response'
  148. may actually be trailers.
  149. """
  150. if not self.headers_sent:
  151. if self.client is True or self.client is None:
  152. msg = "Client cannot send responses."
  153. raise ProtocolError(msg)
  154. self.headers_sent = True
  155. return [_ResponseSent()]
  156. assert not self.trailers_sent
  157. self.trailers_sent = True
  158. return [_TrailersSent()]
  159. def request_received(self, previous_state: StreamState) -> list[Event]:
  160. """
  161. Fires when a request is received.
  162. """
  163. assert not self.headers_received
  164. assert not self.trailers_received
  165. self.client = False
  166. self.headers_received = True
  167. event = RequestReceived(stream_id=self.stream_id)
  168. return [event]
  169. def response_received(self, previous_state: StreamState) -> list[Event]:
  170. """
  171. Fires when a response is received. Also disambiguates between responses
  172. and trailers.
  173. """
  174. event: ResponseReceived | TrailersReceived
  175. if not self.headers_received:
  176. assert self.client is True
  177. self.headers_received = True
  178. event = ResponseReceived(stream_id=self.stream_id)
  179. else:
  180. assert not self.trailers_received
  181. self.trailers_received = True
  182. event = TrailersReceived(stream_id=self.stream_id)
  183. event.stream_id = self.stream_id
  184. return [event]
  185. def data_received(self, previous_state: StreamState) -> list[Event]:
  186. """
  187. Fires when data is received.
  188. """
  189. if not self.headers_received:
  190. msg = "cannot receive data before headers"
  191. raise ProtocolError(msg)
  192. event = DataReceived(stream_id=self.stream_id)
  193. return [event]
  194. def window_updated(self, previous_state: StreamState) -> list[Event]:
  195. """
  196. Fires when a window update frame is received.
  197. """
  198. return [WindowUpdated(stream_id=self.stream_id)]
  199. def stream_half_closed(self, previous_state: StreamState) -> list[Event]:
  200. """
  201. Fires when an END_STREAM flag is received in the OPEN state,
  202. transitioning this stream to a HALF_CLOSED_REMOTE state.
  203. """
  204. event = StreamEnded(stream_id=self.stream_id)
  205. return [event]
  206. def stream_ended(self, previous_state: StreamState) -> list[Event]:
  207. """
  208. Fires when a stream is cleanly ended.
  209. """
  210. self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
  211. event = StreamEnded(stream_id=self.stream_id)
  212. return [event]
  213. def stream_reset(self, previous_state: StreamState) -> list[Event]:
  214. """
  215. Fired when a stream is forcefully reset.
  216. """
  217. self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
  218. return [StreamReset(stream_id=self.stream_id)]
  219. def send_new_pushed_stream(self, previous_state: StreamState) -> list[Event]:
  220. """
  221. Fires on the newly pushed stream, when pushed by the local peer.
  222. No event here, but definitionally this peer must be a server.
  223. """
  224. assert self.client is None
  225. self.client = False
  226. self.headers_received = True
  227. return []
  228. def recv_new_pushed_stream(self, previous_state: StreamState) -> list[Event]:
  229. """
  230. Fires on the newly pushed stream, when pushed by the remote peer.
  231. No event here, but definitionally this peer must be a client.
  232. """
  233. assert self.client is None
  234. self.client = True
  235. self.headers_sent = True
  236. return []
  237. def send_push_promise(self, previous_state: StreamState) -> list[Event]:
  238. """
  239. Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
  240. We may only send PUSH_PROMISE frames if we're a server.
  241. """
  242. if self.client is True:
  243. msg = "Cannot push streams from client peers."
  244. raise ProtocolError(msg)
  245. event = _PushedRequestSent()
  246. return [event]
  247. def recv_push_promise(self, previous_state: StreamState) -> list[Event]:
  248. """
  249. Fires on the already-existing stream when a PUSH_PROMISE frame is
  250. received. We may only receive PUSH_PROMISE frames if we're a client.
  251. Fires a PushedStreamReceived event.
  252. """
  253. if not self.client:
  254. if self.client is None: # pragma: no cover
  255. msg = "Idle streams cannot receive pushes"
  256. else: # pragma: no cover
  257. msg = "Cannot receive pushed streams as a server"
  258. raise ProtocolError(msg)
  259. event = PushedStreamReceived()
  260. event.parent_stream_id = self.stream_id
  261. return [event]
  262. def send_end_stream(self, previous_state: StreamState) -> list[Event]:
  263. """
  264. Called when an attempt is made to send END_STREAM in the
  265. HALF_CLOSED_REMOTE state.
  266. """
  267. self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
  268. return []
  269. def send_reset_stream(self, previous_state: StreamState) -> list[Event]:
  270. """
  271. Called when an attempt is made to send RST_STREAM in a non-closed
  272. stream state.
  273. """
  274. self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
  275. return []
  276. def reset_stream_on_error(self, previous_state: StreamState) -> list[Event]:
  277. """
  278. Called when we need to forcefully emit another RST_STREAM frame on
  279. behalf of the state machine.
  280. If this is the first time we've done this, we should also hang an event
  281. off the StreamClosedError so that the user can be informed. We know
  282. it's the first time we've done this if the stream is currently in a
  283. state other than CLOSED.
  284. """
  285. self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
  286. error = StreamClosedError(self.stream_id)
  287. error._events = [
  288. StreamReset(
  289. stream_id=self.stream_id,
  290. error_code=ErrorCodes.STREAM_CLOSED,
  291. remote_reset=False,
  292. ),
  293. ]
  294. raise error
  295. def recv_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
  296. """
  297. Called when an unexpected frame is received on an already-closed
  298. stream.
  299. An endpoint that receives an unexpected frame should treat it as
  300. a stream error or connection error with type STREAM_CLOSED, depending
  301. on the specific frame. The error handling is done at a higher level:
  302. this just raises the appropriate error.
  303. """
  304. raise StreamClosedError(self.stream_id)
  305. def send_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
  306. """
  307. Called when an attempt is made to send data on an already-closed
  308. stream.
  309. This essentially overrides the standard logic by throwing a
  310. more-specific error: StreamClosedError. This is a ProtocolError, so it
  311. matches the standard API of the state machine, but provides more detail
  312. to the user.
  313. """
  314. raise StreamClosedError(self.stream_id)
  315. def recv_push_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
  316. """
  317. Called when a PUSH_PROMISE frame is received on a full stop
  318. stream.
  319. If the stream was closed by us sending a RST_STREAM frame, then we
  320. presume that the PUSH_PROMISE was in flight when we reset the parent
  321. stream. Rathen than accept the new stream, we just reset it.
  322. Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
  323. naturally closed stream is a real problem because it creates a brand
  324. new stream that the remote peer now believes exists.
  325. """
  326. assert self.stream_closed_by is not None
  327. if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
  328. raise StreamClosedError(self.stream_id)
  329. msg = "Attempted to push on closed stream."
  330. raise ProtocolError(msg)
  331. def send_push_on_closed_stream(self, previous_state: StreamState) -> list[Event]:
  332. """
  333. Called when an attempt is made to push on an already-closed stream.
  334. This essentially overrides the standard logic by providing a more
  335. useful error message. It's necessary because simply indicating that the
  336. stream is closed is not enough: there is now a new stream that is not
  337. allowed to be there. The only recourse is to tear the whole connection
  338. down.
  339. """
  340. msg = "Attempted to push on closed stream."
  341. raise ProtocolError(msg)
  342. def send_informational_response(self, previous_state: StreamState) -> list[Event]:
  343. """
  344. Called when an informational header block is sent (that is, a block
  345. where the :status header has a 1XX value).
  346. Only enforces that these are sent *before* final headers are sent.
  347. """
  348. if self.headers_sent:
  349. msg = "Information response after final response"
  350. raise ProtocolError(msg)
  351. event = _ResponseSent()
  352. return [event]
  353. def recv_informational_response(self, previous_state: StreamState) -> list[Event]:
  354. """
  355. Called when an informational header block is received (that is, a block
  356. where the :status header has a 1XX value).
  357. """
  358. if self.headers_received:
  359. msg = "Informational response after final response"
  360. raise ProtocolError(msg)
  361. return [InformationalResponseReceived(stream_id=self.stream_id)]
  362. def recv_alt_svc(self, previous_state: StreamState) -> list[Event]:
  363. """
  364. Called when receiving an ALTSVC frame.
  365. RFC 7838 allows us to receive ALTSVC frames at any stream state, which
  366. is really absurdly overzealous. For that reason, we want to limit the
  367. states in which we can actually receive it. It's really only sensible
  368. to receive it after we've sent our own headers and before the server
  369. has sent its header block: the server can't guarantee that we have any
  370. state around after it completes its header block, and the server
  371. doesn't know what origin we're talking about before we've sent ours.
  372. For that reason, this function applies a few extra checks on both state
  373. and some of the little state variables we keep around. If those suggest
  374. an unreasonable situation for the ALTSVC frame to have been sent in,
  375. we quietly ignore it (as RFC 7838 suggests).
  376. This function is also *not* always called by the state machine. In some
  377. states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
  378. because we know the frame cannot be valid in that state (IDLE because
  379. the server cannot know what origin the stream applies to, CLOSED
  380. because the server cannot assume we still have state around,
  381. RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
  382. state then *we* are the server).
  383. """
  384. # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
  385. # them.
  386. if self.client is False:
  387. return []
  388. # If we've received the response headers from the server they can't
  389. # guarantee we still have any state around. Other implementations
  390. # (like nghttp2) ignore ALTSVC in this state, so we will too.
  391. if self.headers_received:
  392. return []
  393. # Otherwise, this is a sensible enough frame to have received. Return
  394. # the event and let it get populated.
  395. return [AlternativeServiceAvailable()]
  396. def send_alt_svc(self, previous_state: StreamState) -> list[Event]:
  397. """
  398. Called when sending an ALTSVC frame on this stream.
  399. For consistency with the restrictions we apply on receiving ALTSVC
  400. frames in ``recv_alt_svc``, we want to restrict when users can send
  401. ALTSVC frames to the situations when we ourselves would accept them.
  402. That means: when we are a server, when we have received the request
  403. headers, and when we have not yet sent our own response headers.
  404. """
  405. # We should not send ALTSVC after we've sent response headers, as the
  406. # client may have disposed of its state.
  407. if self.headers_sent:
  408. msg = "Cannot send ALTSVC after sending response headers."
  409. raise ProtocolError(msg)
  410. return []
  411. # STATE MACHINE
  412. #
  413. # The stream state machine is defined here to avoid the need to allocate it
  414. # repeatedly for each stream. It cannot be defined in the stream class because
  415. # it needs to be able to reference the callbacks defined on the class, but
  416. # because Python's scoping rules are weird the class object is not actually in
  417. # scope during the body of the class object.
  418. #
  419. # For the sake of clarity, we reproduce the RFC 7540 state machine here:
  420. #
#                              +--------+
#                      send PP |        | recv PP
#                     ,--------|  idle  |--------.
#                    /         |        |         \
#                   v          +--------+          v
#            +----------+          |           +----------+
#            |          |          | send H /  |          |
#     ,------| reserved |          | recv H    | reserved |------.
#     |      | (local)  |          |           | (remote) |      |
#     |      +----------+          v           +----------+      |
#     |          |             +--------+             |          |
#     |          |     recv ES |        | send ES     |          |
#     |   send H |     ,-------|  open  |-------.     | recv H   |
#     |          |    /        |        |        \    |          |
#     |          v   v         +--------+         v   v          |
#     |      +----------+          |           +----------+      |
#     |      |   half   |          |           |   half   |      |
#     |      |  closed  |          | send R /  |  closed  |      |
#     |      | (remote) |          | recv R    | (local)  |      |
#     |      +----------+          |           +----------+      |
#     |           |                |                 |           |
#     |           | send ES /      |       recv ES / |           |
#     |           | send R /       v        send R / |           |
#     |           | recv R     +--------+   recv R   |           |
#     | send R /  `----------->|        |<-----------'  send R / |
#     | recv R                 | closed |               recv R   |
#     `----------------------->|        |<----------------------'
#                              +--------+
  449. #
  450. # send: endpoint sends this frame
  451. # recv: endpoint receives this frame
  452. #
  453. # H: HEADERS frame (with implied CONTINUATIONs)
  454. # PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
  455. # ES: END_STREAM flag
  456. # R: RST_STREAM frame
  457. #
  458. # For the purposes of this state machine we treat HEADERS and their
  459. # associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
  461. # between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
  462. # received without a prior HEADERS frame, it *will* be passed to this state
  463. # machine. The state machine should always reject that frame, either as an
  464. # invalid transition or because the stream is closed.
  465. #
  466. # There is a confusing relationship around PUSH_PROMISE frames. The state
  467. # machine above considers them to be frames belonging to the new stream,
  468. # which is *somewhat* true. However, they are sent with the stream ID of
  469. # their related stream, and are only sendable in some cases.
  470. # For this reason, our state machine implementation below allows for
  471. # PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
  472. # in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
  473. # Essentially, for h2, PUSH_PROMISE frames are effectively sent on
  474. # two streams.
  475. #
  476. # The _transitions dictionary contains a mapping of tuples of
  477. # (state, input) to tuples of (side_effect_function, end_state). This
  478. # map contains all allowed transitions: anything not in this map is
  479. # invalid and immediately causes a transition to ``closed``.
  480. _transitions: dict[
  481. tuple[StreamState, StreamInputs],
  482. tuple[Callable[[H2StreamStateMachine, StreamState], list[Event]] | None, StreamState],
  483. ] = {
  484. # State: idle
  485. (StreamState.IDLE, StreamInputs.SEND_HEADERS):
  486. (H2StreamStateMachine.request_sent, StreamState.OPEN),
  487. (StreamState.IDLE, StreamInputs.RECV_HEADERS):
  488. (H2StreamStateMachine.request_received, StreamState.OPEN),
  489. (StreamState.IDLE, StreamInputs.RECV_DATA):
  490. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  491. (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
  492. (H2StreamStateMachine.send_new_pushed_stream,
  493. StreamState.RESERVED_LOCAL),
  494. (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
  495. (H2StreamStateMachine.recv_new_pushed_stream,
  496. StreamState.RESERVED_REMOTE),
  497. (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  498. (None, StreamState.IDLE),
  499. (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
  500. (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
  501. (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
  502. (H2StreamStateMachine.request_received,
  503. StreamState.HALF_CLOSED_REMOTE),
  504. # State: reserved local
  505. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
  506. (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
  507. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
  508. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  509. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
  510. (None, StreamState.RESERVED_LOCAL),
  511. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
  512. (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
  513. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
  514. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  515. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
  516. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  517. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  518. (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
  519. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  520. (None, StreamState.RESERVED_LOCAL),
  521. # State: reserved remote
  522. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
  523. (H2StreamStateMachine.response_received,
  524. StreamState.HALF_CLOSED_LOCAL),
  525. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
  526. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  527. (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
  528. (None, StreamState.RESERVED_REMOTE),
  529. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
  530. (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
  531. (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
  532. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  533. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
  534. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  535. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  536. (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),
  537. # State: open
  538. (StreamState.OPEN, StreamInputs.SEND_HEADERS):
  539. (H2StreamStateMachine.response_sent, StreamState.OPEN),
  540. (StreamState.OPEN, StreamInputs.RECV_HEADERS):
  541. (H2StreamStateMachine.response_received, StreamState.OPEN),
  542. (StreamState.OPEN, StreamInputs.SEND_DATA):
  543. (None, StreamState.OPEN),
  544. (StreamState.OPEN, StreamInputs.RECV_DATA):
  545. (H2StreamStateMachine.data_received, StreamState.OPEN),
  546. (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
  547. (None, StreamState.HALF_CLOSED_LOCAL),
  548. (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
  549. (H2StreamStateMachine.stream_half_closed,
  550. StreamState.HALF_CLOSED_REMOTE),
  551. (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
  552. (None, StreamState.OPEN),
  553. (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
  554. (H2StreamStateMachine.window_updated, StreamState.OPEN),
  555. (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
  556. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  557. (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
  558. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  559. (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
  560. (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
  561. (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
  562. (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
  563. (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
  564. (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
  565. (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
  566. (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
  567. (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  568. (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
  569. (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  570. (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),
  571. # State: half-closed remote
  572. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
  573. (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
  574. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
  575. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  576. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
  577. (None, StreamState.HALF_CLOSED_REMOTE),
  578. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
  579. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  580. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
  581. (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
  582. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
  583. (None, StreamState.HALF_CLOSED_REMOTE),
  584. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
  585. (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
  586. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
  587. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  588. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
  589. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  590. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
  591. (H2StreamStateMachine.send_push_promise,
  592. StreamState.HALF_CLOSED_REMOTE),
  593. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
  594. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  595. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
  596. (H2StreamStateMachine.send_informational_response,
  597. StreamState.HALF_CLOSED_REMOTE),
  598. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  599. (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
  600. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  601. (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),
  602. # State: half-closed local
  603. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
  604. (H2StreamStateMachine.response_received,
  605. StreamState.HALF_CLOSED_LOCAL),
  606. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
  607. (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
  608. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
  609. (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
  610. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
  611. (None, StreamState.HALF_CLOSED_LOCAL),
  612. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
  613. (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
  614. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
  615. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  616. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
  617. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  618. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
  619. (H2StreamStateMachine.recv_push_promise,
  620. StreamState.HALF_CLOSED_LOCAL),
  621. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
  622. (H2StreamStateMachine.recv_informational_response,
  623. StreamState.HALF_CLOSED_LOCAL),
  624. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  625. (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
  626. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  627. (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),
  628. # State: closed
  629. (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
  630. (None, StreamState.CLOSED),
  631. (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  632. (None, StreamState.CLOSED),
  633. # RFC 7540 Section 5.1 defines how the end point should react when
  634. # receiving a frame on a closed stream with the following statements:
  635. #
  636. # > An endpoint that receives any frame other than PRIORITY after receiving
  637. # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
  638. # > An endpoint that receives any frames after receiving a frame with the
  639. # > END_STREAM flag set MUST treat that as a connection error of type
  640. # > STREAM_CLOSED.
  641. (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
  642. (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
  643. (StreamState.CLOSED, StreamInputs.RECV_DATA):
  644. (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
  645. # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
  646. # > for a short period after a DATA or HEADERS frame containing a
  647. # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we
  648. # > don't have access to a clock so we just always allow it.
  649. (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
  650. (None, StreamState.CLOSED),
  651. (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
  652. (None, StreamState.CLOSED),
  653. # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
  654. # > neither "open" nor "half-closed (local)" as a connection error of type
  655. # > PROTOCOL_ERROR.
  656. (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
  657. (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),
  658. # Also, users should be forbidden from sending on closed streams.
  659. (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
  660. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  661. (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
  662. (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
  663. (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
  664. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  665. (StreamState.CLOSED, StreamInputs.SEND_DATA):
  666. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  667. (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
  668. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  669. (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
  670. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  671. }
  672. class H2Stream:
  673. """
  674. A low-level HTTP/2 stream object. This handles building and receiving
  675. frames and maintains per-stream state.
  676. This wraps a HTTP/2 Stream state machine implementation, ensuring that
  677. frames can only be sent/received when the stream is in a valid state.
  678. Attempts to create frames that cannot be sent will raise a
  679. ``ProtocolError``.
  680. """
  681. def __init__(self,
  682. stream_id: int,
  683. config: H2Configuration,
  684. inbound_window_size: int,
  685. outbound_window_size: int) -> None:
  686. self.state_machine = H2StreamStateMachine(stream_id)
  687. self.stream_id = stream_id
  688. self.max_outbound_frame_size: int | None = None
  689. self.request_method: bytes | None = None
  690. # The current value of the outbound stream flow control window
  691. self.outbound_flow_control_window = outbound_window_size
  692. # The flow control manager.
  693. self._inbound_window_manager = WindowManager(inbound_window_size)
  694. # The expected content length, if any.
  695. self._expected_content_length: int | None = None
  696. # The actual received content length. Always tracked.
  697. self._actual_content_length = 0
  698. # The authority we believe this stream belongs to.
  699. self._authority: bytes | None = None
  700. # The configuration for this stream.
  701. self.config = config
  702. def __repr__(self) -> str:
  703. return f"<{type(self).__name__} id:{self.stream_id} state:{self.state_machine.state!r}>"
  704. @property
  705. def inbound_flow_control_window(self) -> int:
  706. """
  707. The size of the inbound flow control window for the stream. This is
  708. rarely publicly useful: instead, use :meth:`remote_flow_control_window
  709. <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
  710. largely present to provide a shortcut to this data.
  711. """
  712. return self._inbound_window_manager.current_window_size
  713. @property
  714. def open(self) -> bool:
  715. """
  716. Whether the stream is 'open' in any sense: that is, whether it counts
  717. against the number of concurrent streams.
  718. """
  719. # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
  720. # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
  721. # this excludes the reserved states.
  722. # For more detail on why we're doing this in this slightly weird way,
  723. # see the comment on ``STREAM_OPEN`` at the top of the file.
  724. return STREAM_OPEN[self.state_machine.state]
  725. @property
  726. def closed(self) -> bool:
  727. """
  728. Whether the stream is closed.
  729. """
  730. return self.state_machine.state == StreamState.CLOSED
  731. @property
  732. def closed_by(self) -> StreamClosedBy | None:
  733. """
  734. Returns how the stream was closed, as one of StreamClosedBy.
  735. """
  736. return self.state_machine.stream_closed_by
  737. def upgrade(self, client_side: bool) -> None:
  738. """
  739. Called by the connection to indicate that this stream is the initial
  740. request/response of an upgraded connection. Places the stream into an
  741. appropriate state.
  742. """
  743. self.config.logger.debug("Upgrading %r", self)
  744. assert self.stream_id == 1
  745. input_ = (
  746. StreamInputs.UPGRADE_CLIENT if client_side
  747. else StreamInputs.UPGRADE_SERVER
  748. )
  749. # This may return events, we deliberately don't want them.
  750. self.state_machine.process_input(input_)
  751. def send_headers(self,
  752. headers: Iterable[HeaderWeaklyTyped],
  753. encoder: Encoder,
  754. end_stream: bool = False) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
  755. """
  756. Returns a list of HEADERS/CONTINUATION frames to emit as either headers
  757. or trailers.
  758. """
  759. self.config.logger.debug("Send headers %s on %r", headers, self)
  760. # Because encoding headers makes an irreversible change to the header
  761. # compression context, we make the state transition before we encode
  762. # them.
  763. # First, check if we're a client. If we are, no problem: if we aren't,
  764. # we need to scan the header block to see if this is an informational
  765. # response.
  766. input_ = StreamInputs.SEND_HEADERS
  767. bytes_headers = utf8_encode_headers(headers)
  768. if ((not self.state_machine.client) and
  769. is_informational_response(bytes_headers)):
  770. if end_stream:
  771. msg = "Cannot set END_STREAM on informational responses."
  772. raise ProtocolError(msg)
  773. input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
  774. events = self.state_machine.process_input(input_)
  775. hf = HeadersFrame(self.stream_id)
  776. hdr_validation_flags = self._build_hdr_validation_flags(events)
  777. frames = self._build_headers_frames(
  778. bytes_headers, encoder, hf, hdr_validation_flags,
  779. )
  780. if end_stream:
  781. # Not a bug: the END_STREAM flag is valid on the initial HEADERS
  782. # frame, not the CONTINUATION frames that follow.
  783. self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
  784. frames[0].flags.add("END_STREAM")
  785. if self.state_machine.trailers_sent and not end_stream:
  786. msg = "Trailers must have END_STREAM set."
  787. raise ProtocolError(msg)
  788. if self.state_machine.client and self._authority is None:
  789. self._authority = authority_from_headers(bytes_headers)
  790. # store request method for _initialize_content_length
  791. self.request_method = extract_method_header(bytes_headers)
  792. return frames
  793. def push_stream_in_band(self,
  794. related_stream_id: int,
  795. headers: Iterable[HeaderWeaklyTyped],
  796. encoder: Encoder) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
  797. """
  798. Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
  799. stream header. Called on the stream that has the PUSH_PROMISE frame
  800. sent on it.
  801. """
  802. self.config.logger.debug("Push stream %r", self)
  803. # Because encoding headers makes an irreversible change to the header
  804. # compression context, we make the state transition *first*.
  805. events = self.state_machine.process_input(
  806. StreamInputs.SEND_PUSH_PROMISE,
  807. )
  808. ppf = PushPromiseFrame(self.stream_id)
  809. ppf.promised_stream_id = related_stream_id
  810. hdr_validation_flags = self._build_hdr_validation_flags(events)
  811. bytes_headers = utf8_encode_headers(headers)
  812. return self._build_headers_frames(
  813. bytes_headers, encoder, ppf, hdr_validation_flags,
  814. )
  815. def locally_pushed(self) -> list[Frame]:
  816. """
  817. Mark this stream as one that was pushed by this peer. Must be called
  818. immediately after initialization. Sends no frames, simply updates the
  819. state machine.
  820. """
  821. # This does not trigger any events.
  822. events = self.state_machine.process_input(
  823. StreamInputs.SEND_PUSH_PROMISE,
  824. )
  825. assert not events
  826. return []
  827. def send_data(self,
  828. data: bytes | memoryview,
  829. end_stream: bool = False,
  830. pad_length: int | None = None) -> list[Frame]:
  831. """
  832. Prepare some data frames. Optionally end the stream.
  833. .. warning:: Does not perform flow control checks.
  834. """
  835. self.config.logger.debug(
  836. "Send data on %r with end stream set to %s", self, end_stream,
  837. )
  838. self.state_machine.process_input(StreamInputs.SEND_DATA)
  839. df = DataFrame(self.stream_id)
  840. df.data = data
  841. if end_stream:
  842. self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
  843. df.flags.add("END_STREAM")
  844. if pad_length is not None:
  845. df.flags.add("PADDED")
  846. df.pad_length = pad_length
  847. # Subtract flow_controlled_length to account for possible padding
  848. self.outbound_flow_control_window -= df.flow_controlled_length
  849. assert self.outbound_flow_control_window >= 0
  850. return [df]
  851. def end_stream(self) -> list[Frame]:
  852. """
  853. End a stream without sending data.
  854. """
  855. self.config.logger.debug("End stream %r", self)
  856. self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
  857. df = DataFrame(self.stream_id)
  858. df.flags.add("END_STREAM")
  859. return [df]
  860. def advertise_alternative_service(self, field_value: bytes) -> list[Frame]:
  861. """
  862. Advertise an RFC 7838 alternative service. The semantics of this are
  863. better documented in the ``H2Connection`` class.
  864. """
  865. self.config.logger.debug(
  866. "Advertise alternative service of %r for %r", field_value, self,
  867. )
  868. self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
  869. asf = AltSvcFrame(self.stream_id)
  870. asf.field = field_value
  871. return [asf]
  872. def increase_flow_control_window(self, increment: int) -> list[Frame]:
  873. """
  874. Increase the size of the flow control window for the remote side.
  875. """
  876. self.config.logger.debug(
  877. "Increase flow control window for %r by %d",
  878. self, increment,
  879. )
  880. self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
  881. self._inbound_window_manager.window_opened(increment)
  882. wuf = WindowUpdateFrame(self.stream_id)
  883. wuf.window_increment = increment
  884. return [wuf]
  885. def receive_push_promise_in_band(self,
  886. promised_stream_id: int,
  887. headers: Iterable[Header],
  888. header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]:
  889. """
  890. Receives a push promise frame sent on this stream, pushing a remote
  891. stream. This is called on the stream that has the PUSH_PROMISE sent
  892. on it.
  893. """
  894. self.config.logger.debug(
  895. "Receive Push Promise on %r for remote stream %d",
  896. self, promised_stream_id,
  897. )
  898. events = self.state_machine.process_input(
  899. StreamInputs.RECV_PUSH_PROMISE,
  900. )
  901. push_event = cast("PushedStreamReceived", events[0])
  902. push_event.pushed_stream_id = promised_stream_id
  903. hdr_validation_flags = self._build_hdr_validation_flags(events)
  904. push_event.headers = self._process_received_headers(
  905. headers, hdr_validation_flags, header_encoding,
  906. )
  907. return [], events
  908. def remotely_pushed(self, pushed_headers: Iterable[Header]) -> tuple[list[Frame], list[Event]]:
  909. """
  910. Mark this stream as one that was pushed by the remote peer. Must be
  911. called immediately after initialization. Sends no frames, simply
  912. updates the state machine.
  913. """
  914. self.config.logger.debug("%r pushed by remote peer", self)
  915. events = self.state_machine.process_input(
  916. StreamInputs.RECV_PUSH_PROMISE,
  917. )
  918. self._authority = authority_from_headers(pushed_headers)
  919. return [], events
  920. def receive_headers(self,
  921. headers: Iterable[Header],
  922. end_stream: bool,
  923. header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]:
  924. """
  925. Receive a set of headers (or trailers).
  926. """
  927. if is_informational_response(headers):
  928. if end_stream:
  929. msg = "Cannot set END_STREAM on informational responses"
  930. raise ProtocolError(msg)
  931. input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
  932. else:
  933. input_ = StreamInputs.RECV_HEADERS
  934. events = self.state_machine.process_input(input_)
  935. headers_event = cast(
  936. "Union[RequestReceived, ResponseReceived, TrailersReceived, InformationalResponseReceived]",
  937. events[0],
  938. )
  939. if end_stream:
  940. es_events = self.state_machine.process_input(
  941. StreamInputs.RECV_END_STREAM,
  942. )
  943. # We ensured it's not an information response at the beginning of the method.
  944. cast(
  945. "Union[RequestReceived, ResponseReceived, TrailersReceived]",
  946. headers_event,
  947. ).stream_ended = cast("StreamEnded", es_events[0])
  948. events += es_events
  949. self._initialize_content_length(headers)
  950. if isinstance(headers_event, TrailersReceived) and not end_stream:
  951. msg = "Trailers must have END_STREAM set"
  952. raise ProtocolError(msg)
  953. hdr_validation_flags = self._build_hdr_validation_flags(events)
  954. headers_event.headers = self._process_received_headers(
  955. headers, hdr_validation_flags, header_encoding,
  956. )
  957. return [], events
  958. def receive_data(self, data: bytes, end_stream: bool, flow_control_len: int) -> tuple[list[Frame], list[Event]]:
  959. """
  960. Receive some data.
  961. """
  962. self.config.logger.debug(
  963. "Receive data on %r with end stream %s and flow control length "
  964. "set to %d", self, end_stream, flow_control_len,
  965. )
  966. events = self.state_machine.process_input(StreamInputs.RECV_DATA)
  967. data_event = cast("DataReceived", events[0])
  968. self._inbound_window_manager.window_consumed(flow_control_len)
  969. self._track_content_length(len(data), end_stream)
  970. if end_stream:
  971. es_events = self.state_machine.process_input(
  972. StreamInputs.RECV_END_STREAM,
  973. )
  974. data_event.stream_ended = cast("StreamEnded", es_events[0])
  975. events.extend(es_events)
  976. data_event.data = data
  977. data_event.flow_controlled_length = flow_control_len
  978. return [], events
  979. def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event]]:
  980. """
  981. Handle a WINDOW_UPDATE increment.
  982. """
  983. self.config.logger.debug(
  984. "Receive Window Update on %r for increment of %d",
  985. self, increment,
  986. )
  987. events = self.state_machine.process_input(
  988. StreamInputs.RECV_WINDOW_UPDATE,
  989. )
  990. frames = []
  991. # If we encounter a problem with incrementing the flow control window,
  992. # this should be treated as a *stream* error, not a *connection* error.
  993. # That means we need to catch the error and forcibly close the stream.
  994. if events:
  995. cast("WindowUpdated", events[0]).delta = increment
  996. try:
  997. self.outbound_flow_control_window = guard_increment_window(
  998. self.outbound_flow_control_window,
  999. increment,
  1000. )
  1001. except FlowControlError:
  1002. # Ok, this is bad. We're going to need to perform a local
  1003. # reset.
  1004. events = [
  1005. StreamReset(
  1006. stream_id=self.stream_id,
  1007. error_code=ErrorCodes.FLOW_CONTROL_ERROR,
  1008. remote_reset=False,
  1009. ),
  1010. ]
  1011. frames = self.reset_stream(ErrorCodes.FLOW_CONTROL_ERROR)
  1012. return frames, events
  1013. def receive_continuation(self) -> None:
  1014. """
  1015. A naked CONTINUATION frame has been received. This is always an error,
  1016. but the type of error it is depends on the state of the stream and must
  1017. transition the state of the stream, so we need to handle it.
  1018. """
  1019. self.config.logger.debug("Receive Continuation frame on %r", self)
  1020. self.state_machine.process_input(
  1021. StreamInputs.RECV_CONTINUATION,
  1022. )
  1023. msg = "Should not be reachable" # pragma: no cover
  1024. raise AssertionError(msg) # pragma: no cover
  1025. def receive_alt_svc(self, frame: AltSvcFrame) -> tuple[list[Frame], list[Event]]:
  1026. """
  1027. An Alternative Service frame was received on the stream. This frame
  1028. inherits the origin associated with this stream.
  1029. """
  1030. self.config.logger.debug(
  1031. "Receive Alternative Service frame on stream %r", self,
  1032. )
  1033. # If the origin is present, RFC 7838 says we have to ignore it.
  1034. if frame.origin:
  1035. return [], []
  1036. events = self.state_machine.process_input(
  1037. StreamInputs.RECV_ALTERNATIVE_SERVICE,
  1038. )
  1039. # There are lots of situations where we want to ignore the ALTSVC
  1040. # frame. If we need to pay attention, we'll have an event and should
  1041. # fill it out.
  1042. if events:
  1043. assert isinstance(events[0], AlternativeServiceAvailable)
  1044. events[0].origin = self._authority
  1045. events[0].field_value = frame.field
  1046. return [], events
  1047. def reset_stream(self, error_code: ErrorCodes | int = 0) -> list[Frame]:
  1048. """
  1049. Close the stream locally. Reset the stream with an error code.
  1050. """
  1051. self.config.logger.debug(
  1052. "Local reset %r with error code: %d", self, error_code,
  1053. )
  1054. self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
  1055. rsf = RstStreamFrame(self.stream_id)
  1056. rsf.error_code = error_code
  1057. return [rsf]
  1058. def stream_reset(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]:
  1059. """
  1060. Handle a stream being reset remotely.
  1061. """
  1062. self.config.logger.debug(
  1063. "Remote reset %r with error code: %d", self, frame.error_code,
  1064. )
  1065. events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
  1066. if events:
  1067. # We don't fire an event if this stream is already closed.
  1068. cast("StreamReset", events[0]).error_code = _error_code_from_int(frame.error_code)
  1069. return [], events
  1070. def acknowledge_received_data(self, acknowledged_size: int) -> list[Frame]:
  1071. """
  1072. The user has informed us that they've processed some amount of data
  1073. that was received on this stream. Pass that to the window manager and
  1074. potentially return some WindowUpdate frames.
  1075. """
  1076. self.config.logger.debug(
  1077. "Acknowledge received data with size %d on %r",
  1078. acknowledged_size, self,
  1079. )
  1080. increment = self._inbound_window_manager.process_bytes(
  1081. acknowledged_size,
  1082. )
  1083. if increment:
  1084. f = WindowUpdateFrame(self.stream_id)
  1085. f.window_increment = increment
  1086. return [f]
  1087. return []
  1088. def _build_hdr_validation_flags(self, events: Any) -> HeaderValidationFlags:
  1089. """
  1090. Constructs a set of header validation flags for use when normalizing
  1091. and validating header blocks.
  1092. """
  1093. is_trailer = isinstance(
  1094. events[0], (_TrailersSent, TrailersReceived),
  1095. )
  1096. is_response_header = isinstance(
  1097. events[0],
  1098. (
  1099. _ResponseSent,
  1100. ResponseReceived,
  1101. InformationalResponseReceived,
  1102. ),
  1103. )
  1104. is_push_promise = isinstance(
  1105. events[0], (PushedStreamReceived, _PushedRequestSent),
  1106. )
  1107. return HeaderValidationFlags(
  1108. is_client=self.state_machine.client or False,
  1109. is_trailer=is_trailer,
  1110. is_response_header=is_response_header,
  1111. is_push_promise=is_push_promise,
  1112. )
  1113. def _build_headers_frames(self,
  1114. headers: Iterable[Header],
  1115. encoder: Encoder,
  1116. first_frame: HeadersFrame | PushPromiseFrame,
  1117. hdr_validation_flags: HeaderValidationFlags) \
  1118. -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
  1119. """
  1120. Helper method to build headers or push promise frames.
  1121. """
  1122. # We need to lowercase the header names, and to ensure that secure
  1123. # header fields are kept out of compression contexts.
  1124. if self.config.normalize_outbound_headers:
  1125. # also we may want to split outbound cookies to improve
  1126. # headers compression
  1127. should_split_outbound_cookies = self.config.split_outbound_cookies
  1128. headers = normalize_outbound_headers(
  1129. headers, hdr_validation_flags, should_split_outbound_cookies,
  1130. )
  1131. if self.config.validate_outbound_headers:
  1132. headers = validate_outbound_headers(
  1133. headers, hdr_validation_flags,
  1134. )
  1135. encoded_headers = encoder.encode(headers)
  1136. # Slice into blocks of max_outbound_frame_size. Be careful with this:
  1137. # it only works right because we never send padded frames or priority
  1138. # information on the frames. Revisit this if we do.
  1139. header_blocks = [
  1140. encoded_headers[i:i+(self.max_outbound_frame_size or 0)]
  1141. for i in range(
  1142. 0, len(encoded_headers), (self.max_outbound_frame_size or 0),
  1143. )
  1144. ]
  1145. frames: list[HeadersFrame | ContinuationFrame | PushPromiseFrame] = []
  1146. first_frame.data = header_blocks[0]
  1147. frames.append(first_frame)
  1148. for block in header_blocks[1:]:
  1149. cf = ContinuationFrame(self.stream_id)
  1150. cf.data = block
  1151. frames.append(cf)
  1152. frames[-1].flags.add("END_HEADERS")
  1153. return frames
  1154. def _process_received_headers(self,
  1155. headers: Iterable[Header],
  1156. header_validation_flags: HeaderValidationFlags,
  1157. header_encoding: bool | str | None) -> list[Header]:
  1158. """
  1159. When headers have been received from the remote peer, run a processing
  1160. pipeline on them to transform them into the appropriate form for
  1161. attaching to an event.
  1162. """
  1163. if self.config.normalize_inbound_headers:
  1164. headers = normalize_inbound_headers(
  1165. headers, header_validation_flags,
  1166. )
  1167. if self.config.validate_inbound_headers:
  1168. headers = validate_headers(headers, header_validation_flags)
  1169. if isinstance(header_encoding, str):
  1170. headers = _decode_headers(headers, header_encoding)
  1171. # The above steps are all generators, so we need to concretize the
  1172. # headers now.
  1173. return list(headers)
  1174. def _initialize_content_length(self, headers: Iterable[Header]) -> None:
  1175. """
  1176. Checks the headers for a content-length header and initializes the
  1177. _expected_content_length field from it. It's not an error for no
  1178. Content-Length header to be present.
  1179. """
  1180. if self.request_method == b"HEAD":
  1181. self._expected_content_length = 0
  1182. return
  1183. for n, v in headers:
  1184. if n == b"content-length":
  1185. try:
  1186. self._expected_content_length = int(v, 10)
  1187. except ValueError as err:
  1188. msg = f"Invalid content-length header: {v!r}"
  1189. raise ProtocolError(msg) from err
  1190. return
  1191. def _track_content_length(self, length: int, end_stream: bool) -> None:
  1192. """
  1193. Update the expected content length in response to data being received.
  1194. Validates that the appropriate amount of data is sent. Always updates
  1195. the received data, but only validates the length against the
  1196. content-length header if one was sent.
  1197. :param length: The length of the body chunk received.
  1198. :param end_stream: If this is the last body chunk received.
  1199. """
  1200. self._actual_content_length += length
  1201. actual = self._actual_content_length
  1202. expected = self._expected_content_length
  1203. if expected is not None:
  1204. if expected < actual:
  1205. raise InvalidBodyLengthError(expected, actual)
  1206. if end_stream and expected != actual:
  1207. raise InvalidBodyLengthError(expected, actual)
  1208. def _inbound_flow_control_change_from_settings(self, delta: int) -> None:
  1209. """
  1210. We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
  1211. update the target window size for flow control. For our flow control
  1212. strategy, this means we need to do two things: we need to adjust the
  1213. current window size, but we also need to set the target maximum window
  1214. size to the new value.
  1215. """
  1216. new_max_size = self._inbound_window_manager.max_window_size + delta
  1217. self._inbound_window_manager.window_opened(delta)
  1218. self._inbound_window_manager.max_window_size = new_max_size
  1219. def _decode_headers(headers: Iterable[HeaderWeaklyTyped], encoding: str) -> Generator[HeaderTuple, None, None]:
  1220. """
  1221. Given an iterable of header two-tuples and an encoding, decodes those
  1222. headers using that encoding while preserving the type of the header tuple.
  1223. This ensures that the use of ``HeaderTuple`` is preserved.
  1224. """
  1225. for header in headers:
  1226. # This function expects to work on decoded headers, which are always
  1227. # HeaderTuple objects.
  1228. assert isinstance(header, HeaderTuple)
  1229. name, value = header
  1230. assert isinstance(name, bytes)
  1231. assert isinstance(value, bytes)
  1232. n = name.decode(encoding)
  1233. v = value.decode(encoding)
  1234. yield header.__class__(n, v)