Files
adler32
backtrace
backtrace_sys
base64
bigtable
bitflags
byteorder
bytes
cfg_if
cookie
cookie_store
crc32fast
crossbeam_deque
crossbeam_epoch
crossbeam_queue
crossbeam_utils
curl
curl_sys
dtoa
either
encoding_rs
error_chain
failure
failure_derive
flate2
fnv
foreign_types
foreign_types_shared
futures
futures_cpupool
goauth
h2
http
http_body
httparse
hyper
hyper_tls
idna
indexmap
iovec
itoa
lazy_static
libc
libz_sys
lock_api
log
matches
maybe_uninit
memoffset
mime
mime_guess
miniz_oxide
mio
native_tls
net2
num_cpus
num_traits
openssl
openssl_probe
openssl_sys
parking_lot
parking_lot_core
percent_encoding
proc_macro2
protobuf
protobuf_json
publicsuffix
quote
rand
rand_chacha
rand_core
rand_hc
rand_isaac
rand_jitter
rand_os
rand_pcg
rand_xorshift
regex
regex_syntax
reqwest
rustc_demangle
rustc_serialize
ryu
scopeguard
serde
serde_codegen_internals
serde_derive
serde_json
serde_urlencoded
slab
smallvec
smpl_jwt
socket2
string
syn
synom
synstructure
time
tokio
tokio_buf
tokio_current_thread
tokio_executor
tokio_io
tokio_reactor
tokio_sync
tokio_tcp
tokio_threadpool
tokio_timer
try_from
try_lock
unicase
unicode_bidi
unicode_normalization
unicode_xid
url
uuid
want
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
use super::{BlockingError, BlockingImpl};
use futures::Poll;
use std::cell::Cell;
use std::fmt;
use std::marker::PhantomData;
use tokio_executor::Enter;

thread_local! {
    static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
}

/// Ensures that the executor is removed from the thread-local context
/// when leaving the scope. This handles cases that involve panicking.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub struct DefaultGuard<'a> {
    prior: BlockingImpl,
    _lifetime: PhantomData<&'a ()>,
}

/// Set the default blocking implementation, returning a guard that resets the
/// blocking implementation when dropped.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
    CURRENT.with(|cell| {
        let prior = cell.replace(blocking);
        DefaultGuard {
            prior,
            _lifetime: PhantomData,
        }
    })
}

/// Set the default blocking implementation for the duration of the closure.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
where
    F: FnOnce(&mut Enter) -> R,
{
    let _guard = set_default(blocking);
    f(enter)
}

