tokio/net/unix/
stream.rs

1use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::split::{split, ReadHalf, WriteHalf};
3use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
4use crate::net::unix::ucred::{self, UCred};
5use crate::net::unix::SocketAddr;
6
7use std::fmt;
8use std::future::poll_fn;
9use std::io::{self, Read, Write};
10use std::net::Shutdown;
11#[cfg(target_os = "android")]
12use std::os::android::net::SocketAddrExt;
13#[cfg(target_os = "linux")]
14use std::os::linux::net::SocketAddrExt;
15#[cfg(any(target_os = "linux", target_os = "android"))]
16use std::os::unix::ffi::OsStrExt;
17use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
18use std::os::unix::net::{self, SocketAddr as StdSocketAddr};
19use std::path::Path;
20use std::pin::Pin;
21use std::task::{Context, Poll};
22
23cfg_io_util! {
24    use bytes::BufMut;
25}
26
cfg_net_unix! {
    /// A structure representing a connected Unix socket.
    ///
    /// This socket can be connected directly with [`UnixStream::connect`] or accepted
    /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
    /// anonymous Unix sockets can be created with `UnixStream::pair`.
    ///
    /// To shut down the stream in the write direction, you can call the
    /// [`shutdown()`] method. This will cause the other peer to receive a read of
    /// length 0, indicating that no more data will be sent. This only closes
    /// the stream in one direction.
    ///
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    /// [`UnixListener::accept`]: crate::net::UnixListener::accept
    #[cfg_attr(docsrs, doc(alias = "uds"))]
    pub struct UnixStream {
        // The mio Unix stream wrapped in `PollEvented`. The methods on this
        // type obtain readiness information through `io.registration()` and
        // perform the actual reads/writes on the wrapped socket.
        io: PollEvented<mio::net::UnixStream>,
    }
}
46
47impl UnixStream {
48    pub(crate) async fn connect_mio(sys: mio::net::UnixStream) -> io::Result<UnixStream> {
49        let stream = UnixStream::new(sys)?;
50
51        // Once we've connected, wait for the stream to be writable as
52        // that's when the actual connection has been initiated. Once we're
53        // writable we check for `take_socket_error` to see if the connect
54        // actually hit an error or not.
55        //
56        // If all that succeeded then we ship everything on up.
57        poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
58
59        if let Some(e) = stream.io.take_error()? {
60            return Err(e);
61        }
62
63        Ok(stream)
64    }
65
66    /// Connects to the socket named by `path`.
67    ///
68    /// This function will create a new Unix socket and connect to the path
69    /// specified, associating the returned stream with the default event loop's
70    /// handle.
71    pub async fn connect<P>(path: P) -> io::Result<UnixStream>
72    where
73        P: AsRef<Path>,
74    {
75        // On linux, abstract socket paths need to be considered.
76        #[cfg(any(target_os = "linux", target_os = "android"))]
77        let addr = {
78            let os_str_bytes = path.as_ref().as_os_str().as_bytes();
79            if os_str_bytes.starts_with(b"\0") {
80                StdSocketAddr::from_abstract_name(&os_str_bytes[1..])?
81            } else {
82                StdSocketAddr::from_pathname(path)?
83            }
84        };
85        #[cfg(not(any(target_os = "linux", target_os = "android")))]
86        let addr = StdSocketAddr::from_pathname(path)?;
87
88        let stream = mio::net::UnixStream::connect_addr(&addr)?;
89        let stream = UnixStream::new(stream)?;
90
91        poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
92
93        if let Some(e) = stream.io.take_error()? {
94            return Err(e);
95        }
96
97        Ok(stream)
98    }
99
100    /// Waits for any of the requested ready states.
101    ///
102    /// This function is usually paired with `try_read()` or `try_write()`. It
103    /// can be used to concurrently read / write to the same socket on a single
104    /// task without splitting the socket.
105    ///
106    /// The function may complete without the socket being ready. This is a
107    /// false-positive and attempting an operation will return with
108    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
109    /// [`Ready`] set, so you should always check the returned value and possibly
110    /// wait again if the requested states are not set.
111    ///
112    /// # Cancel safety
113    ///
114    /// This method is cancel safe. Once a readiness event occurs, the method
115    /// will continue to return immediately until the readiness event is
116    /// consumed by an attempt to read or write that fails with `WouldBlock` or
117    /// `Poll::Pending`.
118    ///
119    /// # Examples
120    ///
121    /// Concurrently read and write to the stream on the same task without
122    /// splitting.
123    ///
124    /// ```no_run
125    /// use tokio::io::Interest;
126    /// use tokio::net::UnixStream;
127    /// use std::error::Error;
128    /// use std::io;
129    ///
130    /// #[tokio::main]
131    /// async fn main() -> Result<(), Box<dyn Error>> {
132    ///     let dir = tempfile::tempdir().unwrap();
133    ///     let bind_path = dir.path().join("bind_path");
134    ///     let stream = UnixStream::connect(bind_path).await?;
135    ///
136    ///     loop {
137    ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
138    ///
139    ///         if ready.is_readable() {
140    ///             let mut data = vec![0; 1024];
141    ///             // Try to read data, this may still fail with `WouldBlock`
142    ///             // if the readiness event is a false positive.
143    ///             match stream.try_read(&mut data) {
144    ///                 Ok(n) => {
145    ///                     println!("read {} bytes", n);        
146    ///                 }
147    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
148    ///                     continue;
149    ///                 }
150    ///                 Err(e) => {
151    ///                     return Err(e.into());
152    ///                 }
153    ///             }
154    ///
155    ///         }
156    ///
157    ///         if ready.is_writable() {
158    ///             // Try to write data, this may still fail with `WouldBlock`
159    ///             // if the readiness event is a false positive.
160    ///             match stream.try_write(b"hello world") {
161    ///                 Ok(n) => {
162    ///                     println!("write {} bytes", n);
163    ///                 }
164    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
165    ///                     continue;
166    ///                 }
167    ///                 Err(e) => {
168    ///                     return Err(e.into());
169    ///                 }
170    ///             }
171    ///         }
172    ///     }
173    /// }
174    /// ```
175    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
176        let event = self.io.registration().readiness(interest).await?;
177        Ok(event.ready)
178    }
179
180    /// Waits for the socket to become readable.
181    ///
182    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
183    /// paired with `try_read()`.
184    ///
185    /// # Cancel safety
186    ///
187    /// This method is cancel safe. Once a readiness event occurs, the method
188    /// will continue to return immediately until the readiness event is
189    /// consumed by an attempt to read that fails with `WouldBlock` or
190    /// `Poll::Pending`.
191    ///
192    /// # Examples
193    ///
194    /// ```no_run
195    /// use tokio::net::UnixStream;
196    /// use std::error::Error;
197    /// use std::io;
198    ///
199    /// #[tokio::main]
200    /// async fn main() -> Result<(), Box<dyn Error>> {
201    ///     // Connect to a peer
202    ///     let dir = tempfile::tempdir().unwrap();
203    ///     let bind_path = dir.path().join("bind_path");
204    ///     let stream = UnixStream::connect(bind_path).await?;
205    ///
206    ///     let mut msg = vec![0; 1024];
207    ///
208    ///     loop {
209    ///         // Wait for the socket to be readable
210    ///         stream.readable().await?;
211    ///
212    ///         // Try to read data, this may still fail with `WouldBlock`
213    ///         // if the readiness event is a false positive.
214    ///         match stream.try_read(&mut msg) {
215    ///             Ok(n) => {
216    ///                 msg.truncate(n);
217    ///                 break;
218    ///             }
219    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
220    ///                 continue;
221    ///             }
222    ///             Err(e) => {
223    ///                 return Err(e.into());
224    ///             }
225    ///         }
226    ///     }
227    ///
228    ///     println!("GOT = {:?}", msg);
229    ///     Ok(())
230    /// }
231    /// ```
232    pub async fn readable(&self) -> io::Result<()> {
233        self.ready(Interest::READABLE).await?;
234        Ok(())
235    }
236
237    /// Polls for read readiness.
238    ///
239    /// If the unix stream is not currently ready for reading, this method will
240    /// store a clone of the `Waker` from the provided `Context`. When the unix
241    /// stream becomes ready for reading, `Waker::wake` will be called on the
242    /// waker.
243    ///
244    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
245    /// the `Waker` from the `Context` passed to the most recent call is
246    /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
247    /// second, independent waker.)
248    ///
249    /// This function is intended for cases where creating and pinning a future
250    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
251    /// preferred, as this supports polling from multiple tasks at once.
252    ///
253    /// # Return value
254    ///
255    /// The function returns:
256    ///
257    /// * `Poll::Pending` if the unix stream is not ready for reading.
258    /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading.
259    /// * `Poll::Ready(Err(e))` if an error is encountered.
260    ///
261    /// # Errors
262    ///
263    /// This function may encounter any standard I/O error except `WouldBlock`.
264    ///
265    /// [`readable`]: method@Self::readable
266    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
267        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
268    }
269
    /// Tries to read data from the stream into the provided buffer, returning how
    /// many bytes were read.
