tracing_appender/
worker.rs
1use crate::Msg;
2use crossbeam_channel::{Receiver, RecvError, TryRecvError};
3use std::fmt::Debug;
4use std::io::Write;
5use std::{io, thread};
6
7pub(crate) struct Worker<T: Write + Send + 'static> {
8 writer: T,
9 receiver: Receiver<Msg>,
10 shutdown: Receiver<()>,
11}
12
/// Outcome of handling a single receive attempt on the message channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum WorkerState {
    /// A non-blocking receive found no message waiting.
    Empty,
    /// The receive failed because the channel is disconnected.
    Disconnected,
    /// A `Msg::Line` was written; more messages may be pending.
    Continue,
    /// A `Msg::Shutdown` message was received.
    Shutdown,
}
20
21impl<T: Write + Send + 'static> Worker<T> {
22 pub(crate) fn new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T> {
23 Self {
24 writer,
25 receiver,
26 shutdown,
27 }
28 }
29
30 fn handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState> {
31 match result {
32 Ok(Msg::Line(msg)) => {
33 self.writer.write_all(msg)?;
34 Ok(WorkerState::Continue)
35 }
36 Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown),
37 Err(_) => Ok(WorkerState::Disconnected),
38 }
39 }
40
41 fn handle_try_recv(&mut self, result: &Result<Msg, TryRecvError>) -> io::Result<WorkerState> {
42 match result {
43 Ok(Msg::Line(msg)) => {
44 self.writer.write_all(msg)?;
45 Ok(WorkerState::Continue)
46 }
47 Ok(Msg::Shutdown) => Ok(WorkerState::Shutdown),
48 Err(TryRecvError::Empty) => Ok(WorkerState::Empty),
49 Err(TryRecvError::Disconnected) => Ok(WorkerState::Disconnected),
50 }
51 }
52
53 pub(crate) fn work(&mut self) -> io::Result<WorkerState> {
57 let mut worker_state = self.handle_recv(&self.receiver.recv())?;
59
60 while worker_state == WorkerState::Continue {
61 let try_recv_result = self.receiver.try_recv();
62 let handle_result = self.handle_try_recv(&try_recv_result);
63 worker_state = handle_result?;
64 }
65 self.writer.flush()?;
66 Ok(worker_state)
67 }
68
69 pub(crate) fn worker_thread(mut self, name: String) -> std::thread::JoinHandle<()> {
71 thread::Builder::new()
72 .name(name)
73 .spawn(move || {
74 loop {
75 match self.work() {
76 Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
77 Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
78 let _ = self.shutdown.recv();
79 break;
80 }
81 Err(_) => {
82 }
84 }
85 }
86 if let Err(e) = self.writer.flush() {
87 eprintln!("Failed to flush. Error: {}", e);
88 }
89 })
90 .expect("failed to spawn `tracing-appender` non-blocking worker thread")
91 }
92}