locks.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. # Copyright 2015 The Tornado Authors
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. import collections
  15. from concurrent.futures import CancelledError
  16. import datetime
  17. import types
  18. from tornado import gen, ioloop
  19. from tornado.concurrent import Future, future_set_result_unless_cancelled
  20. from typing import Union, Optional, Type, Any, Awaitable
  21. import typing
  22. if typing.TYPE_CHECKING:
  23. from typing import Deque, Set # noqa: F401
  24. __all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"]
  25. class _TimeoutGarbageCollector(object):
  26. """Base class for objects that periodically clean up timed-out waiters.
  27. Avoids memory leak in a common pattern like:
  28. while True:
  29. yield condition.wait(short_timeout)
  30. print('looping....')
  31. """
  32. def __init__(self) -> None:
  33. self._waiters = collections.deque() # type: Deque[Future]
  34. self._timeouts = 0
  35. def _garbage_collect(self) -> None:
  36. # Occasionally clear timed-out waiters.
  37. self._timeouts += 1
  38. if self._timeouts > 100:
  39. self._timeouts = 0
  40. self._waiters = collections.deque(w for w in self._waiters if not w.done())
  41. class Condition(_TimeoutGarbageCollector):
  42. """A condition allows one or more coroutines to wait until notified.
  43. Like a standard `threading.Condition`, but does not need an underlying lock
  44. that is acquired and released.
  45. With a `Condition`, coroutines can wait to be notified by other coroutines:
  46. .. testcode::
  47. from tornado import gen
  48. from tornado.ioloop import IOLoop
  49. from tornado.locks import Condition
  50. condition = Condition()
  51. async def waiter():
  52. print("I'll wait right here")
  53. await condition.wait()
  54. print("I'm done waiting")
  55. async def notifier():
  56. print("About to notify")
  57. condition.notify()
  58. print("Done notifying")
  59. async def runner():
  60. # Wait for waiter() and notifier() in parallel
  61. await gen.multi([waiter(), notifier()])
  62. IOLoop.current().run_sync(runner)
  63. .. testoutput::
  64. I'll wait right here
  65. About to notify
  66. Done notifying
  67. I'm done waiting
  68. `wait` takes an optional ``timeout`` argument, which is either an absolute
  69. timestamp::
  70. io_loop = IOLoop.current()
  71. # Wait up to 1 second for a notification.
  72. await condition.wait(timeout=io_loop.time() + 1)
  73. ...or a `datetime.timedelta` for a timeout relative to the current time::
  74. # Wait up to 1 second.
  75. await condition.wait(timeout=datetime.timedelta(seconds=1))
  76. The method returns False if there's no notification before the deadline.
  77. .. versionchanged:: 5.0
  78. Previously, waiters could be notified synchronously from within
  79. `notify`. Now, the notification will always be received on the
  80. next iteration of the `.IOLoop`.
  81. """
  82. def __init__(self) -> None:
  83. super(Condition, self).__init__()
  84. self.io_loop = ioloop.IOLoop.current()
  85. def __repr__(self) -> str:
  86. result = "<%s" % (self.__class__.__name__,)
  87. if self._waiters:
  88. result += " waiters[%s]" % len(self._waiters)
  89. return result + ">"
  90. def wait(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[bool]:
  91. """Wait for `.notify`.
  92. Returns a `.Future` that resolves ``True`` if the condition is notified,
  93. or ``False`` after a timeout.
  94. """
  95. waiter = Future() # type: Future[bool]
  96. self._waiters.append(waiter)
  97. if timeout:
  98. def on_timeout() -> None:
  99. if not waiter.done():
  100. future_set_result_unless_cancelled(waiter, False)
  101. self._garbage_collect()
  102. io_loop = ioloop.IOLoop.current()
  103. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  104. waiter.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle))
  105. return waiter
  106. def notify(self, n: int = 1) -> None:
  107. """Wake ``n`` waiters."""
  108. waiters = [] # Waiters we plan to run right now.
  109. while n and self._waiters:
  110. waiter = self._waiters.popleft()
  111. if not waiter.done(): # Might have timed out.
  112. n -= 1
  113. waiters.append(waiter)
  114. for waiter in waiters:
  115. future_set_result_unless_cancelled(waiter, True)
  116. def notify_all(self) -> None:
  117. """Wake all waiters."""
  118. self.notify(len(self._waiters))
  119. class Event(object):
  120. """An event blocks coroutines until its internal flag is set to True.
  121. Similar to `threading.Event`.
  122. A coroutine can wait for an event to be set. Once it is set, calls to
  123. ``yield event.wait()`` will not block unless the event has been cleared:
  124. .. testcode::
  125. from tornado import gen
  126. from tornado.ioloop import IOLoop
  127. from tornado.locks import Event
  128. event = Event()
  129. async def waiter():
  130. print("Waiting for event")
  131. await event.wait()
  132. print("Not waiting this time")
  133. await event.wait()
  134. print("Done")
  135. async def setter():
  136. print("About to set the event")
  137. event.set()
  138. async def runner():
  139. await gen.multi([waiter(), setter()])
  140. IOLoop.current().run_sync(runner)
  141. .. testoutput::
  142. Waiting for event
  143. About to set the event
  144. Not waiting this time
  145. Done
  146. """
  147. def __init__(self) -> None:
  148. self._value = False
  149. self._waiters = set() # type: Set[Future[None]]
  150. def __repr__(self) -> str:
  151. return "<%s %s>" % (
  152. self.__class__.__name__,
  153. "set" if self.is_set() else "clear",
  154. )
  155. def is_set(self) -> bool:
  156. """Return ``True`` if the internal flag is true."""
  157. return self._value
  158. def set(self) -> None:
  159. """Set the internal flag to ``True``. All waiters are awakened.
  160. Calling `.wait` once the flag is set will not block.
  161. """
  162. if not self._value:
  163. self._value = True
  164. for fut in self._waiters:
  165. if not fut.done():
  166. fut.set_result(None)
  167. def clear(self) -> None:
  168. """Reset the internal flag to ``False``.
  169. Calls to `.wait` will block until `.set` is called.
  170. """
  171. self._value = False
  172. def wait(self, timeout: Union[float, datetime.timedelta] = None) -> Awaitable[None]:
  173. """Block until the internal flag is true.
  174. Returns an awaitable, which raises `tornado.util.TimeoutError` after a
  175. timeout.
  176. """
  177. fut = Future() # type: Future[None]
  178. if self._value:
  179. fut.set_result(None)
  180. return fut
  181. self._waiters.add(fut)
  182. fut.add_done_callback(lambda fut: self._waiters.remove(fut))
  183. if timeout is None:
  184. return fut
  185. else:
  186. timeout_fut = gen.with_timeout(
  187. timeout, fut, quiet_exceptions=(CancelledError,)
  188. )
  189. # This is a slightly clumsy workaround for the fact that
  190. # gen.with_timeout doesn't cancel its futures. Cancelling
  191. # fut will remove it from the waiters list.
  192. timeout_fut.add_done_callback(
  193. lambda tf: fut.cancel() if not fut.done() else None
  194. )
  195. return timeout_fut
  196. class _ReleasingContextManager(object):
  197. """Releases a Lock or Semaphore at the end of a "with" statement.
  198. with (yield semaphore.acquire()):
  199. pass
  200. # Now semaphore.release() has been called.
  201. """
  202. def __init__(self, obj: Any) -> None:
  203. self._obj = obj
  204. def __enter__(self) -> None:
  205. pass
  206. def __exit__(
  207. self,
  208. exc_type: "Optional[Type[BaseException]]",
  209. exc_val: Optional[BaseException],
  210. exc_tb: Optional[types.TracebackType],
  211. ) -> None:
  212. self._obj.release()
  213. class Semaphore(_TimeoutGarbageCollector):
  214. """A lock that can be acquired a fixed number of times before blocking.
  215. A Semaphore manages a counter representing the number of `.release` calls
  216. minus the number of `.acquire` calls, plus an initial value. The `.acquire`
  217. method blocks if necessary until it can return without making the counter
  218. negative.
  219. Semaphores limit access to a shared resource. To allow access for two
  220. workers at a time:
  221. .. testsetup:: semaphore
  222. from collections import deque
  223. from tornado import gen
  224. from tornado.ioloop import IOLoop
  225. from tornado.concurrent import Future
  226. # Ensure reliable doctest output: resolve Futures one at a time.
  227. futures_q = deque([Future() for _ in range(3)])
  228. async def simulator(futures):
  229. for f in futures:
  230. # simulate the asynchronous passage of time
  231. await gen.sleep(0)
  232. await gen.sleep(0)
  233. f.set_result(None)
  234. IOLoop.current().add_callback(simulator, list(futures_q))
  235. def use_some_resource():
  236. return futures_q.popleft()
  237. .. testcode:: semaphore
  238. from tornado import gen
  239. from tornado.ioloop import IOLoop
  240. from tornado.locks import Semaphore
  241. sem = Semaphore(2)
  242. async def worker(worker_id):
  243. await sem.acquire()
  244. try:
  245. print("Worker %d is working" % worker_id)
  246. await use_some_resource()
  247. finally:
  248. print("Worker %d is done" % worker_id)
  249. sem.release()
  250. async def runner():
  251. # Join all workers.
  252. await gen.multi([worker(i) for i in range(3)])
  253. IOLoop.current().run_sync(runner)
  254. .. testoutput:: semaphore
  255. Worker 0 is working
  256. Worker 1 is working
  257. Worker 0 is done
  258. Worker 2 is working
  259. Worker 1 is done
  260. Worker 2 is done
  261. Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
  262. the semaphore has been released once, by worker 0.
  263. The semaphore can be used as an async context manager::
  264. async def worker(worker_id):
  265. async with sem:
  266. print("Worker %d is working" % worker_id)
  267. await use_some_resource()
  268. # Now the semaphore has been released.
  269. print("Worker %d is done" % worker_id)
  270. For compatibility with older versions of Python, `.acquire` is a
  271. context manager, so ``worker`` could also be written as::
  272. @gen.coroutine
  273. def worker(worker_id):
  274. with (yield sem.acquire()):
  275. print("Worker %d is working" % worker_id)
  276. yield use_some_resource()
  277. # Now the semaphore has been released.
  278. print("Worker %d is done" % worker_id)
  279. .. versionchanged:: 4.3
  280. Added ``async with`` support in Python 3.5.
  281. """
  282. def __init__(self, value: int = 1) -> None:
  283. super(Semaphore, self).__init__()
  284. if value < 0:
  285. raise ValueError("semaphore initial value must be >= 0")
  286. self._value = value
  287. def __repr__(self) -> str:
  288. res = super(Semaphore, self).__repr__()
  289. extra = (
  290. "locked" if self._value == 0 else "unlocked,value:{0}".format(self._value)
  291. )
  292. if self._waiters:
  293. extra = "{0},waiters:{1}".format(extra, len(self._waiters))
  294. return "<{0} [{1}]>".format(res[1:-1], extra)
  295. def release(self) -> None:
  296. """Increment the counter and wake one waiter."""
  297. self._value += 1
  298. while self._waiters:
  299. waiter = self._waiters.popleft()
  300. if not waiter.done():
  301. self._value -= 1
  302. # If the waiter is a coroutine paused at
  303. #
  304. # with (yield semaphore.acquire()):
  305. #
  306. # then the context manager's __exit__ calls release() at the end
  307. # of the "with" block.
  308. waiter.set_result(_ReleasingContextManager(self))
  309. break
  310. def acquire(
  311. self, timeout: Union[float, datetime.timedelta] = None
  312. ) -> Awaitable[_ReleasingContextManager]:
  313. """Decrement the counter. Returns an awaitable.
  314. Block if the counter is zero and wait for a `.release`. The awaitable
  315. raises `.TimeoutError` after the deadline.
  316. """
  317. waiter = Future() # type: Future[_ReleasingContextManager]
  318. if self._value > 0:
  319. self._value -= 1
  320. waiter.set_result(_ReleasingContextManager(self))
  321. else:
  322. self._waiters.append(waiter)
  323. if timeout:
  324. def on_timeout() -> None:
  325. if not waiter.done():
  326. waiter.set_exception(gen.TimeoutError())
  327. self._garbage_collect()
  328. io_loop = ioloop.IOLoop.current()
  329. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  330. waiter.add_done_callback(
  331. lambda _: io_loop.remove_timeout(timeout_handle)
  332. )
  333. return waiter
  334. def __enter__(self) -> None:
  335. raise RuntimeError("Use 'async with' instead of 'with' for Semaphore")
  336. def __exit__(
  337. self,
  338. typ: "Optional[Type[BaseException]]",
  339. value: Optional[BaseException],
  340. traceback: Optional[types.TracebackType],
  341. ) -> None:
  342. self.__enter__()
  343. async def __aenter__(self) -> None:
  344. await self.acquire()
  345. async def __aexit__(
  346. self,
  347. typ: "Optional[Type[BaseException]]",
  348. value: Optional[BaseException],
  349. tb: Optional[types.TracebackType],
  350. ) -> None:
  351. self.release()
  352. class BoundedSemaphore(Semaphore):
  353. """A semaphore that prevents release() being called too many times.
  354. If `.release` would increment the semaphore's value past the initial
  355. value, it raises `ValueError`. Semaphores are mostly used to guard
  356. resources with limited capacity, so a semaphore released too many times
  357. is a sign of a bug.
  358. """
  359. def __init__(self, value: int = 1) -> None:
  360. super(BoundedSemaphore, self).__init__(value=value)
  361. self._initial_value = value
  362. def release(self) -> None:
  363. """Increment the counter and wake one waiter."""
  364. if self._value >= self._initial_value:
  365. raise ValueError("Semaphore released too many times")
  366. super(BoundedSemaphore, self).release()
  367. class Lock(object):
  368. """A lock for coroutines.
  369. A Lock begins unlocked, and `acquire` locks it immediately. While it is
  370. locked, a coroutine that yields `acquire` waits until another coroutine
  371. calls `release`.
  372. Releasing an unlocked lock raises `RuntimeError`.
  373. A Lock can be used as an async context manager with the ``async
  374. with`` statement:
  375. >>> from tornado import locks
  376. >>> lock = locks.Lock()
  377. >>>
  378. >>> async def f():
  379. ... async with lock:
  380. ... # Do something holding the lock.
  381. ... pass
  382. ...
  383. ... # Now the lock is released.
  384. For compatibility with older versions of Python, the `.acquire`
  385. method asynchronously returns a regular context manager:
  386. >>> async def f2():
  387. ... with (yield lock.acquire()):
  388. ... # Do something holding the lock.
  389. ... pass
  390. ...
  391. ... # Now the lock is released.
  392. .. versionchanged:: 4.3
  393. Added ``async with`` support in Python 3.5.
  394. """
  395. def __init__(self) -> None:
  396. self._block = BoundedSemaphore(value=1)
  397. def __repr__(self) -> str:
  398. return "<%s _block=%s>" % (self.__class__.__name__, self._block)
  399. def acquire(
  400. self, timeout: Union[float, datetime.timedelta] = None
  401. ) -> Awaitable[_ReleasingContextManager]:
  402. """Attempt to lock. Returns an awaitable.
  403. Returns an awaitable, which raises `tornado.util.TimeoutError` after a
  404. timeout.
  405. """
  406. return self._block.acquire(timeout)
  407. def release(self) -> None:
  408. """Unlock.
  409. The first coroutine in line waiting for `acquire` gets the lock.
  410. If not locked, raise a `RuntimeError`.
  411. """
  412. try:
  413. self._block.release()
  414. except ValueError:
  415. raise RuntimeError("release unlocked lock")
  416. def __enter__(self) -> None:
  417. raise RuntimeError("Use `async with` instead of `with` for Lock")
  418. def __exit__(
  419. self,
  420. typ: "Optional[Type[BaseException]]",
  421. value: Optional[BaseException],
  422. tb: Optional[types.TracebackType],
  423. ) -> None:
  424. self.__enter__()
  425. async def __aenter__(self) -> None:
  426. await self.acquire()
  427. async def __aexit__(
  428. self,
  429. typ: "Optional[Type[BaseException]]",
  430. value: Optional[BaseException],
  431. tb: Optional[types.TracebackType],
  432. ) -> None:
  433. self.release()