272    ///
273    /// Receives any pending data from the socket but does not wait for new data
274    /// to arrive. On success, returns the number of bytes read. Because
275    /// `try_read()` is non-blocking, the buffer does not have to be stored by
276    /// the async task and can exist entirely on the stack.
277    ///
278    /// Usually, [`readable()`] or [`ready()`] is used with this function.
279    ///
280    /// [`readable()`]: UnixStream::readable()
281    /// [`ready()`]: UnixStream::ready()
282    ///
283    /// # Return
284    ///
285    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
286    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
287    ///
288    /// 1. The stream's read half is closed and will no longer yield data.
289    /// 2. The specified buffer was 0 bytes in length.
290    ///
291    /// If the stream is not ready to read data,
292    /// `Err(io::ErrorKind::WouldBlock)` is returned.
293    ///
294    /// # Examples
295    ///
296    /// ```no_run
297    /// use tokio::net::UnixStream;
298    /// use std::error::Error;
299    /// use std::io;
300    ///
301    /// #[tokio::main]
302    /// async fn main() -> Result<(), Box<dyn Error>> {
303    ///     // Connect to a peer
304    ///     let dir = tempfile::tempdir().unwrap();
305    ///     let bind_path = dir.path().join("bind_path");
306    ///     let stream = UnixStream::connect(bind_path).await?;
307    ///
308    ///     loop {
309    ///         // Wait for the socket to be readable
310    ///         stream.readable().await?;
311    ///
312    ///         // Creating the buffer **after** the `await` prevents it from
313    ///         // being stored in the async task.
314    ///         let mut buf = [0; 4096];
315    ///
316    ///         // Try to read data, this may still fail with `WouldBlock`
317    ///         // if the readiness event is a false positive.
318    ///         match stream.try_read(&mut buf) {
319    ///             Ok(0) => break,
320    ///             Ok(n) => {
321    ///                 println!("read {} bytes", n);
322    ///             }
323    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
324    ///                 continue;
325    ///             }
326    ///             Err(e) => {
327    ///                 return Err(e.into());
328    ///             }
329    ///         }
330    ///     }
331    ///
332    ///     Ok(())
333    /// }
334    /// ```
335    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
336        self.io
337            .registration()
338            .try_io(Interest::READABLE, || (&*self.io).read(buf))
339    }
340
341    /// Tries to read data from the stream into the provided buffers, returning
342    /// how many bytes were read.
343    ///
344    /// Data is copied to fill each buffer in order, with the final buffer
345    /// written to possibly being only partially filled. This method behaves
346    /// equivalently to a single call to [`try_read()`] with concatenated
347    /// buffers.
348    ///
349    /// Receives any pending data from the socket but does not wait for new data
350    /// to arrive. On success, returns the number of bytes read. Because
351    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
352    /// stored by the async task and can exist entirely on the stack.
353    ///
354    /// Usually, [`readable()`] or [`ready()`] is used with this function.
355    ///
356    /// [`try_read()`]: UnixStream::try_read()
357    /// [`readable()`]: UnixStream::readable()
358    /// [`ready()`]: UnixStream::ready()
359    ///
360    /// # Return
361    ///
362    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
363    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
364    /// and will no longer yield data. If the stream is not ready to read data
365    /// `Err(io::ErrorKind::WouldBlock)` is returned.
366    ///
367    /// # Examples
368    ///
369    /// ```no_run
370    /// use tokio::net::UnixStream;
371    /// use std::error::Error;
372    /// use std::io::{self, IoSliceMut};
373    ///
374    /// #[tokio::main]
375    /// async fn main() -> Result<(), Box<dyn Error>> {
376    ///     // Connect to a peer
377    ///     let dir = tempfile::tempdir().unwrap();
378    ///     let bind_path = dir.path().join("bind_path");
379    ///     let stream = UnixStream::connect(bind_path).await?;
380    ///
381    ///     loop {
382    ///         // Wait for the socket to be readable
383    ///         stream.readable().await?;
384    ///
385    ///         // Creating the buffer **after** the `await` prevents it from
386    ///         // being stored in the async task.
387    ///         let mut buf_a = [0; 512];
388    ///         let mut buf_b = [0; 1024];
389    ///         let mut bufs = [
390    ///             IoSliceMut::new(&mut buf_a),
391    ///             IoSliceMut::new(&mut buf_b),
392    ///         ];
393    ///
394    ///         // Try to read data, this may still fail with `WouldBlock`
395    ///         // if the readiness event is a false positive.
396    ///         match stream.try_read_vectored(&mut bufs) {
397    ///             Ok(0) => break,
398    ///             Ok(n) => {
399    ///                 println!("read {} bytes", n);
400    ///             }
401    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
402    ///                 continue;
403    ///             }
404    ///             Err(e) => {
405    ///                 return Err(e.into());
406    ///             }
407    ///         }
408    ///     }
409    ///
410    ///     Ok(())
411    /// }
412    /// ```
413    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
414        self.io
415            .registration()
416            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
417    }
418
419    cfg_io_util! {
420        /// Tries to read data from the stream into the provided buffer, advancing the
421        /// buffer's internal cursor, returning how many bytes were read.
422        ///
423        /// Receives any pending data from the socket but does not wait for new data
424        /// to arrive. On success, returns the number of bytes read. Because
425        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
426        /// the async task and can exist entirely on the stack.
427        ///
428        /// Usually, [`readable()`] or [`ready()`] is used with this function.
429        ///
430        /// [`readable()`]: UnixStream::readable()
431        /// [`ready()`]: UnixStream::ready()
432        ///
433        /// # Return
434        ///
435        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
436        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
437        /// and will no longer yield data. If the stream is not ready to read data
438        /// `Err(io::ErrorKind::WouldBlock)` is returned.
439        ///
440        /// # Examples
441        ///
442        /// ```no_run
443        /// use tokio::net::UnixStream;
444        /// use std::error::Error;
445        /// use std::io;
446        ///
447        /// #[tokio::main]
448        /// async fn main() -> Result<(), Box<dyn Error>> {
449        ///     // Connect to a peer
450        ///     let dir = tempfile::tempdir().unwrap();
451        ///     let bind_path = dir.path().join("bind_path");
452        ///     let stream = UnixStream::connect(bind_path).await?;
453        ///
454        ///     loop {
455        ///         // Wait for the socket to be readable
456        ///         stream.readable().await?;
457        ///
458        ///         let mut buf = Vec::with_capacity(4096);
459        ///
460        ///         // Try to read data, this may still fail with `WouldBlock`
461        ///         // if the readiness event is a false positive.
462        ///         match stream.try_read_buf(&mut buf) {
463        ///             Ok(0) => break,
464        ///             Ok(n) => {
465        ///                 println!("read {} bytes", n);
466        ///             }
467        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
468        ///                 continue;
469        ///             }
470        ///             Err(e) => {
471        ///                 return Err(e.into());
472        ///             }
473        ///         }
474        ///     }
475        ///
476        ///     Ok(())
477        /// }
478        /// ```
479        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
480            self.io.registration().try_io(Interest::READABLE, || {
481                use std::io::Read;
482
483                let dst = buf.chunk_mut();
484                let dst =
485                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
486
487                // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
488                // buffer.
489                let n = (&*self.io).read(dst)?;
490
491                unsafe {
492                    buf.advance_mut(n);
493                }
494
495                Ok(n)
496            })
497        }
498    }
499
500    /// Waits for the socket to become writable.
501    ///
502    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
503    /// paired with `try_write()`.
504    ///
505    /// # Cancel safety
506    ///
507    /// This method is cancel safe. Once a readiness event occurs, the method
508    /// will continue to return immediately until the readiness event is
509    /// consumed by an attempt to write that fails with `WouldBlock` or
510    /// `Poll::Pending`.
511    ///
512    /// # Examples
513    ///
514    /// ```no_run
515    /// use tokio::net::UnixStream;
516    /// use std::error::Error;
517    /// use std::io;
518    ///
519    /// #[tokio::main]
520    /// async fn main() -> Result<(), Box<dyn Error>> {
521    ///     // Connect to a peer
522    ///     let dir = tempfile::tempdir().unwrap();
523    ///     let bind_path = dir.path().join("bind_path");
524    ///     let stream = UnixStream::connect(bind_path).await?;
525    ///
526    ///     loop {
527    ///         // Wait for the socket to be writable
528    ///         stream.writable().await?;
529    ///
530    ///         // Try to write data, this may still fail with `WouldBlock`
531    ///         // if the readiness event is a false positive.
532    ///         match stream.try_write(b"hello world") {
533    ///             Ok(n) => {
534    ///                 break;
535    ///             }
536    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
537    ///                 continue;
538    ///             }
539    ///             Err(e) => {
540    ///                 return Err(e.into());
541    ///             }
542    ///         }
543    ///     }
544    ///
545    ///     Ok(())
546    /// }
547    /// ```
548    pub async fn writable(&self) -> io::Result<()> {
549        self.ready(Interest::WRITABLE).await?;
550        Ok(())
551    }
552
553    /// Polls for write readiness.
554    ///
555    /// If the unix stream is not currently ready for writing, this method will
556    /// store a clone of the `Waker` from the provided `Context`. When the unix
557    /// stream becomes ready for writing, `Waker::wake` will be called on the
558    /// waker.
559    ///
560    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
561    /// the `Waker` from the `Context` passed to the most recent call is
562    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
563    /// second, independent waker.)
564    ///
565    /// This function is intended for cases where creating and pinning a future
566    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
567    /// preferred, as this supports polling from multiple tasks at once.
568    ///
569    /// # Return value
570    ///
571    /// The function returns:
572    ///
573    /// * `Poll::Pending` if the unix stream is not ready for writing.
574    /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing.
575    /// * `Poll::Ready(Err(e))` if an error is encountered.
576    ///
577    /// # Errors
578    ///
579    /// This function may encounter any standard I/O error except `WouldBlock`.
580    ///
581    /// [`writable`]: method@Self::writable
582    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
583        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
584    }
585
586    /// Tries to write a buffer to the stream, returning how many bytes were
587    /// written.
588    ///
589    /// The function will attempt to write the entire contents of `buf`, but
590    /// only part of the buffer may be written.
591    ///
592    /// This function is usually paired with `writable()`.
593    ///
594    /// # Return
595    ///
596    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
597    /// number of bytes written. If the stream is not ready to write data,
598    /// `Err(io::ErrorKind::WouldBlock)` is returned.
599    ///
600    /// # Examples
601    ///
602    /// ```no_run
603    /// use tokio::net::UnixStream;
604    /// use std::error::Error;
605    /// use std::io;
606    ///
607    /// #[tokio::main]
608    /// async fn main() -> Result<(), Box<dyn Error>> {
609    ///     // Connect to a peer
610    ///     let dir = tempfile::tempdir().unwrap();
611    ///     let bind_path = dir.path().join("bind_path");
612    ///     let stream = UnixStream::connect(bind_path).await?;
613    ///
614    ///     loop {
615    ///         // Wait for the socket to be writable
616    ///         stream.writable().await?;
617    ///
618    ///         // Try to write data, this may still fail with `WouldBlock`
619    ///         // if the readiness event is a false positive.
620    ///         match stream.try_write(b"hello world") {
621    ///             Ok(n) => {
622    ///                 break;
623    ///             }
624    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
625    ///                 continue;
626    ///             }
627    ///             Err(e) => {
628    ///                 return Err(e.into());
629    ///             }
630    ///         }
631    ///     }
632    ///
633    ///     Ok(())
634    /// }
635    /// ```
636    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
637        self.io
638            .registration()
639            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
640    }
641
642    /// Tries to write several buffers to the stream, returning how many bytes
643    /// were written.
644    ///
    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
