queues.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. # Copyright 2015 The Tornado Authors
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
"""Asynchronous queues for coroutines. These classes are very similar
to those provided in the standard library's `asyncio package
<https://docs.python.org/3/library/asyncio-queue.html>`_.

Three variants are provided: `Queue` (first in, first out),
`PriorityQueue` (lowest entry first) and `LifoQueue` (last in, first out).

.. warning::

   Unlike the standard library's `queue` module, the classes defined here
   are *not* thread-safe. To use these queues from another thread,
   use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
   before calling any queue methods.

"""
  23. import collections
  24. import datetime
  25. import heapq
  26. from tornado import gen, ioloop
  27. from tornado.concurrent import Future, future_set_result_unless_cancelled
  28. from tornado.locks import Event
  29. from typing import Union, TypeVar, Generic, Awaitable
  30. import typing
  31. if typing.TYPE_CHECKING:
  32. from typing import Deque, Tuple, List, Any # noqa: F401
# Item type of a queue; `Queue` and `_QueueIterator` are generic in it.
_T = TypeVar("_T")

# Public names exported by ``from tornado.queues import *``.
__all__ = ["Queue", "PriorityQueue", "LifoQueue", "QueueFull", "QueueEmpty"]
  35. class QueueEmpty(Exception):
  36. """Raised by `.Queue.get_nowait` when the queue has no items."""
  37. pass
  38. class QueueFull(Exception):
  39. """Raised by `.Queue.put_nowait` when a queue is at its maximum size."""
  40. pass
  41. def _set_timeout(
  42. future: Future, timeout: Union[None, float, datetime.timedelta]
  43. ) -> None:
  44. if timeout:
  45. def on_timeout() -> None:
  46. if not future.done():
  47. future.set_exception(gen.TimeoutError())
  48. io_loop = ioloop.IOLoop.current()
  49. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  50. future.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle))
  51. class _QueueIterator(Generic[_T]):
  52. def __init__(self, q: "Queue[_T]") -> None:
  53. self.q = q
  54. def __anext__(self) -> Awaitable[_T]:
  55. return self.q.get()
  56. class Queue(Generic[_T]):
  57. """Coordinate producer and consumer coroutines.
  58. If maxsize is 0 (the default) the queue size is unbounded.
  59. .. testcode::
  60. from tornado import gen
  61. from tornado.ioloop import IOLoop
  62. from tornado.queues import Queue
  63. q = Queue(maxsize=2)
  64. async def consumer():
  65. async for item in q:
  66. try:
  67. print('Doing work on %s' % item)
  68. await gen.sleep(0.01)
  69. finally:
  70. q.task_done()
  71. async def producer():
  72. for item in range(5):
  73. await q.put(item)
  74. print('Put %s' % item)
  75. async def main():
  76. # Start consumer without waiting (since it never finishes).
  77. IOLoop.current().spawn_callback(consumer)
  78. await producer() # Wait for producer to put all tasks.
  79. await q.join() # Wait for consumer to finish all tasks.
  80. print('Done')
  81. IOLoop.current().run_sync(main)
  82. .. testoutput::
  83. Put 0
  84. Put 1
  85. Doing work on 0
  86. Put 2
  87. Doing work on 1
  88. Put 3
  89. Doing work on 2
  90. Put 4
  91. Doing work on 3
  92. Doing work on 4
  93. Done
  94. In versions of Python without native coroutines (before 3.5),
  95. ``consumer()`` could be written as::
  96. @gen.coroutine
  97. def consumer():
  98. while True:
  99. item = yield q.get()
  100. try:
  101. print('Doing work on %s' % item)
  102. yield gen.sleep(0.01)
  103. finally:
  104. q.task_done()
  105. .. versionchanged:: 4.3
  106. Added ``async for`` support in Python 3.5.
  107. """
  108. # Exact type depends on subclass. Could be another generic
  109. # parameter and use protocols to be more precise here.
  110. _queue = None # type: Any
  111. def __init__(self, maxsize: int = 0) -> None:
  112. if maxsize is None:
  113. raise TypeError("maxsize can't be None")
  114. if maxsize < 0:
  115. raise ValueError("maxsize can't be negative")
  116. self._maxsize = maxsize
  117. self._init()
  118. self._getters = collections.deque([]) # type: Deque[Future[_T]]
  119. self._putters = collections.deque([]) # type: Deque[Tuple[_T, Future[None]]]
  120. self._unfinished_tasks = 0
  121. self._finished = Event()
  122. self._finished.set()
  123. @property
  124. def maxsize(self) -> int:
  125. """Number of items allowed in the queue."""
  126. return self._maxsize
  127. def qsize(self) -> int:
  128. """Number of items in the queue."""
  129. return len(self._queue)
  130. def empty(self) -> bool:
  131. return not self._queue
  132. def full(self) -> bool:
  133. if self.maxsize == 0:
  134. return False
  135. else:
  136. return self.qsize() >= self.maxsize
  137. def put(
  138. self, item: _T, timeout: Union[float, datetime.timedelta] = None
  139. ) -> "Future[None]":
  140. """Put an item into the queue, perhaps waiting until there is room.
  141. Returns a Future, which raises `tornado.util.TimeoutError` after a
  142. timeout.
  143. ``timeout`` may be a number denoting a time (on the same
  144. scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
  145. `datetime.timedelta` object for a deadline relative to the
  146. current time.
  147. """
  148. future = Future() # type: Future[None]
  149. try:
  150. self.put_nowait(item)
  151. except QueueFull:
  152. self._putters.append((item, future))
  153. _set_timeout(future, timeout)
  154. else:
  155. future.set_result(None)
  156. return future
  157. def put_nowait(self, item: _T) -> None:
  158. """Put an item into the queue without blocking.
  159. If no free slot is immediately available, raise `QueueFull`.
  160. """
  161. self._consume_expired()
  162. if self._getters:
  163. assert self.empty(), "queue non-empty, why are getters waiting?"
  164. getter = self._getters.popleft()
  165. self.__put_internal(item)
  166. future_set_result_unless_cancelled(getter, self._get())
  167. elif self.full():
  168. raise QueueFull
  169. else:
  170. self.__put_internal(item)
  171. def get(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[_T]:
  172. """Remove and return an item from the queue.
  173. Returns an awaitable which resolves once an item is available, or raises
  174. `tornado.util.TimeoutError` after a timeout.
  175. ``timeout`` may be a number denoting a time (on the same
  176. scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
  177. `datetime.timedelta` object for a deadline relative to the
  178. current time.
  179. .. note::
  180. The ``timeout`` argument of this method differs from that
  181. of the standard library's `queue.Queue.get`. That method
  182. interprets numeric values as relative timeouts; this one
  183. interprets them as absolute deadlines and requires
  184. ``timedelta`` objects for relative timeouts (consistent
  185. with other timeouts in Tornado).
  186. """
  187. future = Future() # type: Future[_T]
  188. try:
  189. future.set_result(self.get_nowait())
  190. except QueueEmpty:
  191. self._getters.append(future)
  192. _set_timeout(future, timeout)
  193. return future
  194. def get_nowait(self) -> _T:
  195. """Remove and return an item from the queue without blocking.
  196. Return an item if one is immediately available, else raise
  197. `QueueEmpty`.
  198. """
  199. self._consume_expired()
  200. if self._putters:
  201. assert self.full(), "queue not full, why are putters waiting?"
  202. item, putter = self._putters.popleft()
  203. self.__put_internal(item)
  204. future_set_result_unless_cancelled(putter, None)
  205. return self._get()
  206. elif self.qsize():
  207. return self._get()
  208. else:
  209. raise QueueEmpty
  210. def task_done(self) -> None:
  211. """Indicate that a formerly enqueued task is complete.
  212. Used by queue consumers. For each `.get` used to fetch a task, a
  213. subsequent call to `.task_done` tells the queue that the processing
  214. on the task is complete.
  215. If a `.join` is blocking, it resumes when all items have been
  216. processed; that is, when every `.put` is matched by a `.task_done`.
  217. Raises `ValueError` if called more times than `.put`.
  218. """
  219. if self._unfinished_tasks <= 0:
  220. raise ValueError("task_done() called too many times")
  221. self._unfinished_tasks -= 1
  222. if self._unfinished_tasks == 0:
  223. self._finished.set()
  224. def join(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[None]:
  225. """Block until all items in the queue are processed.
  226. Returns an awaitable, which raises `tornado.util.TimeoutError` after a
  227. timeout.
  228. """
  229. return self._finished.wait(timeout)
  230. def __aiter__(self) -> _QueueIterator[_T]:
  231. return _QueueIterator(self)
  232. # These three are overridable in subclasses.
  233. def _init(self) -> None:
  234. self._queue = collections.deque()
  235. def _get(self) -> _T:
  236. return self._queue.popleft()
  237. def _put(self, item: _T) -> None:
  238. self._queue.append(item)
  239. # End of the overridable methods.
  240. def __put_internal(self, item: _T) -> None:
  241. self._unfinished_tasks += 1
  242. self._finished.clear()
  243. self._put(item)
  244. def _consume_expired(self) -> None:
  245. # Remove timed-out waiters.
  246. while self._putters and self._putters[0][1].done():
  247. self._putters.popleft()
  248. while self._getters and self._getters[0].done():
  249. self._getters.popleft()
  250. def __repr__(self) -> str:
  251. return "<%s at %s %s>" % (type(self).__name__, hex(id(self)), self._format())
  252. def __str__(self) -> str:
  253. return "<%s %s>" % (type(self).__name__, self._format())
  254. def _format(self) -> str:
  255. result = "maxsize=%r" % (self.maxsize,)
  256. if getattr(self, "_queue", None):
  257. result += " queue=%r" % self._queue
  258. if self._getters:
  259. result += " getters[%s]" % len(self._getters)
  260. if self._putters:
  261. result += " putters[%s]" % len(self._putters)
  262. if self._unfinished_tasks:
  263. result += " tasks=%s" % self._unfinished_tasks
  264. return result
  265. class PriorityQueue(Queue):
  266. """A `.Queue` that retrieves entries in priority order, lowest first.
  267. Entries are typically tuples like ``(priority number, data)``.
  268. .. testcode::
  269. from tornado.queues import PriorityQueue
  270. q = PriorityQueue()
  271. q.put((1, 'medium-priority item'))
  272. q.put((0, 'high-priority item'))
  273. q.put((10, 'low-priority item'))
  274. print(q.get_nowait())
  275. print(q.get_nowait())
  276. print(q.get_nowait())
  277. .. testoutput::
  278. (0, 'high-priority item')
  279. (1, 'medium-priority item')
  280. (10, 'low-priority item')
  281. """
  282. def _init(self) -> None:
  283. self._queue = []
  284. def _put(self, item: _T) -> None:
  285. heapq.heappush(self._queue, item)
  286. def _get(self) -> _T:
  287. return heapq.heappop(self._queue)
  288. class LifoQueue(Queue):
  289. """A `.Queue` that retrieves the most recently put items first.
  290. .. testcode::
  291. from tornado.queues import LifoQueue
  292. q = LifoQueue()
  293. q.put(3)
  294. q.put(2)
  295. q.put(1)
  296. print(q.get_nowait())
  297. print(q.get_nowait())
  298. print(q.get_nowait())
  299. .. testoutput::
  300. 1
  301. 2
  302. 3
  303. """
  304. def _init(self) -> None:
  305. self._queue = []
  306. def _put(self, item: _T) -> None:
  307. self._queue.append(item)
  308. def _get(self) -> _T:
  309. return self._queue.pop()