
//! An asynchronous `Mutex`-like type. //! //! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one //! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the //! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then //! release it at some later point in time. //! //! This allows you to do something along the lines of: //! //! ```rust,no_run //! # #[macro_use] //! # extern crate futures; //! # extern crate tokio; //! # use futures::{future, Poll, Async, Future, Stream}; //! use tokio::sync::lock::{Lock, LockGuard}; //! struct MyType<S> { //! lock: Lock<S>, //! } //! //! impl<S> Future for MyType<S> //! where S: Stream<Item = u32> + Send + 'static //! { //! type Item = (); //! type Error = (); //! //! fn poll(&mut self) -> Poll<Self::Item, Self::Error> { //! match self.lock.poll_lock() { //! Async::Ready(mut guard) => { //! tokio::spawn(future::poll_fn(move || { //! let item = try_ready!(guard.poll().map_err(|_| ())); //! println!("item = {:?}", item); //! Ok(().into()) //! })); //! Ok(().into()) //! }, //! Async::NotReady => Ok(Async::NotReady) //! } //! } //! } //! # fn main() {} //! ``` //! //! [`Lock`]: struct.Lock.html //! [`LockGuard`]: struct.LockGuard.html use futures::Async; use semaphore; use std::cell::UnsafeCell; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::Arc; /// An asynchronous mutual exclusion primitive useful for protecting shared data /// /// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data /// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that /// the data is only ever accessed when the mutex is locked. #[derive(Debug)] pub struct Lock<T> { inner: Arc<State<T>>, permit: semaphore::Permit, } /// A handle to a held `Lock`. /// /// As long as you have this guard, you have exclusive access to the underlying `T`. The guard /// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes /// away, the guard remains valid. /// /// The lock is automatically released whenever the guard is dropped, at which point `poll_lock` /// will succeed yet again. #[derive(Debug)] pub struct LockGuard<T>(Lock<T>); // As long as T: Send, it's fine to send and share Lock<T> between threads. // If T was not Send, sending and sharing a Lock<T> would be bad, since you can access T through // Lock<T>. unsafe impl<T> Send for Lock<T> where T: Send {} unsafe impl<T> Sync for Lock<T> where T: Send {} unsafe impl<T> Sync for LockGuard<T> where T: Send + Sync {} #[derive(Debug)] struct State<T> { c: UnsafeCell<T>, s: semaphore::Semaphore, } #[test] fn bounds() { fn check<T: Send>() {} check::<LockGuard<u32>>(); } impl<T> Lock<T> { /// Creates a new lock in an unlocked state ready for use. pub fn new(t: T) -> Self { Self { inner: Arc::new(State { c: UnsafeCell::new(t), s: semaphore::Semaphore::new(1), }), permit: semaphore::Permit::new(), } } /// Try to acquire the lock. /// /// If the lock is already held, the current task is notified when it is released. pub fn poll_lock(&mut self) -> Async<LockGuard<T>> { if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }) { return Async::NotReady; } // We want to move the acquired permit into the guard, // and leave an unacquired one in self. let acquired = Self { inner: self.inner.clone(), permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()), }; Async::Ready(LockGuard(acquired)) } } impl<T> Drop for LockGuard<T> { fn drop(&mut self) { if self.0.permit.is_acquired() { self.0.permit.release(&self.0.inner.s); } else if ::std::thread::panicking() { // A guard _should_ always hold its permit, but if the thread is already panicking, // we don't want to generate a panic-while-panicing, since that's just unhelpful! } else { unreachable!("Permit not held when LockGuard was dropped") } } } impl<T> From<T> for Lock<T> { fn from(s: T) -> Self { Self::new(s) } } impl<T> Clone for Lock<T> { fn clone(&self) -> Self { Self { inner: self.inner.clone(), permit: semaphore::Permit::new(), } } } impl<T> Default for Lock<T> where T: Default, { fn default() -> Self { Self::new(T::default()) } } impl<T> Deref for LockGuard<T> { type Target = T; fn deref(&self) -> &Self::Target { assert!(self.0.permit.is_acquired()); unsafe { &*self.0.inner.c.get() } } } impl<T> DerefMut for LockGuard<T> { fn deref_mut(&mut self) -> &mut Self::Target { assert!(self.0.permit.is_acquired()); unsafe { &mut *self.0.inner.c.get() } } } impl<T: fmt::Display> fmt::Display for LockGuard<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(&**self, f) } }