_channel.py 79 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252
  1. # Copyright 2016 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Invocation-side implementation of gRPC Python."""
  15. import copy
  16. import functools
  17. import logging
  18. import os
  19. import sys
  20. import threading
  21. import time
  22. import types
  23. from typing import (
  24. Any,
  25. Callable,
  26. Dict,
  27. Iterator,
  28. List,
  29. Optional,
  30. Sequence,
  31. Set,
  32. Tuple,
  33. Union,
  34. )
  35. import grpc # pytype: disable=pyi-error
  36. from grpc import _common # pytype: disable=pyi-error
  37. from grpc import _compression # pytype: disable=pyi-error
  38. from grpc import _grpcio_metadata # pytype: disable=pyi-error
  39. from grpc import _observability # pytype: disable=pyi-error
  40. from grpc._cython import cygrpc
  41. from grpc._typing import ChannelArgumentType
  42. from grpc._typing import DeserializingFunction
  43. from grpc._typing import IntegratedCallFactory
  44. from grpc._typing import MetadataType
  45. from grpc._typing import NullaryCallbackType
  46. from grpc._typing import ResponseType
  47. from grpc._typing import SerializingFunction
  48. from grpc._typing import UserTag
  49. import grpc.experimental # pytype: disable=pyi-error
  50. _LOGGER = logging.getLogger(__name__)
  51. _USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)
  52. _EMPTY_FLAGS = 0
  53. # NOTE(rbellevi): No guarantees are given about the maintenance of this
  54. # environment variable.
  55. _DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
  56. os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
  57. )
  58. _UNARY_UNARY_INITIAL_DUE = (
  59. cygrpc.OperationType.send_initial_metadata,
  60. cygrpc.OperationType.send_message,
  61. cygrpc.OperationType.send_close_from_client,
  62. cygrpc.OperationType.receive_initial_metadata,
  63. cygrpc.OperationType.receive_message,
  64. cygrpc.OperationType.receive_status_on_client,
  65. )
  66. _UNARY_STREAM_INITIAL_DUE = (
  67. cygrpc.OperationType.send_initial_metadata,
  68. cygrpc.OperationType.send_message,
  69. cygrpc.OperationType.send_close_from_client,
  70. cygrpc.OperationType.receive_initial_metadata,
  71. cygrpc.OperationType.receive_status_on_client,
  72. )
  73. _STREAM_UNARY_INITIAL_DUE = (
  74. cygrpc.OperationType.send_initial_metadata,
  75. cygrpc.OperationType.receive_initial_metadata,
  76. cygrpc.OperationType.receive_message,
  77. cygrpc.OperationType.receive_status_on_client,
  78. )
  79. _STREAM_STREAM_INITIAL_DUE = (
  80. cygrpc.OperationType.send_initial_metadata,
  81. cygrpc.OperationType.receive_initial_metadata,
  82. cygrpc.OperationType.receive_status_on_client,
  83. )
  84. _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
  85. "Exception calling channel subscription callback!"
  86. )
  87. _OK_RENDEZVOUS_REPR_FORMAT = (
  88. '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
  89. )
  90. _NON_OK_RENDEZVOUS_REPR_FORMAT = (
  91. "<{} of RPC that terminated with:\n"
  92. "\tstatus = {}\n"
  93. '\tdetails = "{}"\n'
  94. '\tdebug_error_string = "{}"\n'
  95. ">"
  96. )
  97. def _deadline(timeout: Optional[float]) -> Optional[float]:
  98. return None if timeout is None else time.time() + timeout
  99. def _unknown_code_details(
  100. unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
  101. ) -> str:
  102. return 'Server sent unknown code {} and details "{}"'.format(
  103. unknown_cygrpc_code, details
  104. )
  105. class _RPCState(object):
  106. condition: threading.Condition
  107. due: Set[cygrpc.OperationType]
  108. initial_metadata: Optional[MetadataType]
  109. response: Any
  110. trailing_metadata: Optional[MetadataType]
  111. code: Optional[grpc.StatusCode]
  112. details: Optional[str]
  113. debug_error_string: Optional[str]
  114. cancelled: bool
  115. callbacks: List[NullaryCallbackType]
  116. fork_epoch: Optional[int]
  117. rpc_start_time: Optional[float] # In relative seconds
  118. rpc_end_time: Optional[float] # In relative seconds
  119. method: Optional[str]
  120. target: Optional[str]
  121. def __init__(
  122. self,
  123. due: Sequence[cygrpc.OperationType],
  124. initial_metadata: Optional[MetadataType],
  125. trailing_metadata: Optional[MetadataType],
  126. code: Optional[grpc.StatusCode],
  127. details: Optional[str],
  128. ):
  129. # `condition` guards all members of _RPCState. `notify_all` is called on
  130. # `condition` when the state of the RPC has changed.
  131. self.condition = threading.Condition()
  132. # The cygrpc.OperationType objects representing events due from the RPC's
  133. # completion queue. If an operation is in `due`, it is guaranteed that
  134. # `operate()` has been called on a corresponding operation. But the
  135. # converse is not true. That is, in the case of failed `operate()`
  136. # calls, there may briefly be events in `due` that do not correspond to
  137. # operations submitted to Core.
  138. self.due = set(due)
  139. self.initial_metadata = initial_metadata
  140. self.response = None
  141. self.trailing_metadata = trailing_metadata
  142. self.code = code
  143. self.details = details
  144. self.debug_error_string = None
  145. # The following three fields are used for observability.
  146. # Updates to those fields do not trigger self.condition.
  147. self.rpc_start_time = None
  148. self.rpc_end_time = None
  149. self.method = None
  150. self.target = None
  151. # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
  152. # slightly wonky, so they have to be tracked separately from the rest of the
  153. # result of the RPC. This field tracks whether cancellation was requested
  154. # prior to termination of the RPC.
  155. self.cancelled = False
  156. self.callbacks = []
  157. self.fork_epoch = cygrpc.get_fork_epoch()
  158. def reset_postfork_child(self):
  159. self.condition = threading.Condition()
  160. def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
  161. if state.code is None:
  162. state.code = code
  163. state.details = details
  164. if state.initial_metadata is None:
  165. state.initial_metadata = ()
  166. state.trailing_metadata = ()
  167. def _handle_event(
  168. event: cygrpc.BaseEvent,
  169. state: _RPCState,
  170. response_deserializer: Optional[DeserializingFunction],
  171. ) -> List[NullaryCallbackType]:
  172. callbacks = []
  173. for batch_operation in event.batch_operations:
  174. operation_type = batch_operation.type()
  175. state.due.remove(operation_type)
  176. if operation_type == cygrpc.OperationType.receive_initial_metadata:
  177. state.initial_metadata = batch_operation.initial_metadata()
  178. elif operation_type == cygrpc.OperationType.receive_message:
  179. serialized_response = batch_operation.message()
  180. if serialized_response is not None:
  181. response = _common.deserialize(
  182. serialized_response, response_deserializer
  183. )
  184. if response is None:
  185. details = "Exception deserializing response!"
  186. _abort(state, grpc.StatusCode.INTERNAL, details)
  187. else:
  188. state.response = response
  189. elif operation_type == cygrpc.OperationType.receive_status_on_client:
  190. state.trailing_metadata = batch_operation.trailing_metadata()
  191. if state.code is None:
  192. code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
  193. batch_operation.code()
  194. )
  195. if code is None:
  196. state.code = grpc.StatusCode.UNKNOWN
  197. state.details = _unknown_code_details(
  198. code, batch_operation.details()
  199. )
  200. else:
  201. state.code = code
  202. state.details = batch_operation.details()
  203. state.debug_error_string = batch_operation.error_string()
  204. state.rpc_end_time = time.perf_counter()
  205. _observability.maybe_record_rpc_latency(state)
  206. callbacks.extend(state.callbacks)
  207. state.callbacks = None
  208. return callbacks
  209. def _event_handler(
  210. state: _RPCState, response_deserializer: Optional[DeserializingFunction]
  211. ) -> UserTag:
  212. def handle_event(event):
  213. with state.condition:
  214. callbacks = _handle_event(event, state, response_deserializer)
  215. state.condition.notify_all()
  216. done = not state.due
  217. for callback in callbacks:
  218. try:
  219. callback()
  220. except Exception as e: # pylint: disable=broad-except
  221. # NOTE(rbellevi): We suppress but log errors here so as not to
  222. # kill the channel spin thread.
  223. logging.error(
  224. "Exception in callback %s: %s", repr(callback.func), repr(e)
  225. )
  226. return done and state.fork_epoch >= cygrpc.get_fork_epoch()
  227. return handle_event
  228. # TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
  229. # pylint: disable=too-many-statements
  230. def _consume_request_iterator(
  231. request_iterator: Iterator,
  232. state: _RPCState,
  233. call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
  234. request_serializer: SerializingFunction,
  235. event_handler: Optional[UserTag],
  236. ) -> None:
  237. """Consume a request supplied by the user."""
  238. def consume_request_iterator(): # pylint: disable=too-many-branches
  239. # Iterate over the request iterator until it is exhausted or an error
  240. # condition is encountered.
  241. while True:
  242. return_from_user_request_generator_invoked = False
  243. try:
  244. # The thread may die in user-code. Do not block fork for this.
  245. cygrpc.enter_user_request_generator()
  246. request = next(request_iterator)
  247. except StopIteration:
  248. break
  249. except Exception: # pylint: disable=broad-except
  250. cygrpc.return_from_user_request_generator()
  251. return_from_user_request_generator_invoked = True
  252. code = grpc.StatusCode.UNKNOWN
  253. details = "Exception iterating requests!"
  254. _LOGGER.exception(details)
  255. call.cancel(
  256. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
  257. )
  258. _abort(state, code, details)
  259. return
  260. finally:
  261. if not return_from_user_request_generator_invoked:
  262. cygrpc.return_from_user_request_generator()
  263. serialized_request = _common.serialize(request, request_serializer)
  264. with state.condition:
  265. if state.code is None and not state.cancelled:
  266. if serialized_request is None:
  267. code = grpc.StatusCode.INTERNAL
  268. details = "Exception serializing request!"
  269. call.cancel(
  270. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
  271. details,
  272. )
  273. _abort(state, code, details)
  274. return
  275. else:
  276. state.due.add(cygrpc.OperationType.send_message)
  277. operations = (
  278. cygrpc.SendMessageOperation(
  279. serialized_request, _EMPTY_FLAGS
  280. ),
  281. )
  282. operating = call.operate(operations, event_handler)
  283. if not operating:
  284. state.due.remove(cygrpc.OperationType.send_message)
  285. return
  286. def _done():
  287. return (
  288. state.code is not None
  289. or cygrpc.OperationType.send_message
  290. not in state.due
  291. )
  292. _common.wait(
  293. state.condition.wait,
  294. _done,
  295. spin_cb=functools.partial(
  296. cygrpc.block_if_fork_in_progress, state
  297. ),
  298. )
  299. if state.code is not None:
  300. return
  301. else:
  302. return
  303. with state.condition:
  304. if state.code is None:
  305. state.due.add(cygrpc.OperationType.send_close_from_client)
  306. operations = (
  307. cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
  308. )
  309. operating = call.operate(operations, event_handler)
  310. if not operating:
  311. state.due.remove(
  312. cygrpc.OperationType.send_close_from_client
  313. )
  314. consumption_thread = cygrpc.ForkManagedThread(
  315. target=consume_request_iterator
  316. )
  317. consumption_thread.setDaemon(True)
  318. consumption_thread.start()
  319. def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
  320. """Calculates error string for RPC."""
  321. with rpc_state.condition:
  322. if rpc_state.code is None:
  323. return "<{} object>".format(class_name)
  324. elif rpc_state.code is grpc.StatusCode.OK:
  325. return _OK_RENDEZVOUS_REPR_FORMAT.format(
  326. class_name, rpc_state.code, rpc_state.details
  327. )
  328. else:
  329. return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
  330. class_name,
  331. rpc_state.code,
  332. rpc_state.details,
  333. rpc_state.debug_error_string,
  334. )
  335. class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
  336. """An RPC error not tied to the execution of a particular RPC.
  337. The RPC represented by the state object must not be in-progress or
  338. cancelled.
  339. Attributes:
  340. _state: An instance of _RPCState.
  341. """
  342. _state: _RPCState
  343. def __init__(self, state: _RPCState):
  344. with state.condition:
  345. self._state = _RPCState(
  346. (),
  347. copy.deepcopy(state.initial_metadata),
  348. copy.deepcopy(state.trailing_metadata),
  349. state.code,
  350. copy.deepcopy(state.details),
  351. )
  352. self._state.response = copy.copy(state.response)
  353. self._state.debug_error_string = copy.copy(state.debug_error_string)
  354. def initial_metadata(self) -> Optional[MetadataType]:
  355. return self._state.initial_metadata
  356. def trailing_metadata(self) -> Optional[MetadataType]:
  357. return self._state.trailing_metadata
  358. def code(self) -> Optional[grpc.StatusCode]:
  359. return self._state.code
  360. def details(self) -> Optional[str]:
  361. return _common.decode(self._state.details)
  362. def debug_error_string(self) -> Optional[str]:
  363. return _common.decode(self._state.debug_error_string)
  364. def _repr(self) -> str:
  365. return _rpc_state_string(self.__class__.__name__, self._state)
  366. def __repr__(self) -> str:
  367. return self._repr()
  368. def __str__(self) -> str:
  369. return self._repr()
  370. def cancel(self) -> bool:
  371. """See grpc.Future.cancel."""
  372. return False
  373. def cancelled(self) -> bool:
  374. """See grpc.Future.cancelled."""
  375. return False
  376. def running(self) -> bool:
  377. """See grpc.Future.running."""
  378. return False
  379. def done(self) -> bool:
  380. """See grpc.Future.done."""
  381. return True
  382. def result(
  383. self, timeout: Optional[float] = None
  384. ) -> Any: # pylint: disable=unused-argument
  385. """See grpc.Future.result."""
  386. raise self
  387. def exception(
  388. self, timeout: Optional[float] = None # pylint: disable=unused-argument
  389. ) -> Optional[Exception]:
  390. """See grpc.Future.exception."""
  391. return self
  392. def traceback(
  393. self, timeout: Optional[float] = None # pylint: disable=unused-argument
  394. ) -> Optional[types.TracebackType]:
  395. """See grpc.Future.traceback."""
  396. try:
  397. raise self
  398. except grpc.RpcError:
  399. return sys.exc_info()[2]
  400. def add_done_callback(
  401. self,
  402. fn: Callable[[grpc.Future], None],
  403. timeout: Optional[float] = None, # pylint: disable=unused-argument
  404. ) -> None:
  405. """See grpc.Future.add_done_callback."""
  406. fn(self)
  407. class _Rendezvous(grpc.RpcError, grpc.RpcContext):
  408. """An RPC iterator.
  409. Attributes:
  410. _state: An instance of _RPCState.
  411. _call: An instance of SegregatedCall or IntegratedCall.
  412. In either case, the _call object is expected to have operate, cancel,
  413. and next_event methods.
  414. _response_deserializer: A callable taking bytes and return a Python
  415. object.
  416. _deadline: A float representing the deadline of the RPC in seconds. Or
  417. possibly None, to represent an RPC with no deadline at all.
  418. """
  419. _state: _RPCState
  420. _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
  421. _response_deserializer: Optional[DeserializingFunction]
  422. _deadline: Optional[float]
  423. def __init__(
  424. self,
  425. state: _RPCState,
  426. call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
  427. response_deserializer: Optional[DeserializingFunction],
  428. deadline: Optional[float],
  429. ):
  430. super(_Rendezvous, self).__init__()
  431. self._state = state
  432. self._call = call
  433. self._response_deserializer = response_deserializer
  434. self._deadline = deadline
  435. def is_active(self) -> bool:
  436. """See grpc.RpcContext.is_active"""
  437. with self._state.condition:
  438. return self._state.code is None
  439. def time_remaining(self) -> Optional[float]:
  440. """See grpc.RpcContext.time_remaining"""
  441. with self._state.condition:
  442. if self._deadline is None:
  443. return None
  444. else:
  445. return max(self._deadline - time.time(), 0)
  446. def cancel(self) -> bool:
  447. """See grpc.RpcContext.cancel"""
  448. with self._state.condition:
  449. if self._state.code is None:
  450. code = grpc.StatusCode.CANCELLED
  451. details = "Locally cancelled by application!"
  452. self._call.cancel(
  453. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
  454. )
  455. self._state.cancelled = True
  456. _abort(self._state, code, details)
  457. self._state.condition.notify_all()
  458. return True
  459. else:
  460. return False
  461. def add_callback(self, callback: NullaryCallbackType) -> bool:
  462. """See grpc.RpcContext.add_callback"""
  463. with self._state.condition:
  464. if self._state.callbacks is None:
  465. return False
  466. else:
  467. self._state.callbacks.append(callback)
  468. return True
  469. def __iter__(self):
  470. return self
  471. def next(self):
  472. return self._next()
  473. def __next__(self):
  474. return self._next()
  475. def _next(self):
  476. raise NotImplementedError()
  477. def debug_error_string(self) -> Optional[str]:
  478. raise NotImplementedError()
  479. def _repr(self) -> str:
  480. return _rpc_state_string(self.__class__.__name__, self._state)
  481. def __repr__(self) -> str:
  482. return self._repr()
  483. def __str__(self) -> str:
  484. return self._repr()
  485. def __del__(self) -> None:
  486. with self._state.condition:
  487. if self._state.code is None:
  488. self._state.code = grpc.StatusCode.CANCELLED
  489. self._state.details = "Cancelled upon garbage collection!"
  490. self._state.cancelled = True
  491. self._call.cancel(
  492. _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
  493. self._state.details,
  494. )
  495. self._state.condition.notify_all()
  496. class _SingleThreadedRendezvous(
  497. _Rendezvous, grpc.Call, grpc.Future
  498. ): # pylint: disable=too-many-ancestors
  499. """An RPC iterator operating entirely on a single thread.
  500. The __next__ method of _SingleThreadedRendezvous does not depend on the
  501. existence of any other thread, including the "channel spin thread".
  502. However, this means that its interface is entirely synchronous. So this
  503. class cannot completely fulfill the grpc.Future interface. The result,
  504. exception, and traceback methods will never block and will instead raise
  505. an exception if calling the method would result in blocking.
  506. This means that these methods are safe to call from add_done_callback
  507. handlers.
  508. """
  509. _state: _RPCState
  510. def _is_complete(self) -> bool:
  511. return self._state.code is not None
  512. def cancelled(self) -> bool:
  513. with self._state.condition:
  514. return self._state.cancelled
  515. def running(self) -> bool:
  516. with self._state.condition:
  517. return self._state.code is None
  518. def done(self) -> bool:
  519. with self._state.condition:
  520. return self._state.code is not None
  521. def result(self, timeout: Optional[float] = None) -> Any:
  522. """Returns the result of the computation or raises its exception.
  523. This method will never block. Instead, it will raise an exception
  524. if calling this method would otherwise result in blocking.
  525. Since this method will never block, any `timeout` argument passed will
  526. be ignored.
  527. """
  528. del timeout
  529. with self._state.condition:
  530. if not self._is_complete():
  531. raise grpc.experimental.UsageError(
  532. "_SingleThreadedRendezvous only supports result() when the"
  533. " RPC is complete."
  534. )
  535. if self._state.code is grpc.StatusCode.OK:
  536. return self._state.response
  537. elif self._state.cancelled:
  538. raise grpc.FutureCancelledError()
  539. else:
  540. raise self
  541. def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
  542. """Return the exception raised by the computation.
  543. This method will never block. Instead, it will raise an exception
  544. if calling this method would otherwise result in blocking.
  545. Since this method will never block, any `timeout` argument passed will
  546. be ignored.
  547. """
  548. del timeout
  549. with self._state.condition:
  550. if not self._is_complete():
  551. raise grpc.experimental.UsageError(
  552. "_SingleThreadedRendezvous only supports exception() when"
  553. " the RPC is complete."
  554. )
  555. if self._state.code is grpc.StatusCode.OK:
  556. return None
  557. elif self._state.cancelled:
  558. raise grpc.FutureCancelledError()
  559. else:
  560. return self
  561. def traceback(
  562. self, timeout: Optional[float] = None
  563. ) -> Optional[types.TracebackType]:
  564. """Access the traceback of the exception raised by the computation.
  565. This method will never block. Instead, it will raise an exception
  566. if calling this method would otherwise result in blocking.
  567. Since this method will never block, any `timeout` argument passed will
  568. be ignored.
  569. """
  570. del timeout
  571. with self._state.condition:
  572. if not self._is_complete():
  573. raise grpc.experimental.UsageError(
  574. "_SingleThreadedRendezvous only supports traceback() when"
  575. " the RPC is complete."
  576. )
  577. if self._state.code is grpc.StatusCode.OK:
  578. return None
  579. elif self._state.cancelled:
  580. raise grpc.FutureCancelledError()
  581. else:
  582. try:
  583. raise self
  584. except grpc.RpcError:
  585. return sys.exc_info()[2]
  586. def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
  587. with self._state.condition:
  588. if self._state.code is None:
  589. self._state.callbacks.append(functools.partial(fn, self))
  590. return
  591. fn(self)
  592. def initial_metadata(self) -> Optional[MetadataType]:
  593. """See grpc.Call.initial_metadata"""
  594. with self._state.condition:
  595. # NOTE(gnossen): Based on our initial call batch, we are guaranteed
  596. # to receive initial metadata before any messages.
  597. while self._state.initial_metadata is None:
  598. self._consume_next_event()
  599. return self._state.initial_metadata
  600. def trailing_metadata(self) -> Optional[MetadataType]:
  601. """See grpc.Call.trailing_metadata"""
  602. with self._state.condition:
  603. if self._state.trailing_metadata is None:
  604. raise grpc.experimental.UsageError(
  605. "Cannot get trailing metadata until RPC is completed."
  606. )
  607. return self._state.trailing_metadata
  608. def code(self) -> Optional[grpc.StatusCode]:
  609. """See grpc.Call.code"""
  610. with self._state.condition:
  611. if self._state.code is None:
  612. raise grpc.experimental.UsageError(
  613. "Cannot get code until RPC is completed."
  614. )
  615. return self._state.code
  616. def details(self) -> Optional[str]:
  617. """See grpc.Call.details"""
  618. with self._state.condition:
  619. if self._state.details is None:
  620. raise grpc.experimental.UsageError(
  621. "Cannot get details until RPC is completed."
  622. )
  623. return _common.decode(self._state.details)
  624. def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
  625. event = self._call.next_event()
  626. with self._state.condition:
  627. callbacks = _handle_event(
  628. event, self._state, self._response_deserializer
  629. )
  630. for callback in callbacks:
  631. # NOTE(gnossen): We intentionally allow exceptions to bubble up
  632. # to the user when running on a single thread.
  633. callback()
  634. return event
  635. def _next_response(self) -> Any:
  636. while True:
  637. self._consume_next_event()
  638. with self._state.condition:
  639. if self._state.response is not None:
  640. response = self._state.response
  641. self._state.response = None
  642. return response
  643. elif (
  644. cygrpc.OperationType.receive_message not in self._state.due
  645. ):
  646. if self._state.code is grpc.StatusCode.OK:
  647. raise StopIteration()
  648. elif self._state.code is not None:
  649. raise self
  650. def _next(self) -> Any:
  651. with self._state.condition:
  652. if self._state.code is None:
  653. # We tentatively add the operation as expected and remove
  654. # it if the enqueue operation fails. This allows us to guarantee that
  655. # if an event has been submitted to the core completion queue,
  656. # it is in `due`. If we waited until after a successful
  657. # enqueue operation then a signal could interrupt this
  658. # thread between the enqueue operation and the addition of the
  659. # operation to `due`. This would cause an exception on the
  660. # channel spin thread when the operation completes and no
  661. # corresponding operation would be present in state.due.
  662. # Note that, since `condition` is held through this block, there is
  663. # no data race on `due`.
  664. self._state.due.add(cygrpc.OperationType.receive_message)
  665. operating = self._call.operate(
  666. (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
  667. )
  668. if not operating:
  669. self._state.due.remove(cygrpc.OperationType.receive_message)
  670. elif self._state.code is grpc.StatusCode.OK:
  671. raise StopIteration()
  672. else:
  673. raise self
  674. return self._next_response()
  675. def debug_error_string(self) -> Optional[str]:
  676. with self._state.condition:
  677. if self._state.debug_error_string is None:
  678. raise grpc.experimental.UsageError(
  679. "Cannot get debug error string until RPC is completed."
  680. )
  681. return _common.decode(self._state.debug_error_string)
  682. class _MultiThreadedRendezvous(
  683. _Rendezvous, grpc.Call, grpc.Future
  684. ): # pylint: disable=too-many-ancestors
  685. """An RPC iterator that depends on a channel spin thread.
  686. This iterator relies upon a per-channel thread running in the background,
  687. dequeueing events from the completion queue, and notifying threads waiting
  688. on the threading.Condition object in the _RPCState object.
  689. This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
  690. and to mediate a bidirection streaming RPC.
  691. """
  692. _state: _RPCState
  693. def initial_metadata(self) -> Optional[MetadataType]:
  694. """See grpc.Call.initial_metadata"""
  695. with self._state.condition:
  696. def _done():
  697. return self._state.initial_metadata is not None
  698. _common.wait(self._state.condition.wait, _done)
  699. return self._state.initial_metadata
  700. def trailing_metadata(self) -> Optional[MetadataType]:
  701. """See grpc.Call.trailing_metadata"""
  702. with self._state.condition:
  703. def _done():
  704. return self._state.trailing_metadata is not None
  705. _common.wait(self._state.condition.wait, _done)
  706. return self._state.trailing_metadata
  707. def code(self) -> Optional[grpc.StatusCode]:
  708. """See grpc.Call.code"""
  709. with self._state.condition:
  710. def _done():
  711. return self._state.code is not None
  712. _common.wait(self._state.condition.wait, _done)
  713. return self._state.code
  714. def details(self) -> Optional[str]:
  715. """See grpc.Call.details"""
  716. with self._state.condition:
  717. def _done():
  718. return self._state.details is not None
  719. _common.wait(self._state.condition.wait, _done)
  720. return _common.decode(self._state.details)
  721. def debug_error_string(self) -> Optional[str]:
  722. with self._state.condition:
  723. def _done():
  724. return self._state.debug_error_string is not None
  725. _common.wait(self._state.condition.wait, _done)
  726. return _common.decode(self._state.debug_error_string)
  727. def cancelled(self) -> bool:
  728. with self._state.condition:
  729. return self._state.cancelled
  730. def running(self) -> bool:
  731. with self._state.condition:
  732. return self._state.code is None
  733. def done(self) -> bool:
  734. with self._state.condition:
  735. return self._state.code is not None
  736. def _is_complete(self) -> bool:
  737. return self._state.code is not None
  738. def result(self, timeout: Optional[float] = None) -> Any:
  739. """Returns the result of the computation or raises its exception.
  740. See grpc.Future.result for the full API contract.
  741. """
  742. with self._state.condition:
  743. timed_out = _common.wait(
  744. self._state.condition.wait, self._is_complete, timeout=timeout
  745. )
  746. if timed_out:
  747. raise grpc.FutureTimeoutError()
  748. else:
  749. if self._state.code is grpc.StatusCode.OK:
  750. return self._state.response
  751. elif self._state.cancelled:
  752. raise grpc.FutureCancelledError()
  753. else:
  754. raise self
  755. def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
  756. """Return the exception raised by the computation.
  757. See grpc.Future.exception for the full API contract.
  758. """
  759. with self._state.condition:
  760. timed_out = _common.wait(
  761. self._state.condition.wait, self._is_complete, timeout=timeout
  762. )
  763. if timed_out:
  764. raise grpc.FutureTimeoutError()
  765. else:
  766. if self._state.code is grpc.StatusCode.OK:
  767. return None
  768. elif self._state.cancelled:
  769. raise grpc.FutureCancelledError()
  770. else:
  771. return self
  772. def traceback(
  773. self, timeout: Optional[float] = None
  774. ) -> Optional[types.TracebackType]:
  775. """Access the traceback of the exception raised by the computation.
  776. See grpc.future.traceback for the full API contract.
  777. """
  778. with self._state.condition:
  779. timed_out = _common.wait(
  780. self._state.condition.wait, self._is_complete, timeout=timeout
  781. )
  782. if timed_out:
  783. raise grpc.FutureTimeoutError()
  784. else:
  785. if self._state.code is grpc.StatusCode.OK:
  786. return None
  787. elif self._state.cancelled:
  788. raise grpc.FutureCancelledError()
  789. else:
  790. try:
  791. raise self
  792. except grpc.RpcError:
  793. return sys.exc_info()[2]
  794. def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
  795. with self._state.condition:
  796. if self._state.code is None:
  797. self._state.callbacks.append(functools.partial(fn, self))
  798. return
  799. fn(self)
  800. def _next(self) -> Any:
  801. with self._state.condition:
  802. if self._state.code is None:
  803. event_handler = _event_handler(
  804. self._state, self._response_deserializer
  805. )
  806. self._state.due.add(cygrpc.OperationType.receive_message)
  807. operating = self._call.operate(
  808. (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
  809. event_handler,
  810. )
  811. if not operating:
  812. self._state.due.remove(cygrpc.OperationType.receive_message)
  813. elif self._state.code is grpc.StatusCode.OK:
  814. raise StopIteration()
  815. else:
  816. raise self
  817. def _response_ready():
  818. return self._state.response is not None or (
  819. cygrpc.OperationType.receive_message not in self._state.due
  820. and self._state.code is not None
  821. )
  822. _common.wait(self._state.condition.wait, _response_ready)
  823. if self._state.response is not None:
  824. response = self._state.response
  825. self._state.response = None
  826. return response
  827. elif cygrpc.OperationType.receive_message not in self._state.due:
  828. if self._state.code is grpc.StatusCode.OK:
  829. raise StopIteration()
  830. elif self._state.code is not None:
  831. raise self
  832. def _start_unary_request(
  833. request: Any,
  834. timeout: Optional[float],
  835. request_serializer: SerializingFunction,
  836. ) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
  837. deadline = _deadline(timeout)
  838. serialized_request = _common.serialize(request, request_serializer)
  839. if serialized_request is None:
  840. state = _RPCState(
  841. (),
  842. (),
  843. (),
  844. grpc.StatusCode.INTERNAL,
  845. "Exception serializing request!",
  846. )
  847. error = _InactiveRpcError(state)
  848. return deadline, None, error
  849. else:
  850. return deadline, serialized_request, None
  851. def _end_unary_response_blocking(
  852. state: _RPCState,
  853. call: cygrpc.SegregatedCall,
  854. with_call: bool,
  855. deadline: Optional[float],
  856. ) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
  857. if state.code is grpc.StatusCode.OK:
  858. if with_call:
  859. rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
  860. return state.response, rendezvous
  861. else:
  862. return state.response
  863. else:
  864. raise _InactiveRpcError(state) # pytype: disable=not-instantiable
  865. def _stream_unary_invocation_operations(
  866. metadata: Optional[MetadataType], initial_metadata_flags: int
  867. ) -> Sequence[Sequence[cygrpc.Operation]]:
  868. return (
  869. (
  870. cygrpc.SendInitialMetadataOperation(
  871. metadata, initial_metadata_flags
  872. ),
  873. cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
  874. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  875. ),
  876. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  877. )
  878. def _stream_unary_invocation_operations_and_tags(
  879. metadata: Optional[MetadataType], initial_metadata_flags: int
  880. ) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
  881. return tuple(
  882. (
  883. operations,
  884. None,
  885. )
  886. for operations in _stream_unary_invocation_operations(
  887. metadata, initial_metadata_flags
  888. )
  889. )
  890. def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
  891. parent_deadline = cygrpc.get_deadline_from_context()
  892. if parent_deadline is None and user_deadline is None:
  893. return None
  894. elif parent_deadline is not None and user_deadline is None:
  895. return parent_deadline
  896. elif user_deadline is not None and parent_deadline is None:
  897. return user_deadline
  898. else:
  899. return min(parent_deadline, user_deadline)
  900. class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
  901. _channel: cygrpc.Channel
  902. _managed_call: IntegratedCallFactory
  903. _method: bytes
  904. _target: bytes
  905. _request_serializer: Optional[SerializingFunction]
  906. _response_deserializer: Optional[DeserializingFunction]
  907. _context: Any
  908. _registered_call_handle: Optional[int]
  909. __slots__ = [
  910. "_channel",
  911. "_managed_call",
  912. "_method",
  913. "_target",
  914. "_request_serializer",
  915. "_response_deserializer",
  916. "_context",
  917. ]
  918. # pylint: disable=too-many-arguments
  919. def __init__(
  920. self,
  921. channel: cygrpc.Channel,
  922. managed_call: IntegratedCallFactory,
  923. method: bytes,
  924. target: bytes,
  925. request_serializer: Optional[SerializingFunction],
  926. response_deserializer: Optional[DeserializingFunction],
  927. _registered_call_handle: Optional[int],
  928. ):
  929. self._channel = channel
  930. self._managed_call = managed_call
  931. self._method = method
  932. self._target = target
  933. self._request_serializer = request_serializer
  934. self._response_deserializer = response_deserializer
  935. self._context = cygrpc.build_census_context()
  936. self._registered_call_handle = _registered_call_handle
  937. def _prepare(
  938. self,
  939. request: Any,
  940. timeout: Optional[float],
  941. metadata: Optional[MetadataType],
  942. wait_for_ready: Optional[bool],
  943. compression: Optional[grpc.Compression],
  944. ) -> Tuple[
  945. Optional[_RPCState],
  946. Optional[Sequence[cygrpc.Operation]],
  947. Optional[float],
  948. Optional[grpc.RpcError],
  949. ]:
  950. deadline, serialized_request, rendezvous = _start_unary_request(
  951. request, timeout, self._request_serializer
  952. )
  953. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  954. wait_for_ready
  955. )
  956. augmented_metadata = _compression.augment_metadata(
  957. metadata, compression
  958. )
  959. if serialized_request is None:
  960. return None, None, None, rendezvous
  961. else:
  962. state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
  963. operations = (
  964. cygrpc.SendInitialMetadataOperation(
  965. augmented_metadata, initial_metadata_flags
  966. ),
  967. cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
  968. cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
  969. cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
  970. cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
  971. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  972. )
  973. return state, operations, deadline, None
  974. def _blocking(
  975. self,
  976. request: Any,
  977. timeout: Optional[float] = None,
  978. metadata: Optional[MetadataType] = None,
  979. credentials: Optional[grpc.CallCredentials] = None,
  980. wait_for_ready: Optional[bool] = None,
  981. compression: Optional[grpc.Compression] = None,
  982. ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
  983. state, operations, deadline, rendezvous = self._prepare(
  984. request, timeout, metadata, wait_for_ready, compression
  985. )
  986. if state is None:
  987. raise rendezvous # pylint: disable-msg=raising-bad-type
  988. else:
  989. state.rpc_start_time = time.perf_counter()
  990. state.method = _common.decode(self._method)
  991. state.target = _common.decode(self._target)
  992. call = self._channel.segregated_call(
  993. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  994. self._method,
  995. None,
  996. _determine_deadline(deadline),
  997. metadata,
  998. None if credentials is None else credentials._credentials,
  999. (
  1000. (
  1001. operations,
  1002. None,
  1003. ),
  1004. ),
  1005. self._context,
  1006. self._registered_call_handle,
  1007. )
  1008. event = call.next_event()
  1009. _handle_event(event, state, self._response_deserializer)
  1010. return state, call
  1011. def __call__(
  1012. self,
  1013. request: Any,
  1014. timeout: Optional[float] = None,
  1015. metadata: Optional[MetadataType] = None,
  1016. credentials: Optional[grpc.CallCredentials] = None,
  1017. wait_for_ready: Optional[bool] = None,
  1018. compression: Optional[grpc.Compression] = None,
  1019. ) -> Any:
  1020. state, call = self._blocking(
  1021. request, timeout, metadata, credentials, wait_for_ready, compression
  1022. )
  1023. return _end_unary_response_blocking(state, call, False, None)
  1024. def with_call(
  1025. self,
  1026. request: Any,
  1027. timeout: Optional[float] = None,
  1028. metadata: Optional[MetadataType] = None,
  1029. credentials: Optional[grpc.CallCredentials] = None,
  1030. wait_for_ready: Optional[bool] = None,
  1031. compression: Optional[grpc.Compression] = None,
  1032. ) -> Tuple[Any, grpc.Call]:
  1033. state, call = self._blocking(
  1034. request, timeout, metadata, credentials, wait_for_ready, compression
  1035. )
  1036. return _end_unary_response_blocking(state, call, True, None)
  1037. def future(
  1038. self,
  1039. request: Any,
  1040. timeout: Optional[float] = None,
  1041. metadata: Optional[MetadataType] = None,
  1042. credentials: Optional[grpc.CallCredentials] = None,
  1043. wait_for_ready: Optional[bool] = None,
  1044. compression: Optional[grpc.Compression] = None,
  1045. ) -> _MultiThreadedRendezvous:
  1046. state, operations, deadline, rendezvous = self._prepare(
  1047. request, timeout, metadata, wait_for_ready, compression
  1048. )
  1049. if state is None:
  1050. raise rendezvous # pylint: disable-msg=raising-bad-type
  1051. else:
  1052. event_handler = _event_handler(state, self._response_deserializer)
  1053. state.rpc_start_time = time.perf_counter()
  1054. state.method = _common.decode(self._method)
  1055. state.target = _common.decode(self._target)
  1056. call = self._managed_call(
  1057. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  1058. self._method,
  1059. None,
  1060. deadline,
  1061. metadata,
  1062. None if credentials is None else credentials._credentials,
  1063. (operations,),
  1064. event_handler,
  1065. self._context,
  1066. self._registered_call_handle,
  1067. )
  1068. return _MultiThreadedRendezvous(
  1069. state, call, self._response_deserializer, deadline
  1070. )
  1071. class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
  1072. _channel: cygrpc.Channel
  1073. _method: bytes
  1074. _target: bytes
  1075. _request_serializer: Optional[SerializingFunction]
  1076. _response_deserializer: Optional[DeserializingFunction]
  1077. _context: Any
  1078. _registered_call_handle: Optional[int]
  1079. __slots__ = [
  1080. "_channel",
  1081. "_method",
  1082. "_target",
  1083. "_request_serializer",
  1084. "_response_deserializer",
  1085. "_context",
  1086. ]
  1087. # pylint: disable=too-many-arguments
  1088. def __init__(
  1089. self,
  1090. channel: cygrpc.Channel,
  1091. method: bytes,
  1092. target: bytes,
  1093. request_serializer: SerializingFunction,
  1094. response_deserializer: DeserializingFunction,
  1095. _registered_call_handle: Optional[int],
  1096. ):
  1097. self._channel = channel
  1098. self._method = method
  1099. self._target = target
  1100. self._request_serializer = request_serializer
  1101. self._response_deserializer = response_deserializer
  1102. self._context = cygrpc.build_census_context()
  1103. self._registered_call_handle = _registered_call_handle
  1104. def __call__( # pylint: disable=too-many-locals
  1105. self,
  1106. request: Any,
  1107. timeout: Optional[float] = None,
  1108. metadata: Optional[MetadataType] = None,
  1109. credentials: Optional[grpc.CallCredentials] = None,
  1110. wait_for_ready: Optional[bool] = None,
  1111. compression: Optional[grpc.Compression] = None,
  1112. ) -> _SingleThreadedRendezvous:
  1113. deadline = _deadline(timeout)
  1114. serialized_request = _common.serialize(
  1115. request, self._request_serializer
  1116. )
  1117. if serialized_request is None:
  1118. state = _RPCState(
  1119. (),
  1120. (),
  1121. (),
  1122. grpc.StatusCode.INTERNAL,
  1123. "Exception serializing request!",
  1124. )
  1125. raise _InactiveRpcError(state)
  1126. state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
  1127. call_credentials = (
  1128. None if credentials is None else credentials._credentials
  1129. )
  1130. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1131. wait_for_ready
  1132. )
  1133. augmented_metadata = _compression.augment_metadata(
  1134. metadata, compression
  1135. )
  1136. operations = (
  1137. (
  1138. cygrpc.SendInitialMetadataOperation(
  1139. augmented_metadata, initial_metadata_flags
  1140. ),
  1141. cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
  1142. cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
  1143. ),
  1144. (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
  1145. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  1146. )
  1147. operations_and_tags = tuple((ops, None) for ops in operations)
  1148. state.rpc_start_time = time.perf_counter()
  1149. state.method = _common.decode(self._method)
  1150. state.target = _common.decode(self._target)
  1151. call = self._channel.segregated_call(
  1152. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  1153. self._method,
  1154. None,
  1155. _determine_deadline(deadline),
  1156. metadata,
  1157. call_credentials,
  1158. operations_and_tags,
  1159. self._context,
  1160. self._registered_call_handle,
  1161. )
  1162. return _SingleThreadedRendezvous(
  1163. state, call, self._response_deserializer, deadline
  1164. )
  1165. class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
  1166. _channel: cygrpc.Channel
  1167. _managed_call: IntegratedCallFactory
  1168. _method: bytes
  1169. _target: bytes
  1170. _request_serializer: Optional[SerializingFunction]
  1171. _response_deserializer: Optional[DeserializingFunction]
  1172. _context: Any
  1173. _registered_call_handle: Optional[int]
  1174. __slots__ = [
  1175. "_channel",
  1176. "_managed_call",
  1177. "_method",
  1178. "_target",
  1179. "_request_serializer",
  1180. "_response_deserializer",
  1181. "_context",
  1182. ]
  1183. # pylint: disable=too-many-arguments
  1184. def __init__(
  1185. self,
  1186. channel: cygrpc.Channel,
  1187. managed_call: IntegratedCallFactory,
  1188. method: bytes,
  1189. target: bytes,
  1190. request_serializer: SerializingFunction,
  1191. response_deserializer: DeserializingFunction,
  1192. _registered_call_handle: Optional[int],
  1193. ):
  1194. self._channel = channel
  1195. self._managed_call = managed_call
  1196. self._method = method
  1197. self._target = target
  1198. self._request_serializer = request_serializer
  1199. self._response_deserializer = response_deserializer
  1200. self._context = cygrpc.build_census_context()
  1201. self._registered_call_handle = _registered_call_handle
  1202. def __call__( # pylint: disable=too-many-locals
  1203. self,
  1204. request: Any,
  1205. timeout: Optional[float] = None,
  1206. metadata: Optional[MetadataType] = None,
  1207. credentials: Optional[grpc.CallCredentials] = None,
  1208. wait_for_ready: Optional[bool] = None,
  1209. compression: Optional[grpc.Compression] = None,
  1210. ) -> _MultiThreadedRendezvous:
  1211. deadline, serialized_request, rendezvous = _start_unary_request(
  1212. request, timeout, self._request_serializer
  1213. )
  1214. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1215. wait_for_ready
  1216. )
  1217. if serialized_request is None:
  1218. raise rendezvous # pylint: disable-msg=raising-bad-type
  1219. else:
  1220. augmented_metadata = _compression.augment_metadata(
  1221. metadata, compression
  1222. )
  1223. state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
  1224. operations = (
  1225. (
  1226. cygrpc.SendInitialMetadataOperation(
  1227. augmented_metadata, initial_metadata_flags
  1228. ),
  1229. cygrpc.SendMessageOperation(
  1230. serialized_request, _EMPTY_FLAGS
  1231. ),
  1232. cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
  1233. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  1234. ),
  1235. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  1236. )
  1237. state.rpc_start_time = time.perf_counter()
  1238. state.method = _common.decode(self._method)
  1239. state.target = _common.decode(self._target)
  1240. call = self._managed_call(
  1241. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  1242. self._method,
  1243. None,
  1244. _determine_deadline(deadline),
  1245. metadata,
  1246. None if credentials is None else credentials._credentials,
  1247. operations,
  1248. _event_handler(state, self._response_deserializer),
  1249. self._context,
  1250. self._registered_call_handle,
  1251. )
  1252. return _MultiThreadedRendezvous(
  1253. state, call, self._response_deserializer, deadline
  1254. )
  1255. class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
  1256. _channel: cygrpc.Channel
  1257. _managed_call: IntegratedCallFactory
  1258. _method: bytes
  1259. _target: bytes
  1260. _request_serializer: Optional[SerializingFunction]
  1261. _response_deserializer: Optional[DeserializingFunction]
  1262. _context: Any
  1263. _registered_call_handle: Optional[int]
  1264. __slots__ = [
  1265. "_channel",
  1266. "_managed_call",
  1267. "_method",
  1268. "_target",
  1269. "_request_serializer",
  1270. "_response_deserializer",
  1271. "_context",
  1272. ]
  1273. # pylint: disable=too-many-arguments
  1274. def __init__(
  1275. self,
  1276. channel: cygrpc.Channel,
  1277. managed_call: IntegratedCallFactory,
  1278. method: bytes,
  1279. target: bytes,
  1280. request_serializer: Optional[SerializingFunction],
  1281. response_deserializer: Optional[DeserializingFunction],
  1282. _registered_call_handle: Optional[int],
  1283. ):
  1284. self._channel = channel
  1285. self._managed_call = managed_call
  1286. self._method = method
  1287. self._target = target
  1288. self._request_serializer = request_serializer
  1289. self._response_deserializer = response_deserializer
  1290. self._context = cygrpc.build_census_context()
  1291. self._registered_call_handle = _registered_call_handle
  1292. def _blocking(
  1293. self,
  1294. request_iterator: Iterator,
  1295. timeout: Optional[float],
  1296. metadata: Optional[MetadataType],
  1297. credentials: Optional[grpc.CallCredentials],
  1298. wait_for_ready: Optional[bool],
  1299. compression: Optional[grpc.Compression],
  1300. ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
  1301. deadline = _deadline(timeout)
  1302. state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
  1303. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1304. wait_for_ready
  1305. )
  1306. augmented_metadata = _compression.augment_metadata(
  1307. metadata, compression
  1308. )
  1309. state.rpc_start_time = time.perf_counter()
  1310. state.method = _common.decode(self._method)
  1311. state.target = _common.decode(self._target)
  1312. call = self._channel.segregated_call(
  1313. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  1314. self._method,
  1315. None,
  1316. _determine_deadline(deadline),
  1317. augmented_metadata,
  1318. None if credentials is None else credentials._credentials,
  1319. _stream_unary_invocation_operations_and_tags(
  1320. augmented_metadata, initial_metadata_flags
  1321. ),
  1322. self._context,
  1323. self._registered_call_handle,
  1324. )
  1325. _consume_request_iterator(
  1326. request_iterator, state, call, self._request_serializer, None
  1327. )
  1328. while True:
  1329. event = call.next_event()
  1330. with state.condition:
  1331. _handle_event(event, state, self._response_deserializer)
  1332. state.condition.notify_all()
  1333. if not state.due:
  1334. break
  1335. return state, call
  1336. def __call__(
  1337. self,
  1338. request_iterator: Iterator,
  1339. timeout: Optional[float] = None,
  1340. metadata: Optional[MetadataType] = None,
  1341. credentials: Optional[grpc.CallCredentials] = None,
  1342. wait_for_ready: Optional[bool] = None,
  1343. compression: Optional[grpc.Compression] = None,
  1344. ) -> Any:
  1345. state, call = self._blocking(
  1346. request_iterator,
  1347. timeout,
  1348. metadata,
  1349. credentials,
  1350. wait_for_ready,
  1351. compression,
  1352. )
  1353. return _end_unary_response_blocking(state, call, False, None)
  1354. def with_call(
  1355. self,
  1356. request_iterator: Iterator,
  1357. timeout: Optional[float] = None,
  1358. metadata: Optional[MetadataType] = None,
  1359. credentials: Optional[grpc.CallCredentials] = None,
  1360. wait_for_ready: Optional[bool] = None,
  1361. compression: Optional[grpc.Compression] = None,
  1362. ) -> Tuple[Any, grpc.Call]:
  1363. state, call = self._blocking(
  1364. request_iterator,
  1365. timeout,
  1366. metadata,
  1367. credentials,
  1368. wait_for_ready,
  1369. compression,
  1370. )
  1371. return _end_unary_response_blocking(state, call, True, None)
  1372. def future(
  1373. self,
  1374. request_iterator: Iterator,
  1375. timeout: Optional[float] = None,
  1376. metadata: Optional[MetadataType] = None,
  1377. credentials: Optional[grpc.CallCredentials] = None,
  1378. wait_for_ready: Optional[bool] = None,
  1379. compression: Optional[grpc.Compression] = None,
  1380. ) -> _MultiThreadedRendezvous:
  1381. deadline = _deadline(timeout)
  1382. state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
  1383. event_handler = _event_handler(state, self._response_deserializer)
  1384. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1385. wait_for_ready
  1386. )
  1387. augmented_metadata = _compression.augment_metadata(
  1388. metadata, compression
  1389. )
  1390. state.rpc_start_time = time.perf_counter()
  1391. state.method = _common.decode(self._method)
  1392. state.target = _common.decode(self._target)
  1393. call = self._managed_call(
  1394. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  1395. self._method,
  1396. None,
  1397. deadline,
  1398. augmented_metadata,
  1399. None if credentials is None else credentials._credentials,
  1400. _stream_unary_invocation_operations(
  1401. metadata, initial_metadata_flags
  1402. ),
  1403. event_handler,
  1404. self._context,
  1405. self._registered_call_handle,
  1406. )
  1407. _consume_request_iterator(
  1408. request_iterator,
  1409. state,
  1410. call,
  1411. self._request_serializer,
  1412. event_handler,
  1413. )
  1414. return _MultiThreadedRendezvous(
  1415. state, call, self._response_deserializer, deadline
  1416. )
  1417. class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
  1418. _channel: cygrpc.Channel
  1419. _managed_call: IntegratedCallFactory
  1420. _method: bytes
  1421. _target: bytes
  1422. _request_serializer: Optional[SerializingFunction]
  1423. _response_deserializer: Optional[DeserializingFunction]
  1424. _context: Any
  1425. _registered_call_handle: Optional[int]
  1426. __slots__ = [
  1427. "_channel",
  1428. "_managed_call",
  1429. "_method",
  1430. "_target",
  1431. "_request_serializer",
  1432. "_response_deserializer",
  1433. "_context",
  1434. ]
  1435. # pylint: disable=too-many-arguments
  1436. def __init__(
  1437. self,
  1438. channel: cygrpc.Channel,
  1439. managed_call: IntegratedCallFactory,
  1440. method: bytes,
  1441. target: bytes,
  1442. request_serializer: Optional[SerializingFunction],
  1443. response_deserializer: Optional[DeserializingFunction],
  1444. _registered_call_handle: Optional[int],
  1445. ):
  1446. self._channel = channel
  1447. self._managed_call = managed_call
  1448. self._method = method
  1449. self._target = target
  1450. self._request_serializer = request_serializer
  1451. self._response_deserializer = response_deserializer
  1452. self._context = cygrpc.build_census_context()
  1453. self._registered_call_handle = _registered_call_handle
  1454. def __call__(
  1455. self,
  1456. request_iterator: Iterator,
  1457. timeout: Optional[float] = None,
  1458. metadata: Optional[MetadataType] = None,
  1459. credentials: Optional[grpc.CallCredentials] = None,
  1460. wait_for_ready: Optional[bool] = None,
  1461. compression: Optional[grpc.Compression] = None,
  1462. ) -> _MultiThreadedRendezvous:
  1463. deadline = _deadline(timeout)
  1464. state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
  1465. initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
  1466. wait_for_ready
  1467. )
  1468. augmented_metadata = _compression.augment_metadata(
  1469. metadata, compression
  1470. )
  1471. operations = (
  1472. (
  1473. cygrpc.SendInitialMetadataOperation(
  1474. augmented_metadata, initial_metadata_flags
  1475. ),
  1476. cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
  1477. ),
  1478. (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
  1479. )
  1480. event_handler = _event_handler(state, self._response_deserializer)
  1481. state.rpc_start_time = time.perf_counter()
  1482. state.method = _common.decode(self._method)
  1483. state.target = _common.decode(self._target)
  1484. call = self._managed_call(
  1485. cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
  1486. self._method,
  1487. None,
  1488. _determine_deadline(deadline),
  1489. augmented_metadata,
  1490. None if credentials is None else credentials._credentials,
  1491. operations,
  1492. event_handler,
  1493. self._context,
  1494. self._registered_call_handle,
  1495. )
  1496. _consume_request_iterator(
  1497. request_iterator,
  1498. state,
  1499. call,
  1500. self._request_serializer,
  1501. event_handler,
  1502. )
  1503. return _MultiThreadedRendezvous(
  1504. state, call, self._response_deserializer, deadline
  1505. )
  1506. class _InitialMetadataFlags(int):
  1507. """Stores immutable initial metadata flags"""
  1508. def __new__(cls, value: int = _EMPTY_FLAGS):
  1509. value &= cygrpc.InitialMetadataFlags.used_mask
  1510. return super(_InitialMetadataFlags, cls).__new__(cls, value)
  1511. def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
  1512. if wait_for_ready is not None:
  1513. if wait_for_ready:
  1514. return self.__class__(
  1515. self
  1516. | cygrpc.InitialMetadataFlags.wait_for_ready
  1517. | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
  1518. )
  1519. elif not wait_for_ready:
  1520. return self.__class__(
  1521. self & ~cygrpc.InitialMetadataFlags.wait_for_ready
  1522. | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
  1523. )
  1524. return self
  1525. class _ChannelCallState(object):
  1526. channel: cygrpc.Channel
  1527. managed_calls: int
  1528. threading: bool
  1529. def __init__(self, channel: cygrpc.Channel):
  1530. self.lock = threading.Lock()
  1531. self.channel = channel
  1532. self.managed_calls = 0
  1533. self.threading = False
  1534. def reset_postfork_child(self) -> None:
  1535. self.managed_calls = 0
  1536. def __del__(self):
  1537. try:
  1538. self.channel.close(
  1539. cygrpc.StatusCode.cancelled, "Channel deallocated!"
  1540. )
  1541. except (TypeError, AttributeError):
  1542. pass
  1543. def _run_channel_spin_thread(state: _ChannelCallState) -> None:
  1544. def channel_spin():
  1545. while True:
  1546. cygrpc.block_if_fork_in_progress(state)
  1547. event = state.channel.next_call_event()
  1548. if event.completion_type == cygrpc.CompletionType.queue_timeout:
  1549. continue
  1550. call_completed = event.tag(event)
  1551. if call_completed:
  1552. with state.lock:
  1553. state.managed_calls -= 1
  1554. if state.managed_calls == 0:
  1555. return
  1556. channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
  1557. channel_spin_thread.setDaemon(True)
  1558. channel_spin_thread.start()
  1559. def _channel_managed_call_management(state: _ChannelCallState):
  1560. # pylint: disable=too-many-arguments
  1561. def create(
  1562. flags: int,
  1563. method: bytes,
  1564. host: Optional[str],
  1565. deadline: Optional[float],
  1566. metadata: Optional[MetadataType],
  1567. credentials: Optional[cygrpc.CallCredentials],
  1568. operations: Sequence[Sequence[cygrpc.Operation]],
  1569. event_handler: UserTag,
  1570. context: Any,
  1571. _registered_call_handle: Optional[int],
  1572. ) -> cygrpc.IntegratedCall:
  1573. """Creates a cygrpc.IntegratedCall.
  1574. Args:
  1575. flags: An integer bitfield of call flags.
  1576. method: The RPC method.
  1577. host: A host string for the created call.
  1578. deadline: A float to be the deadline of the created call or None if
  1579. the call is to have an infinite deadline.
  1580. metadata: The metadata for the call or None.
  1581. credentials: A cygrpc.CallCredentials or None.
  1582. operations: A sequence of sequences of cygrpc.Operations to be
  1583. started on the call.
  1584. event_handler: A behavior to call to handle the events resultant from
  1585. the operations on the call.
  1586. context: Context object for distributed tracing.
  1587. _registered_call_handle: An int representing the call handle of the
  1588. method, or None if the method is not registered.
  1589. Returns:
  1590. A cygrpc.IntegratedCall with which to conduct an RPC.
  1591. """
  1592. operations_and_tags = tuple(
  1593. (
  1594. operation,
  1595. event_handler,
  1596. )
  1597. for operation in operations
  1598. )
  1599. with state.lock:
  1600. call = state.channel.integrated_call(
  1601. flags,
  1602. method,
  1603. host,
  1604. deadline,
  1605. metadata,
  1606. credentials,
  1607. operations_and_tags,
  1608. context,
  1609. _registered_call_handle,
  1610. )
  1611. if state.managed_calls == 0:
  1612. state.managed_calls = 1
  1613. _run_channel_spin_thread(state)
  1614. else:
  1615. state.managed_calls += 1
  1616. return call
  1617. return create
  1618. class _ChannelConnectivityState(object):
  1619. lock: threading.RLock
  1620. channel: grpc.Channel
  1621. polling: bool
  1622. connectivity: grpc.ChannelConnectivity
  1623. try_to_connect: bool
  1624. # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
  1625. callbacks_and_connectivities: List[
  1626. Sequence[
  1627. Union[
  1628. Callable[[grpc.ChannelConnectivity], None],
  1629. Optional[grpc.ChannelConnectivity],
  1630. ]
  1631. ]
  1632. ]
  1633. delivering: bool
  1634. def __init__(self, channel: grpc.Channel):
  1635. self.lock = threading.RLock()
  1636. self.channel = channel
  1637. self.polling = False
  1638. self.connectivity = None
  1639. self.try_to_connect = False
  1640. self.callbacks_and_connectivities = []
  1641. self.delivering = False
  1642. def reset_postfork_child(self) -> None:
  1643. self.polling = False
  1644. self.connectivity = None
  1645. self.try_to_connect = False
  1646. self.callbacks_and_connectivities = []
  1647. self.delivering = False
  1648. def _deliveries(
  1649. state: _ChannelConnectivityState,
  1650. ) -> List[Callable[[grpc.ChannelConnectivity], None]]:
  1651. callbacks_needing_update = []
  1652. for callback_and_connectivity in state.callbacks_and_connectivities:
  1653. callback, callback_connectivity = callback_and_connectivity
  1654. if callback_connectivity is not state.connectivity:
  1655. callbacks_needing_update.append(callback)
  1656. callback_and_connectivity[1] = state.connectivity
  1657. return callbacks_needing_update
  1658. def _deliver(
  1659. state: _ChannelConnectivityState,
  1660. initial_connectivity: grpc.ChannelConnectivity,
  1661. initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
  1662. ) -> None:
  1663. connectivity = initial_connectivity
  1664. callbacks = initial_callbacks
  1665. while True:
  1666. for callback in callbacks:
  1667. cygrpc.block_if_fork_in_progress(state)
  1668. try:
  1669. callback(connectivity)
  1670. except Exception: # pylint: disable=broad-except
  1671. _LOGGER.exception(
  1672. _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
  1673. )
  1674. with state.lock:
  1675. callbacks = _deliveries(state)
  1676. if callbacks:
  1677. connectivity = state.connectivity
  1678. else:
  1679. state.delivering = False
  1680. return
  1681. def _spawn_delivery(
  1682. state: _ChannelConnectivityState,
  1683. callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
  1684. ) -> None:
  1685. delivering_thread = cygrpc.ForkManagedThread(
  1686. target=_deliver,
  1687. args=(
  1688. state,
  1689. state.connectivity,
  1690. callbacks,
  1691. ),
  1692. )
  1693. delivering_thread.setDaemon(True)
  1694. delivering_thread.start()
  1695. state.delivering = True
  1696. # NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
  1697. def _poll_connectivity(
  1698. state: _ChannelConnectivityState,
  1699. channel: grpc.Channel,
  1700. initial_try_to_connect: bool,
  1701. ) -> None:
  1702. try_to_connect = initial_try_to_connect
  1703. connectivity = channel.check_connectivity_state(try_to_connect)
  1704. with state.lock:
  1705. state.connectivity = (
  1706. _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
  1707. connectivity
  1708. ]
  1709. )
  1710. callbacks = tuple(
  1711. callback for callback, _ in state.callbacks_and_connectivities
  1712. )
  1713. for callback_and_connectivity in state.callbacks_and_connectivities:
  1714. callback_and_connectivity[1] = state.connectivity
  1715. if callbacks:
  1716. _spawn_delivery(state, callbacks)
  1717. while True:
  1718. event = channel.watch_connectivity_state(
  1719. connectivity, time.time() + 0.2
  1720. )
  1721. cygrpc.block_if_fork_in_progress(state)
  1722. with state.lock:
  1723. if (
  1724. not state.callbacks_and_connectivities
  1725. and not state.try_to_connect
  1726. ):
  1727. state.polling = False
  1728. state.connectivity = None
  1729. break
  1730. try_to_connect = state.try_to_connect
  1731. state.try_to_connect = False
  1732. if event.success or try_to_connect:
  1733. connectivity = channel.check_connectivity_state(try_to_connect)
  1734. with state.lock:
  1735. state.connectivity = (
  1736. _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
  1737. connectivity
  1738. ]
  1739. )
  1740. if not state.delivering:
  1741. callbacks = _deliveries(state)
  1742. if callbacks:
  1743. _spawn_delivery(state, callbacks)
  1744. def _subscribe(
  1745. state: _ChannelConnectivityState,
  1746. callback: Callable[[grpc.ChannelConnectivity], None],
  1747. try_to_connect: bool,
  1748. ) -> None:
  1749. with state.lock:
  1750. if not state.callbacks_and_connectivities and not state.polling:
  1751. polling_thread = cygrpc.ForkManagedThread(
  1752. target=_poll_connectivity,
  1753. args=(state, state.channel, bool(try_to_connect)),
  1754. )
  1755. polling_thread.setDaemon(True)
  1756. polling_thread.start()
  1757. state.polling = True
  1758. state.callbacks_and_connectivities.append([callback, None])
  1759. elif not state.delivering and state.connectivity is not None:
  1760. _spawn_delivery(state, (callback,))
  1761. state.try_to_connect |= bool(try_to_connect)
  1762. state.callbacks_and_connectivities.append(
  1763. [callback, state.connectivity]
  1764. )
  1765. else:
  1766. state.try_to_connect |= bool(try_to_connect)
  1767. state.callbacks_and_connectivities.append([callback, None])
  1768. def _unsubscribe(
  1769. state: _ChannelConnectivityState,
  1770. callback: Callable[[grpc.ChannelConnectivity], None],
  1771. ) -> None:
  1772. with state.lock:
  1773. for index, (subscribed_callback, unused_connectivity) in enumerate(
  1774. state.callbacks_and_connectivities
  1775. ):
  1776. if callback == subscribed_callback:
  1777. state.callbacks_and_connectivities.pop(index)
  1778. break
  1779. def _augment_options(
  1780. base_options: Sequence[ChannelArgumentType],
  1781. compression: Optional[grpc.Compression],
  1782. ) -> Sequence[ChannelArgumentType]:
  1783. compression_option = _compression.create_channel_option(compression)
  1784. return (
  1785. tuple(base_options)
  1786. + compression_option
  1787. + (
  1788. (
  1789. cygrpc.ChannelArgKey.primary_user_agent_string,
  1790. _USER_AGENT,
  1791. ),
  1792. )
  1793. )
  1794. def _separate_channel_options(
  1795. options: Sequence[ChannelArgumentType],
  1796. ) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
  1797. """Separates core channel options from Python channel options."""
  1798. core_options = []
  1799. python_options = []
  1800. for pair in options:
  1801. if (
  1802. pair[0]
  1803. == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
  1804. ):
  1805. python_options.append(pair)
  1806. else:
  1807. core_options.append(pair)
  1808. return python_options, core_options
  1809. class Channel(grpc.Channel):
  1810. """A cygrpc.Channel-backed implementation of grpc.Channel."""
  1811. _single_threaded_unary_stream: bool
  1812. _channel: cygrpc.Channel
  1813. _call_state: _ChannelCallState
  1814. _connectivity_state: _ChannelConnectivityState
  1815. _target: str
  1816. _registered_call_handles: Dict[str, int]
  1817. def __init__(
  1818. self,
  1819. target: str,
  1820. options: Sequence[ChannelArgumentType],
  1821. credentials: Optional[grpc.ChannelCredentials],
  1822. compression: Optional[grpc.Compression],
  1823. ):
  1824. """Constructor.
  1825. Args:
  1826. target: The target to which to connect.
  1827. options: Configuration options for the channel.
  1828. credentials: A cygrpc.ChannelCredentials or None.
  1829. compression: An optional value indicating the compression method to be
  1830. used over the lifetime of the channel.
  1831. """
  1832. python_options, core_options = _separate_channel_options(options)
  1833. self._single_threaded_unary_stream = (
  1834. _DEFAULT_SINGLE_THREADED_UNARY_STREAM
  1835. )
  1836. self._process_python_options(python_options)
  1837. self._channel = cygrpc.Channel(
  1838. _common.encode(target),
  1839. _augment_options(core_options, compression),
  1840. credentials,
  1841. )
  1842. self._target = target
  1843. self._call_state = _ChannelCallState(self._channel)
  1844. self._connectivity_state = _ChannelConnectivityState(self._channel)
  1845. cygrpc.fork_register_channel(self)
  1846. if cygrpc.g_gevent_activated:
  1847. cygrpc.gevent_increment_channel_count()
  1848. def _get_registered_call_handle(self, method: str) -> int:
  1849. """
  1850. Get the registered call handle for a method.
  1851. This is a semi-private method. It is intended for use only by gRPC generated code.
  1852. This method is not thread-safe.
  1853. Args:
  1854. method: Required, the method name for the RPC.
  1855. Returns:
  1856. The registered call handle pointer in the form of a Python Long.
  1857. """
  1858. return self._channel.get_registered_call_handle(_common.encode(method))
  1859. def _process_python_options(
  1860. self, python_options: Sequence[ChannelArgumentType]
  1861. ) -> None:
  1862. """Sets channel attributes according to python-only channel options."""
  1863. for pair in python_options:
  1864. if (
  1865. pair[0]
  1866. == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
  1867. ):
  1868. self._single_threaded_unary_stream = True
  1869. def subscribe(
  1870. self,
  1871. callback: Callable[[grpc.ChannelConnectivity], None],
  1872. try_to_connect: Optional[bool] = None,
  1873. ) -> None:
  1874. _subscribe(self._connectivity_state, callback, try_to_connect)
  1875. def unsubscribe(
  1876. self, callback: Callable[[grpc.ChannelConnectivity], None]
  1877. ) -> None:
  1878. _unsubscribe(self._connectivity_state, callback)
  1879. # pylint: disable=arguments-differ
  1880. def unary_unary(
  1881. self,
  1882. method: str,
  1883. request_serializer: Optional[SerializingFunction] = None,
  1884. response_deserializer: Optional[DeserializingFunction] = None,
  1885. _registered_method: Optional[bool] = False,
  1886. ) -> grpc.UnaryUnaryMultiCallable:
  1887. _registered_call_handle = None
  1888. if _registered_method:
  1889. _registered_call_handle = self._get_registered_call_handle(method)
  1890. return _UnaryUnaryMultiCallable(
  1891. self._channel,
  1892. _channel_managed_call_management(self._call_state),
  1893. _common.encode(method),
  1894. _common.encode(self._target),
  1895. request_serializer,
  1896. response_deserializer,
  1897. _registered_call_handle,
  1898. )
  1899. # pylint: disable=arguments-differ
  1900. def unary_stream(
  1901. self,
  1902. method: str,
  1903. request_serializer: Optional[SerializingFunction] = None,
  1904. response_deserializer: Optional[DeserializingFunction] = None,
  1905. _registered_method: Optional[bool] = False,
  1906. ) -> grpc.UnaryStreamMultiCallable:
  1907. _registered_call_handle = None
  1908. if _registered_method:
  1909. _registered_call_handle = self._get_registered_call_handle(method)
  1910. # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
  1911. # on a single Python thread results in an appreciable speed-up. However,
  1912. # due to slight differences in capability, the multi-threaded variant
  1913. # remains the default.
  1914. if self._single_threaded_unary_stream:
  1915. return _SingleThreadedUnaryStreamMultiCallable(
  1916. self._channel,
  1917. _common.encode(method),
  1918. _common.encode(self._target),
  1919. request_serializer,
  1920. response_deserializer,
  1921. _registered_call_handle,
  1922. )
  1923. else:
  1924. return _UnaryStreamMultiCallable(
  1925. self._channel,
  1926. _channel_managed_call_management(self._call_state),
  1927. _common.encode(method),
  1928. _common.encode(self._target),
  1929. request_serializer,
  1930. response_deserializer,
  1931. _registered_call_handle,
  1932. )
  1933. # pylint: disable=arguments-differ
  1934. def stream_unary(
  1935. self,
  1936. method: str,
  1937. request_serializer: Optional[SerializingFunction] = None,
  1938. response_deserializer: Optional[DeserializingFunction] = None,
  1939. _registered_method: Optional[bool] = False,
  1940. ) -> grpc.StreamUnaryMultiCallable:
  1941. _registered_call_handle = None
  1942. if _registered_method:
  1943. _registered_call_handle = self._get_registered_call_handle(method)
  1944. return _StreamUnaryMultiCallable(
  1945. self._channel,
  1946. _channel_managed_call_management(self._call_state),
  1947. _common.encode(method),
  1948. _common.encode(self._target),
  1949. request_serializer,
  1950. response_deserializer,
  1951. _registered_call_handle,
  1952. )
  1953. # pylint: disable=arguments-differ
  1954. def stream_stream(
  1955. self,
  1956. method: str,
  1957. request_serializer: Optional[SerializingFunction] = None,
  1958. response_deserializer: Optional[DeserializingFunction] = None,
  1959. _registered_method: Optional[bool] = False,
  1960. ) -> grpc.StreamStreamMultiCallable:
  1961. _registered_call_handle = None
  1962. if _registered_method:
  1963. _registered_call_handle = self._get_registered_call_handle(method)
  1964. return _StreamStreamMultiCallable(
  1965. self._channel,
  1966. _channel_managed_call_management(self._call_state),
  1967. _common.encode(method),
  1968. _common.encode(self._target),
  1969. request_serializer,
  1970. response_deserializer,
  1971. _registered_call_handle,
  1972. )
  1973. def _unsubscribe_all(self) -> None:
  1974. state = self._connectivity_state
  1975. if state:
  1976. with state.lock:
  1977. del state.callbacks_and_connectivities[:]
  1978. def _close(self) -> None:
  1979. self._unsubscribe_all()
  1980. self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
  1981. cygrpc.fork_unregister_channel(self)
  1982. if cygrpc.g_gevent_activated:
  1983. cygrpc.gevent_decrement_channel_count()
  1984. def _close_on_fork(self) -> None:
  1985. self._unsubscribe_all()
  1986. self._channel.close_on_fork(
  1987. cygrpc.StatusCode.cancelled, "Channel closed due to fork"
  1988. )
  1989. def __enter__(self):
  1990. return self
  1991. def __exit__(self, exc_type, exc_val, exc_tb):
  1992. self._close()
  1993. return False
  1994. def close(self) -> None:
  1995. self._close()
  1996. def __del__(self):
  1997. # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
  1998. # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
  1999. # here (or more likely, call self._close() here). We don't do this today
  2000. # because many valid use cases today allow the channel to be deleted
  2001. # immediately after stubs are created. After a sufficient period of time
  2002. # has passed for all users to be trusted to freeze out to their channels
  2003. # for as long as they are in use and to close them after using them,
  2004. # then deletion of this grpc._channel.Channel instance can be made to
  2005. # effect closure of the underlying cygrpc.Channel instance.
  2006. try:
  2007. self._unsubscribe_all()
  2008. except: # pylint: disable=bare-except
  2009. # Exceptions in __del__ are ignored by Python anyway, but they can
  2010. # keep spamming logs. Just silence them.
  2011. pass