tracing_appender/
non_blocking.rs

1//! A non-blocking, off-thread writer.
2//!
3//! This spawns a dedicated worker thread which is responsible for writing log
4//! lines to the provided writer. When a line is written using the returned
5//! `NonBlocking` struct's `make_writer` method, it will be enqueued to be
6//! written by the worker thread.
7//!
8//! The queue has a fixed capacity, and if it becomes full, any logs written
9//! to it will be dropped until capacity is once again available. This may
10//! occur if logs are consistently produced faster than the worker thread can
11//! output them. The queue capacity and behavior when full (i.e., whether to
12//! drop logs or to exert backpressure to slow down senders) can be configured
//! using a [`NonBlockingBuilder`]; [`NonBlockingBuilder::default()`][builder]
//! returns the default configuration. Using the defaults is equivalent to:
15//!
16//! ```rust
17//! # use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
18//! # fn doc() -> (NonBlocking, WorkerGuard) {
19//! tracing_appender::non_blocking(std::io::stdout())
20//! # }
21//! ```
22//! [builder]: NonBlockingBuilder::default
23//!
//! <br/> The `tracing_appender::non_blocking` function returns a tuple of `NonBlocking` and
//! `WorkerGuard`.
25//! `NonBlocking` implements [`MakeWriter`] which integrates with `tracing_subscriber`.
26//! `WorkerGuard` is a drop guard that is responsible for flushing any remaining logs when
27//! the program terminates.
28//!
29//! Note that the `WorkerGuard` returned by `non_blocking` _must_ be assigned to a binding that
30//! is not `_`, as `_` will result in the `WorkerGuard` being dropped immediately.
31//! Unintentional drops of `WorkerGuard` remove the guarantee that logs will be flushed
32//! during a program's termination, in a panic or otherwise.
33//!
34//! See [`WorkerGuard`][worker_guard] for examples of using the guard.
35//!
36//! [worker_guard]: WorkerGuard
37//!
38//! # Examples
39//!
40//! ``` rust
41//! # fn docs() {
42//! let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
43//! let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
44//! tracing::subscriber::with_default(subscriber.finish(), || {
45//!    tracing::event!(tracing::Level::INFO, "Hello");
46//! });
47//! # }
48//! ```
49use crate::worker::Worker;
50use crate::Msg;
51use crossbeam_channel::{bounded, SendTimeoutError, Sender};
52use std::io;
53use std::io::Write;
54use std::sync::atomic::AtomicUsize;
55use std::sync::atomic::Ordering;
56use std::sync::Arc;
57use std::thread::JoinHandle;
58use std::time::Duration;
59use tracing_subscriber::fmt::MakeWriter;
60
/// The default maximum number of buffered log lines.
///
/// This is the queue capacity used by [`NonBlockingBuilder::default`].
///
/// If [`NonBlocking`][non-blocking] is lossy, it will drop spans/events at capacity.
/// If [`NonBlocking`][non-blocking] is _not_ lossy,
/// backpressure will be exerted on senders, causing them to block their
/// respective threads until there is available capacity.
///
/// Recommended to be a power of 2.
///
/// [non-blocking]: NonBlocking
pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
71
/// A guard that flushes spans/events associated with a [`NonBlocking`] when it is dropped.
///
/// Writing to a [`NonBlocking`] writer will **not** immediately write a span or event to the underlying
/// output. Instead, the span or event will be written by a dedicated logging thread at some later point.
/// To increase throughput, the non-blocking writer will flush to the underlying output on
/// a periodic basis rather than every time a span or event is written. This means that if the program
/// terminates abruptly (such as through an uncaught `panic` or a `std::process::exit`), some spans
/// or events may not be written.
///
/// Since spans and events recorded near a crash are often necessary for diagnosing the failure,
/// `WorkerGuard` provides a mechanism to ensure that _all_ buffered logs are flushed to their output.
/// `WorkerGuard` should be assigned in the `main` function or whatever the entrypoint of the program is.
/// This will ensure that the guard will be dropped during an unwinding or when `main` exits
/// successfully.
///
/// Note that dropping the guard only waits for a bounded amount of time: the `Drop`
/// implementation uses `send_timeout` for both the shutdown message and the final handshake,
/// so it never blocks indefinitely.
///
/// # Examples
///
/// ``` rust
/// # #[clippy::allow(needless_doctest_main)]
/// fn main () {
/// # fn doc() {
///     let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
///     let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
///     tracing::subscriber::with_default(subscriber.finish(), || {
///         // Emit some tracing events within context of the non_blocking `_guard` and tracing subscriber
///         tracing::event!(tracing::Level::INFO, "Hello");
///     });
///     // Exiting the context of `main` will drop the `_guard` and any remaining logs should get flushed
/// # }
/// }
/// ```
#[must_use]
#[derive(Debug)]
pub struct WorkerGuard {
    // Join handle of the worker thread. It is only stored here; nothing in this file joins it.
    _guard: Option<JoinHandle<()>>,
    // Clone of the log-line channel's sender; `Drop` uses it to send `Msg::Shutdown`.
    sender: Sender<Msg>,
    // Sending half of a zero-capacity channel; `Drop` sends `()` on it to wait for the worker.
    shutdown: Sender<()>,
}
110
/// A non-blocking writer.
///
/// While the line between "blocking" and "non-blocking" IO is fuzzy, writing to a file is typically
/// considered to be a _blocking_ operation. For an application whose `Subscriber` writes spans and events
/// as they are emitted, an application might find the latency profile to be unacceptable.
/// `NonBlocking` moves the writing out of an application's data path by sending spans and events
/// to a dedicated logging thread.
///
/// This struct implements [`MakeWriter`][make_writer] from the `tracing-subscriber`
/// crate. Therefore, it can be used with the [`tracing_subscriber::fmt`][fmt] module
/// or with any other subscriber/layer implementation that uses the `MakeWriter` trait.
///
/// Cloning a `NonBlocking` is cheap: every clone sends to the same channel and shares the
/// same [`ErrorCounter`].
///
/// [make_writer]: tracing_subscriber::fmt::MakeWriter
/// [fmt]: mod@tracing_subscriber::fmt
#[derive(Clone, Debug)]
pub struct NonBlocking {
    // Counter incremented whenever a lossy write fails to enqueue its line.
    error_counter: ErrorCounter,
    // Sending half of the bounded channel whose receiver is handed to the `Worker`.
    channel: Sender<Msg>,
    // `true`: use `try_send` and count failures; `false`: use the blocking `send`.
    is_lossy: bool,
}
131
/// Tracks the number of log lines that were dropped because they could not be
/// enqueued for the background thread.
///
/// The count is incremented by a lossy [`NonBlocking`] writer each time it
/// fails to place a line on the queue. Clones of an `ErrorCounter` share the
/// same underlying count.
///
/// If the non-blocking writer is not configured in [lossy mode], the error
/// count should always be 0.
///
/// [lossy mode]: NonBlockingBuilder::lossy
#[derive(Clone, Debug)]
pub struct ErrorCounter(Arc<AtomicUsize>);
140
141impl NonBlocking {
142    /// Returns a new `NonBlocking` writer wrapping the provided `writer`.
143    ///
144    /// The returned `NonBlocking` writer will have the [default configuration][default] values.
145    /// Other configurations can be specified using the [builder] interface.
146    ///
147    /// [default]: NonBlockingBuilder::default
148    /// [builder]: NonBlockingBuilder
149    pub fn new<T: Write + Send + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
150        NonBlockingBuilder::default().finish(writer)
151    }
152
153    fn create<T: Write + Send + 'static>(
154        writer: T,
155        buffered_lines_limit: usize,
156        is_lossy: bool,
157        thread_name: String,
158    ) -> (NonBlocking, WorkerGuard) {
159        let (sender, receiver) = bounded(buffered_lines_limit);
160
161        let (shutdown_sender, shutdown_receiver) = bounded(0);
162
163        let worker = Worker::new(receiver, writer, shutdown_receiver);
164        let worker_guard = WorkerGuard::new(
165            worker.worker_thread(thread_name),
166            sender.clone(),
167            shutdown_sender,
168        );
169
170        (
171            Self {
172                channel: sender,
173                error_counter: ErrorCounter(Arc::new(AtomicUsize::new(0))),
174                is_lossy,
175            },
176            worker_guard,
177        )
178    }
179
180    /// Returns a counter for the number of times logs where dropped. This will always return zero if
181    /// `NonBlocking` is not lossy.
182    pub fn error_counter(&self) -> ErrorCounter {
183        self.error_counter.clone()
184    }
185}
186
/// A builder for [`NonBlocking`][non-blocking].
///
/// Start from [`NonBlockingBuilder::default`], adjust the settings with the chained
/// setter methods, and call [`finish`](NonBlockingBuilder::finish) to create the writer.
///
/// [non-blocking]: NonBlocking
#[derive(Debug)]
pub struct NonBlockingBuilder {
    // Capacity of the bounded channel that queues log lines for the worker.
    buffered_lines_limit: usize,
    // Whether writes are dropped (`true`) or block (`false`) when the queue is full.
    is_lossy: bool,
    // Name handed to the worker when its thread is started.
    thread_name: String,
}
196
197impl NonBlockingBuilder {
198    /// Sets the number of lines to buffer before dropping logs or exerting backpressure on senders
199    pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder {
200        self.buffered_lines_limit = buffered_lines_limit;
201        self
202    }
203
204    /// Sets whether `NonBlocking` should be lossy or not.
205    ///
206    /// If set to `true`, logs will be dropped when the buffered limit is reached. If `false`, backpressure
207    /// will be exerted on senders, blocking them until the buffer has capacity again.
208    ///
209    /// By default, the built `NonBlocking` will be lossy.
210    pub fn lossy(mut self, is_lossy: bool) -> NonBlockingBuilder {
211        self.is_lossy = is_lossy;
212        self
213    }
214
215    /// Override the worker thread's name.
216    ///
217    /// The default worker thread name is "tracing-appender".
218    pub fn thread_name(mut self, name: &str) -> NonBlockingBuilder {
219        self.thread_name = name.to_string();
220        self
221    }
222
223    /// Completes the builder, returning the configured `NonBlocking`.
224    pub fn finish<T: Write + Send + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard) {
225        NonBlocking::create(
226            writer,
227            self.buffered_lines_limit,
228            self.is_lossy,
229            self.thread_name,
230        )
231    }
232}
233
234impl Default for NonBlockingBuilder {
235    fn default() -> Self {
236        NonBlockingBuilder {
237            buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT,
238            is_lossy: true,
239            thread_name: "tracing-appender".to_string(),
240        }
241    }
242}
243
244impl std::io::Write for NonBlocking {
245    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
246        let buf_size = buf.len();
247        if self.is_lossy {
248            if self.channel.try_send(Msg::Line(buf.to_vec())).is_err() {
249                self.error_counter.incr_saturating();
250            }
251        } else {
252            return match self.channel.send(Msg::Line(buf.to_vec())) {
253                Ok(_) => Ok(buf_size),
254                Err(_) => Err(io::Error::from(io::ErrorKind::Other)),
255            };
256        }
257        Ok(buf_size)
258    }
259
260    fn flush(&mut self) -> io::Result<()> {
261        Ok(())
262    }
263
264    #[inline]
265    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
266        self.write(buf).map(|_| ())
267    }
268}
269
270impl<'a> MakeWriter<'a> for NonBlocking {
271    type Writer = NonBlocking;
272
273    fn make_writer(&'a self) -> Self::Writer {
274        self.clone()
275    }
276}
277
278impl WorkerGuard {
279    fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
280        WorkerGuard {
281            _guard: Some(handle),
282            sender,
283            shutdown,
284        }
285    }
286}
287
288impl Drop for WorkerGuard {
289    fn drop(&mut self) {
290        match self
291            .sender
292            .send_timeout(Msg::Shutdown, Duration::from_millis(100))
293        {
294            Ok(_) => {
295                // Attempt to wait for `Worker` to flush all messages before dropping. This happens
296                // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
297                // so that drop is not blocked indefinitely.
298                // TODO: Make timeout configurable.
299                let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
300            }
301            Err(SendTimeoutError::Disconnected(_)) => (),
302            Err(SendTimeoutError::Timeout(e)) => println!(
303                "Failed to send shutdown signal to logging worker. Error: {:?}",
304                e
305            ),
306        }
307    }
308}
309
310// === impl ErrorCounter ===
311
312impl ErrorCounter {
313    /// Returns the number of log lines that have been dropped.
314    ///
315    /// If the non-blocking writer is not configured in [lossy mode], the error
316    /// count should always be 0.
317    ///
318    /// [lossy mode]: NonBlockingBuilder::lossy
319    pub fn dropped_lines(&self) -> usize {
320        self.0.load(Ordering::Acquire)
321    }
322
323    fn incr_saturating(&self) {
324        let mut curr = self.0.load(Ordering::Acquire);
325        // We don't need to enter the CAS loop if the current value is already
326        // `usize::MAX`.
327        if curr == usize::MAX {
328            return;
329        }
330
331        // This is implemented as a CAS loop rather than as a simple
332        // `fetch_add`, because we don't want to wrap on overflow. Instead, we
333        // need to ensure that saturating addition is performed.
334        loop {
335            let val = curr.saturating_add(1);
336            match self
337                .0
338                .compare_exchange(curr, val, Ordering::AcqRel, Ordering::Acquire)
339            {
340                Ok(_) => return,
341                Err(actual) => curr = actual,
342            }
343        }
344    }
345}
346
// Tests for `NonBlocking`, driven through a mock writer that forwards every write
// to a channel the test can read from.
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    // A writer that forwards each `write` call as a `String` over a bounded
    // `mpsc` channel, so a test can observe (and pace) what gets written.
    struct MockWriter {
        tx: mpsc::SyncSender<String>,
    }

    impl MockWriter {
        // Creates the writer and the receiver for its output; `capacity` is the
        // number of written lines that may be pending before `write` blocks.
        fn new(capacity: usize) -> (Self, mpsc::Receiver<String>) {
            let (tx, rx) = mpsc::sync_channel(capacity);
            (Self { tx }, rx)
        }
    }

    impl std::io::Write for MockWriter {
        // Sends `buf` (lossily decoded as UTF-8) to the test and reports it as fully
        // written. A failed send is ignored.
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let buf_len = buf.len();
            let _ = self.tx.send(String::from_utf8_lossy(buf).to_string());
            Ok(buf_len)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    // A non-lossy writer must block rather than drop lines when its queue is full.
    #[test]
    fn backpressure_exerted() {
        let (mock_writer, rx) = MockWriter::new(1);

        let (mut non_blocking, _guard) = self::NonBlockingBuilder::default()
            .lossy(false)
            .buffered_lines_limit(1)
            .finish(mock_writer);

        let error_count = non_blocking.error_counter();

        non_blocking.write_all(b"Hello").expect("Failed to write");
        assert_eq!(0, error_count.dropped_lines());

        let handle = thread::spawn(move || {
            non_blocking.write_all(b", World").expect("Failed to write");
        });

        // Sleep a little to ensure previously spawned thread gets blocked on write.
        thread::sleep(Duration::from_millis(100));
        // We should not drop logs when blocked.
        assert_eq!(0, error_count.dropped_lines());

        // Read the first message to unblock sender.
        let mut line = rx.recv().unwrap();
        assert_eq!(line, "Hello");

        // Wait for thread to finish.
        handle.join().expect("thread should not panic");

        // Thread has joined, we should be able to read the message it sent.
        line = rx.recv().unwrap();
        assert_eq!(line, ", World");
    }

    // Writes `msg` and then pauses, giving the worker thread time to react before
    // the caller makes its next assertion.
    fn write_non_blocking(non_blocking: &mut NonBlocking, msg: &[u8]) {
        non_blocking.write_all(msg).expect("Failed to write");

        // Sleep a bit to prevent races.
        thread::sleep(Duration::from_millis(200));
    }

    // A lossy writer must drop (and count) a line written while its queue is full.
    #[test]
    #[ignore] // flaky, see https://github.com/tokio-rs/tracing/issues/751
    fn logs_dropped_if_lossy() {
        let (mock_writer, rx) = MockWriter::new(1);

        let (mut non_blocking, _guard) = self::NonBlockingBuilder::default()
            .lossy(true)
            .buffered_lines_limit(1)
            .finish(mock_writer);

        let error_count = non_blocking.error_counter();

        // First write will not block
        write_non_blocking(&mut non_blocking, b"Hello");
        assert_eq!(0, error_count.dropped_lines());

        // Second write will not block as Worker will have called `recv` on channel.
        // "Hello" is not yet consumed. MockWriter call to write_all will block until
        // "Hello" is consumed.
        write_non_blocking(&mut non_blocking, b", World");
        assert_eq!(0, error_count.dropped_lines());

        // Will sit in NonBlocking channel's buffer.
        write_non_blocking(&mut non_blocking, b"Test");
        assert_eq!(0, error_count.dropped_lines());

        // Allow a line to be written. "Hello" message will be consumed.
        // ", World" will be able to write to MockWriter.
        // "Test" will block on call to MockWriter's `write_all`
        let line = rx.recv().unwrap();
        assert_eq!(line, "Hello");

        // The NonBlocking channel is full, so this lossy write does not block:
        // the line is dropped and counted instead.
        write_non_blocking(&mut non_blocking, b"Universe");
        assert_eq!(1, error_count.dropped_lines());

        // Finally the second message sent will be consumed.
        let line = rx.recv().unwrap();
        assert_eq!(line, ", World");
        assert_eq!(1, error_count.dropped_lines());
    }

    // Ten threads each emit one event through a clone of the same writer; every
    // event must reach the mock writer and none may be counted as dropped.
    #[test]
    fn multi_threaded_writes() {
        let (mock_writer, rx) = MockWriter::new(DEFAULT_BUFFERED_LINES_LIMIT);

        let (non_blocking, _guard) = self::NonBlockingBuilder::default()
            .lossy(true)
            .finish(mock_writer);

        let error_count = non_blocking.error_counter();
        let mut join_handles: Vec<JoinHandle<()>> = Vec::with_capacity(10);

        for _ in 0..10 {
            let cloned_non_blocking = non_blocking.clone();
            join_handles.push(thread::spawn(move || {
                let subscriber = tracing_subscriber::fmt().with_writer(cloned_non_blocking);
                tracing::subscriber::with_default(subscriber.finish(), || {
                    tracing::event!(tracing::Level::INFO, "Hello");
                });
            }));
        }

        for handle in join_handles {
            handle.join().expect("Failed to join thread");
        }

        let mut hello_count: u8 = 0;

        // Keep reading until no line arrives within 5 seconds (or the channel closes).
        while let Ok(event_str) = rx.recv_timeout(Duration::from_secs(5)) {
            assert!(event_str.contains("Hello"));
            hello_count += 1;
        }

        assert_eq!(10, hello_count);
        assert_eq!(0, error_count.dropped_lines());
    }
}