_tasks.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from __future__ import annotations
  2. import sys
  3. from abc import ABCMeta, abstractmethod
  4. from collections.abc import Awaitable, Callable
  5. from types import TracebackType
  6. from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload
  7. if sys.version_info >= (3, 11):
  8. from typing import TypeVarTuple, Unpack
  9. else:
  10. from typing_extensions import TypeVarTuple, Unpack
  11. if TYPE_CHECKING:
  12. from .._core._tasks import CancelScope
  13. T_Retval = TypeVar("T_Retval")
  14. T_contra = TypeVar("T_contra", contravariant=True)
  15. PosArgsT = TypeVarTuple("PosArgsT")
  16. class TaskStatus(Protocol[T_contra]):
  17. @overload
  18. def started(self: TaskStatus[None]) -> None: ...
  19. @overload
  20. def started(self, value: T_contra) -> None: ...
  21. def started(self, value: T_contra | None = None) -> None:
  22. """
  23. Signal that the task has started.
  24. :param value: object passed back to the starter of the task
  25. """
  26. class TaskGroup(metaclass=ABCMeta):
  27. """
  28. Groups several asynchronous tasks together.
  29. :ivar cancel_scope: the cancel scope inherited by all child tasks
  30. :vartype cancel_scope: CancelScope
  31. .. note:: On asyncio, support for eager task factories is considered to be
  32. **experimental**. In particular, they don't follow the usual semantics of new
  33. tasks being scheduled on the next iteration of the event loop, and may thus
  34. cause unexpected behavior in code that wasn't written with such semantics in
  35. mind.
  36. """
  37. cancel_scope: CancelScope
  38. @abstractmethod
  39. def start_soon(
  40. self,
  41. func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
  42. *args: Unpack[PosArgsT],
  43. name: object = None,
  44. ) -> None:
  45. """
  46. Start a new task in this task group.
  47. :param func: a coroutine function
  48. :param args: positional arguments to call the function with
  49. :param name: name of the task, for the purposes of introspection and debugging
  50. .. versionadded:: 3.0
  51. """
  52. @abstractmethod
  53. async def start(
  54. self,
  55. func: Callable[..., Awaitable[Any]],
  56. *args: object,
  57. name: object = None,
  58. ) -> Any:
  59. """
  60. Start a new task and wait until it signals for readiness.
  61. The target callable must accept a keyword argument ``task_status`` (of type
  62. :class:`TaskStatus`). Awaiting on this method will return whatever was passed to
  63. ``task_status.started()`` (``None`` by default).
  64. .. note:: The :class:`TaskStatus` class is generic, and the type argument should
  65. indicate the type of the value that will be passed to
  66. ``task_status.started()``.
  67. :param func: a coroutine function that accepts the ``task_status`` keyword
  68. argument
  69. :param args: positional arguments to call the function with
  70. :param name: an optional name for the task, for introspection and debugging
  71. :return: the value passed to ``task_status.started()``
  72. :raises RuntimeError: if the task finishes without calling
  73. ``task_status.started()``
  74. .. seealso:: :ref:`start_initialize`
  75. .. versionadded:: 3.0
  76. """
  77. @abstractmethod
  78. async def __aenter__(self) -> TaskGroup:
  79. """Enter the task group context and allow starting new tasks."""
  80. @abstractmethod
  81. async def __aexit__(
  82. self,
  83. exc_type: type[BaseException] | None,
  84. exc_val: BaseException | None,
  85. exc_tb: TracebackType | None,
  86. ) -> bool:
  87. """Exit the task group context waiting for all tasks to finish."""