647    /// equivalently to a single call to [`try_write()`] with concatenated
648    /// buffers.
649    ///
650    /// This function is usually paired with `writable()`.
651    ///
652    /// [`try_write()`]: UnixStream::try_write()
653    ///
654    /// # Return
655    ///
656    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
657    /// number of bytes written. If the stream is not ready to write data,
658    /// `Err(io::ErrorKind::WouldBlock)` is returned.
659    ///
660    /// # Examples
661    ///
662    /// ```no_run
663    /// use tokio::net::UnixStream;
664    /// use std::error::Error;
665    /// use std::io;
666    ///
667    /// #[tokio::main]
668    /// async fn main() -> Result<(), Box<dyn Error>> {
669    ///     // Connect to a peer
670    ///     let dir = tempfile::tempdir().unwrap();
671    ///     let bind_path = dir.path().join("bind_path");
672    ///     let stream = UnixStream::connect(bind_path).await?;
673    ///
674    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
675    ///
676    ///     loop {
677    ///         // Wait for the socket to be writable
678    ///         stream.writable().await?;
679    ///
680    ///         // Try to write data, this may still fail with `WouldBlock`
681    ///         // if the readiness event is a false positive.
682    ///         match stream.try_write_vectored(&bufs) {
683    ///             Ok(n) => {
684    ///                 break;
685    ///             }
686    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
687    ///                 continue;
688    ///             }
689    ///             Err(e) => {
690    ///                 return Err(e.into());
691    ///             }
692    ///         }
693    ///     }
694    ///
695    ///     Ok(())
696    /// }
697    /// ```
698    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
699        self.io
700            .registration()
701            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
702    }
703
704    /// Tries to read or write from the socket using a user-provided IO operation.
705    ///
706    /// If the socket is ready, the provided closure is called. The closure
707    /// should attempt to perform IO operation on the socket by manually
708    /// calling the appropriate syscall. If the operation fails because the
709    /// socket is not actually ready, then the closure should return a
710    /// `WouldBlock` error and the readiness flag is cleared. The return value
711    /// of the closure is then returned by `try_io`.
712    ///
713    /// If the socket is not ready, then the closure is not called
714    /// and a `WouldBlock` error is returned.
715    ///
716    /// The closure should only return a `WouldBlock` error if it has performed
717    /// an IO operation on the socket that failed due to the socket not being
718    /// ready. Returning a `WouldBlock` error in any other situation will
719    /// incorrectly clear the readiness flag, which can cause the socket to
720    /// behave incorrectly.
721    ///
722    /// The closure should not perform the IO operation using any of the methods
723    /// defined on the Tokio `UnixStream` type, as this will mess with the
724    /// readiness flag and can cause the socket to behave incorrectly.
725    ///
726    /// This method is not intended to be used with combined interests.
727    /// The closure should perform only one type of IO operation, so it should not
728    /// require more than one ready state. This method may panic or sleep forever
729    /// if it is called with a combined interest.
730    ///
731    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
732    ///
733    /// [`readable()`]: UnixStream::readable()
734    /// [`writable()`]: UnixStream::writable()
735    /// [`ready()`]: UnixStream::ready()
736    pub fn try_io<R>(
737        &self,
738        interest: Interest,
739        f: impl FnOnce() -> io::Result<R>,
740    ) -> io::Result<R> {
741        self.io
742            .registration()
743            .try_io(interest, || self.io.try_io(f))
744    }
745
746    /// Reads or writes from the socket using a user-provided IO operation.
747    ///
748    /// The readiness of the socket is awaited and when the socket is ready,
749    /// the provided closure is called. The closure should attempt to perform
750    /// IO operation on the socket by manually calling the appropriate syscall.
751    /// If the operation fails because the socket is not actually ready,
752    /// then the closure should return a `WouldBlock` error. In such case the
753    /// readiness flag is cleared and the socket readiness is awaited again.
754    /// This loop is repeated until the closure returns an `Ok` or an error
755    /// other than `WouldBlock`.
756    ///
757    /// The closure should only return a `WouldBlock` error if it has performed
758    /// an IO operation on the socket that failed due to the socket not being
759    /// ready. Returning a `WouldBlock` error in any other situation will
760    /// incorrectly clear the readiness flag, which can cause the socket to
761    /// behave incorrectly.
762    ///
763    /// The closure should not perform the IO operation using any of the methods
764    /// defined on the Tokio `UnixStream` type, as this will mess with the
765    /// readiness flag and can cause the socket to behave incorrectly.
766    ///
767    /// This method is not intended to be used with combined interests.
768    /// The closure should perform only one type of IO operation, so it should not
769    /// require more than one ready state. This method may panic or sleep forever
770    /// if it is called with a combined interest.
771    pub async fn async_io<R>(
772        &self,
773        interest: Interest,
774        mut f: impl FnMut() -> io::Result<R>,
775    ) -> io::Result<R> {
776        self.io
777            .registration()
778            .async_io(interest, || self.io.try_io(&mut f))
779            .await
780    }
781
782    /// Creates new [`UnixStream`] from a [`std::os::unix::net::UnixStream`].
783    ///
784    /// This function is intended to be used to wrap a `UnixStream` from the
785    /// standard library in the Tokio equivalent.
786    ///
787    /// # Notes
788    ///
789    /// The caller is responsible for ensuring that the stream is in
790    /// non-blocking mode. Otherwise all I/O operations on the stream
791    /// will block the thread, which will cause unexpected behavior.
792    /// Non-blocking mode can be set using [`set_nonblocking`].
793    ///
794    /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
795    ///
796    /// # Examples
797    ///
798    /// ```no_run
799    /// use tokio::net::UnixStream;
800    /// use std::os::unix::net::UnixStream as StdUnixStream;
801    /// # use std::error::Error;
802    ///
803    /// # async fn dox() -> Result<(), Box<dyn Error>> {
804    /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
805    /// std_stream.set_nonblocking(true)?;
806    /// let stream = UnixStream::from_std(std_stream)?;
807    /// # Ok(())
808    /// # }
809    /// ```
810    ///
811    /// # Panics
812    ///
813    /// This function panics if it is not called from within a runtime with
814    /// IO enabled.
815    ///
816    /// The runtime is usually set implicitly when this function is called
817    /// from a future driven by a tokio runtime, otherwise runtime can be set
818    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
819    #[track_caller]
820    pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
821        let stream = mio::net::UnixStream::from_std(stream);
822        let io = PollEvented::new(stream)?;
823
824        Ok(UnixStream { io })
825    }
826
827    /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
828    ///
829    /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
830    /// mode set as `true`.  Use [`set_nonblocking`] to change the blocking
831    /// mode if needed.
832    ///
833    /// # Examples
834    ///
835    /// ```
836    /// use std::error::Error;
837    /// use std::io::Read;
838    /// use tokio::net::UnixListener;
839    /// # use tokio::net::UnixStream;
840    /// # use tokio::io::AsyncWriteExt;
841    ///
842    /// #[tokio::main]
843    /// async fn main() -> Result<(), Box<dyn Error>> {
844    ///     let dir = tempfile::tempdir().unwrap();
845    ///     let bind_path = dir.path().join("bind_path");
846    ///
847    ///     let mut data = [0u8; 12];
848    ///     let listener = UnixListener::bind(&bind_path)?;
849    /// #   let handle = tokio::spawn(async {
850    /// #       let mut stream = UnixStream::connect(bind_path).await.unwrap();
851    /// #       stream.write(b"Hello world!").await.unwrap();
852    /// #   });
853    ///     let (tokio_unix_stream, _) = listener.accept().await?;
854    ///     let mut std_unix_stream = tokio_unix_stream.into_std()?;
855    /// #   handle.await.expect("The task being joined has panicked");
856    ///     std_unix_stream.set_nonblocking(false)?;
857    ///     std_unix_stream.read_exact(&mut data)?;
858    /// #   assert_eq!(b"Hello world!", &data);
859    ///     Ok(())
860    /// }
861    /// ```
862    /// [`tokio::net::UnixStream`]: UnixStream
863    /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream
864    /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking
865    pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> {
866        self.io
867            .into_inner()
868            .map(IntoRawFd::into_raw_fd)
869            .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) })
870    }
871
872    /// Creates an unnamed pair of connected sockets.
873    ///
874    /// This function will create a pair of interconnected Unix sockets for
875    /// communicating back and forth between one another. Each socket will
876    /// be associated with the default event loop's handle.
877    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
878        let (a, b) = mio::net::UnixStream::pair()?;
879        let a = UnixStream::new(a)?;
880        let b = UnixStream::new(b)?;
881
882        Ok((a, b))
883    }
884
885    pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
886        let io = PollEvented::new(stream)?;
887        Ok(UnixStream { io })
888    }
889
890    /// Returns the socket address of the local half of this connection.
891    ///
892    /// # Examples
893    ///
894    /// ```no_run
895    /// use tokio::net::UnixStream;
896    ///
897    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
898    /// let dir = tempfile::tempdir().unwrap();
899    /// let bind_path = dir.path().join("bind_path");
900    /// let stream = UnixStream::connect(bind_path).await?;
901    ///
902    /// println!("{:?}", stream.local_addr()?);
903    /// # Ok(())
904    /// # }
905    /// ```
906    pub fn local_addr(&self) -> io::Result<SocketAddr> {
907        self.io.local_addr().map(SocketAddr)
908    }
909
910    /// Returns the socket address of the remote half of this connection.
911    ///
912    /// # Examples
913    ///
914    /// ```no_run
915    /// use tokio::net::UnixStream;
916    ///
917    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
918    /// let dir = tempfile::tempdir().unwrap();
919    /// let bind_path = dir.path().join("bind_path");
920    /// let stream = UnixStream::connect(bind_path).await?;
921    ///
922    /// println!("{:?}", stream.peer_addr()?);
923    /// # Ok(())
924    /// # }
925    /// ```
926    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
927        self.io.peer_addr().map(SocketAddr)
928    }
929
930    /// Returns effective credentials of the process which called `connect` or `pair`.
931    pub fn peer_cred(&self) -> io::Result<UCred> {
932        ucred::get_peer_cred(self)
933    }
934
935    /// Returns the value of the `SO_ERROR` option.
936    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
937        self.io.take_error()
938    }
939
940    /// Shuts down the read, write, or both halves of this connection.
941    ///
942    /// This function will cause all pending and future I/O calls on the
943    /// specified portions to immediately return with an appropriate value
944    /// (see the documentation of `Shutdown`).
945    pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
946        self.io.shutdown(how)
947    }
948
949    // These lifetime markers also appear in the generated documentation, and make
950    // it more clear that this is a *borrowed* split.
951    #[allow(clippy::needless_lifetimes)]
952    /// Splits a `UnixStream` into a read half and a write half, which can be used
953    /// to read and write the stream concurrently.
954    ///
955    /// This method is more efficient than [`into_split`], but the halves cannot be
956    /// moved into independently spawned tasks.
957    ///
958    /// [`into_split`]: Self::into_split()
959    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
960        split(self)
961    }
962
963    /// Splits a `UnixStream` into a read half and a write half, which can be used
964    /// to read and write the stream concurrently.
965    ///
966    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
967    /// this comes at the cost of a heap allocation.
968    ///
969    /// **Note:** Dropping the write half will shut down the write half of the
970    /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`.
971    ///
972    /// [`split`]: Self::split()
973    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
974    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
975        split_owned(self)
976    }
977}
978
979impl TryFrom<net::UnixStream> for UnixStream {
980    type Error = io::Error;
981
982    /// Consumes stream, returning the tokio I/O object.
983    ///
984    /// This is equivalent to
985    /// [`UnixStream::from_std(stream)`](UnixStream::from_std).
986    fn try_from(stream: net::UnixStream) -> io::Result<Self> {
987        Self::from_std(stream)
988    }
989}
990
991impl AsyncRead for UnixStream {
992    fn poll_read(
993        self: Pin<&mut Self>,
994        cx: &mut Context<'_>,
995        buf: &mut ReadBuf<'_>,
996    ) -> Poll<io::Result<()>> {
997        self.poll_read_priv(cx, buf)
998    }
999}
1000
1001impl AsyncWrite for UnixStream {
1002    fn poll_write(
1003        self: Pin<&mut Self>,
1004        cx: &mut Context<'_>,
1005        buf: &[u8],
1006    ) -> Poll<io::Result<usize>> {
1007        self.poll_write_priv(cx, buf)
1008    }
1009
1010    fn poll_write_vectored(
1011        self: Pin<&mut Self>,
1012        cx: &mut Context<'_>,
1013        bufs: &[io::IoSlice<'_>],
1014    ) -> Poll<io::Result<usize>> {
1015        self.poll_write_vectored_priv(cx, bufs)
1016    }
1017
1018    fn is_write_vectored(&self) -> bool {
1019        true
1020    }
1021
1022    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1023        Poll::Ready(Ok(()))
1024    }
1025
1026    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1027        self.shutdown_std(std::net::Shutdown::Write)?;
1028        Poll::Ready(Ok(()))
1029    }
1030}
1031
1032impl UnixStream {
1033    // == Poll IO functions that takes `&self` ==
1034    //
1035    // To read or write without mutable access to the `UnixStream`, combine the
1036    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1037    // `try_write` methods.
1038
1039    pub(crate) fn poll_read_priv(
1040        &self,
1041        cx: &mut Context<'_>,
1042        buf: &mut ReadBuf<'_>,
1043    ) -> Poll<io::Result<()>> {
1044        // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
1045        unsafe { self.io.poll_read(cx, buf) }
1046    }
1047
1048    pub(crate) fn poll_write_priv(
1049        &self,
1050        cx: &mut Context<'_>,
1051        buf: &[u8],
1052    ) -> Poll<io::Result<usize>> {
1053        self.io.poll_write(cx, buf)
1054    }
1055
1056    pub(super) fn poll_write_vectored_priv(
1057        &self,
1058        cx: &mut Context<'_>,
1059        bufs: &[io::IoSlice<'_>],
1060    ) -> Poll<io::Result<usize>> {
1061        self.io.poll_write_vectored(cx, bufs)
1062    }
1063}
1064
1065impl fmt::Debug for UnixStream {
1066    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1067        self.io.fmt(f)
1068    }
1069}
1070
1071impl AsRawFd for UnixStream {
1072    fn as_raw_fd(&self) -> RawFd {
1073        self.io.as_raw_fd()
1074    }
1075}
1076
1077impl AsFd for UnixStream {
1078    fn as_fd(&self) -> BorrowedFd<'_> {
1079        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1080    }
1081}