/// Enter a blocking section of code.
///
/// The `blocking` function annotates a section of code that performs a blocking
/// operation, either by issuing a blocking syscall or by performing a long
/// running CPU-bound computation.
///
/// When the `blocking` function enters, it hands off the responsibility of
/// processing the current work queue to another thread. Then, it calls the
/// supplied closure. The closure is permitted to block indefinitely.
///
/// If the maximum number of concurrent `blocking` calls has been reached, then
/// `NotReady` is returned and the task is notified once existing `blocking`
/// calls complete. The maximum value is specified when creating a thread pool
/// using [`Builder::max_blocking`][build]
///
/// NB: The entire task that called `blocking` is blocked whenever the supplied
/// closure blocks, even if you have used future combinators such as `select` -
/// the other futures in this task will not make progress until the closure
/// returns.
/// If this is not desired, ensure that `blocking` runs in its own task (e.g.
/// using `futures::sync::oneshot::spawn`).
///
/// [build]: struct.Builder.html#method.max_blocking
///
/// # Return
///
/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
/// `T` is the closure's return value.
///
/// If the thread pool has shutdown, `Err` is returned.
///
/// If the number of concurrent `blocking` calls has reached the maximum,
/// `Ok(NotReady)` is returned and the current task is notified when a call to
/// `blocking` will succeed.
///
/// If `blocking` is called from outside the context of a Tokio thread pool,
/// `Err` is returned.
///
/// # Background
///
/// By default, the Tokio thread pool expects that tasks will only run for short
/// periods at a time before yielding back to the thread pool. This is the basic
/// premise of cooperative multitasking.
///
/// However, it is common to want to perform a blocking operation while
/// processing an asynchronous computation. Examples of blocking operation
/// include:
///
/// * Performing synchronous file operations (reading and writing).
/// * Blocking on acquiring a mutex.
/// * Performing a CPU bound computation, like cryptographic encryption or
///   decryption.
///
/// One option for dealing with blocking operations in an asynchronous context
/// is to use a thread pool dedicated to performing these operations. This not
/// ideal as it requires bidirectional message passing as well as a channel to
/// communicate which adds a level of buffering.
///
/// Instead, `blocking` hands off the responsibility of processing the work queue
/// to another thread. This hand off is light compared to a channel and does not
/// require buffering.
///
/// # Examples
///
/// Block on receiving a message from a `std` channel. This example is a little
/// silly as using the non-blocking channel from the `futures` crate would make
/// more sense. The blocking receive can be replaced with any blocking operation
/// that needs to be performed.
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_threadpool;
///
/// use tokio_threadpool::{ThreadPool, blocking};
///
/// use futures::Future;
/// use futures::future::{lazy, poll_fn};
///
/// use std::sync::mpsc;
/// use std::thread;
/// use std::time::Duration;
///
/// pub fn main() {
///     // This is a *blocking* channel
///     let (tx, rx) = mpsc::channel();
///
///     // Spawn a thread to send a message
///     thread::spawn(move || {
///         thread::sleep(Duration::from_millis(500));
///         tx.send("hello").unwrap();
///     });
///
///     let pool = ThreadPool::new();
///
///     pool.spawn(lazy(move || {
///         // Because `blocking` returns `Poll`, it is intended to be used
///         // from the context of a `Future` implementation. Since we don't
///         // have a complicated requirement, we can use `poll_fn` in this
///         // case.
///         poll_fn(move || {
///             blocking(|| {
///                 let msg = rx.recv().unwrap();
///                 println!("message = {}", msg);
///             }).map_err(|_| panic!("the threadpool shut down"))
///         })
///     }));
///
///     // Wait for the task we just spawned to complete.
///     pool.shutdown_on_idle().wait().unwrap();
/// }
/// ```
pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where
    F: FnOnce() -> T,
{
    CURRENT.with(|cell| {
        let blocking = cell.get();

        // Object-safety workaround: the `Blocking` trait must be object-safe,
        // since we use a trait object in the thread-local. However, a blocking
        // _operation_ will be generic over the return type of the blocking
        // function. Therefore, rather than passing a function with a return
        // type to `Blocking::run_blocking`, we pass a _new_ closure which
        // doesn't have a return value. That closure invokes the blocking
        // function and assigns its value to `ret`, which we then unpack when
        // the blocking call finishes.
        let mut f = Some(f);
        let mut ret = None;
        {
            let ret2 = &mut ret;
            let mut run = move || {
                let f = f
                    .take()
                    .expect("blocking closure invoked twice; this is a bug!");
                *ret2 = Some((f)());
            };

            try_ready!((blocking)(&mut run));
        }

        // Return the result
        let ret =
            ret.expect("blocking function finished, but return value was unset; this is a bug!");
        Ok(ret.into())
    })
}

// === impl DefaultGuard ===

impl<'a> fmt::Debug for DefaultGuard<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("DefaultGuard { .. }")
    }
}

impl<'a> Drop for DefaultGuard<'a> {
    fn drop(&mut self) {
        // if the TLS value has already been torn down, there's nothing else we
        // can do. we're almost certainly panicking anyway.
        let _ = CURRENT.try_with(|cell| {
            cell.set(self.prior);
        });
    }
}