mio/net/uds/
stream.rs

1use std::fmt;
2use std::io::{self, IoSlice, IoSliceMut, Read, Write};
3use std::net::Shutdown;
4use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
5use std::os::unix::net::{self, SocketAddr};
6use std::path::Path;
7
8use crate::io_source::IoSource;
9use crate::{event, sys, Interest, Registry, Token};
10
11/// A non-blocking Unix stream socket.
12pub struct UnixStream {
13    inner: IoSource<net::UnixStream>,
14}
15
16impl UnixStream {
17    /// Connects to the socket named by `path`.
18    ///
19    /// This may return a `WouldBlock` in which case the socket connection
20    /// cannot be completed immediately. Usually it means the backlog is full.
21    pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
22        let addr = SocketAddr::from_pathname(path)?;
23        UnixStream::connect_addr(&addr)
24    }
25
26    /// Connects to the socket named by `address`.
27    ///
28    /// This may return a `WouldBlock` in which case the socket connection
29    /// cannot be completed immediately. Usually it means the backlog is full.
30    pub fn connect_addr(address: &SocketAddr) -> io::Result<UnixStream> {
31        sys::uds::stream::connect_addr(address).map(UnixStream::from_std)
32    }
33
34    /// Creates a new `UnixStream` from a standard `net::UnixStream`.
35    ///
36    /// This function is intended to be used to wrap a Unix stream from the
37    /// standard library in the Mio equivalent. The conversion assumes nothing
38    /// about the underlying stream; it is left up to the user to set it in
39    /// non-blocking mode.
40    ///
41    /// # Note
42    ///
43    /// The Unix stream here will not have `connect` called on it, so it
44    /// should already be connected via some other means (be it manually, or
45    /// the standard library).
46    pub fn from_std(stream: net::UnixStream) -> UnixStream {
47        UnixStream {
48            inner: IoSource::new(stream),
49        }
50    }
51
52    /// Creates an unnamed pair of connected sockets.
53    ///
54    /// Returns two `UnixStream`s which are connected to each other.
55    pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
56        sys::uds::stream::pair().map(|(stream1, stream2)| {
57            (UnixStream::from_std(stream1), UnixStream::from_std(stream2))
58        })
59    }
60
61    /// Returns the socket address of the local half of this connection.
62    pub fn local_addr(&self) -> io::Result<SocketAddr> {
63        self.inner.local_addr()
64    }
65
66    /// Returns the socket address of the remote half of this connection.
67    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
68        self.inner.peer_addr()
69    }
70
71    /// Returns the value of the `SO_ERROR` option.
72    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
73        self.inner.take_error()
74    }
75
76    /// Shuts down the read, write, or both halves of this connection.
77    ///
78    /// This function will cause all pending and future I/O calls on the
79    /// specified portions to immediately return with an appropriate value
80    /// (see the documentation of `Shutdown`).
81    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
82        self.inner.shutdown(how)
83    }
84
85    /// Execute an I/O operation ensuring that the socket receives more events
86    /// if it hits a [`WouldBlock`] error.
87    ///
88    /// # Notes
89    ///
90    /// This method is required to be called for **all** I/O operations to
91    /// ensure the user will receive events once the socket is ready again after
92    /// returning a [`WouldBlock`] error.
93    ///
94    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// # use std::error::Error;
100    /// #
101    /// # fn main() -> Result<(), Box<dyn Error>> {
102    /// use std::io;
103    /// use std::os::fd::AsRawFd;
104    /// use mio::net::UnixStream;
105    ///
106    /// let (stream1, stream2) = UnixStream::pair()?;
107    ///
108    /// // Wait until the stream is writable...
109    ///
110    /// // Write to the stream using a direct libc call, of course the
111    /// // `io::Write` implementation would be easier to use.
112    /// let buf = b"hello";
113    /// let n = stream1.try_io(|| {
114    ///     let buf_ptr = &buf as *const _ as *const _;
115    ///     let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) };
116    ///     if res != -1 {
117    ///         Ok(res as usize)
118    ///     } else {
119    ///         // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
120    ///         // should return `WouldBlock` error.
121    ///         Err(io::Error::last_os_error())
122    ///     }
123    /// })?;
124    /// eprintln!("write {} bytes", n);
125    ///
126    /// // Wait until the stream is readable...
127    ///
128    /// // Read from the stream using a direct libc call, of course the
129    /// // `io::Read` implementation would be easier to use.
130    /// let mut buf = [0; 512];
131    /// let n = stream2.try_io(|| {
132    ///     let buf_ptr = &mut buf as *mut _ as *mut _;
133    ///     let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) };
134    ///     if res != -1 {
135    ///         Ok(res as usize)
136    ///     } else {
137    ///         // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
138    ///         // should return `WouldBlock` error.
139    ///         Err(io::Error::last_os_error())
140    ///     }
141    /// })?;
142    /// eprintln!("read {} bytes", n);
143    /// # Ok(())
144    /// # }
145    /// ```
146    pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
147    where
148        F: FnOnce() -> io::Result<T>,
149    {
150        self.inner.do_io(|_| f())
151    }
152}
153
154impl Read for UnixStream {
155    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
156        self.inner.do_io(|mut inner| inner.read(buf))
157    }
158
159    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
160        self.inner.do_io(|mut inner| inner.read_vectored(bufs))
161    }
162}
163
164impl<'a> Read for &'a UnixStream {
165    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166        self.inner.do_io(|mut inner| inner.read(buf))
167    }
168
169    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
170        self.inner.do_io(|mut inner| inner.read_vectored(bufs))
171    }
172}
173
174impl Write for UnixStream {
175    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176        self.inner.do_io(|mut inner| inner.write(buf))
177    }
178
179    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
180        self.inner.do_io(|mut inner| inner.write_vectored(bufs))
181    }
182
183    fn flush(&mut self) -> io::Result<()> {
184        self.inner.do_io(|mut inner| inner.flush())
185    }
186}
187
188impl<'a> Write for &'a UnixStream {
189    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
190        self.inner.do_io(|mut inner| inner.write(buf))
191    }
192
193    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
194        self.inner.do_io(|mut inner| inner.write_vectored(bufs))
195    }
196
197    fn flush(&mut self) -> io::Result<()> {
198        self.inner.do_io(|mut inner| inner.flush())
199    }
200}
201
202impl event::Source for UnixStream {
203    fn register(
204        &mut self,
205        registry: &Registry,
206        token: Token,
207        interests: Interest,
208    ) -> io::Result<()> {
209        self.inner.register(registry, token, interests)
210    }
211
212    fn reregister(
213        &mut self,
214        registry: &Registry,
215        token: Token,
216        interests: Interest,
217    ) -> io::Result<()> {
218        self.inner.reregister(registry, token, interests)
219    }
220
221    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
222        self.inner.deregister(registry)
223    }
224}
225
226impl fmt::Debug for UnixStream {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        self.inner.fmt(f)
229    }
230}
231
232impl IntoRawFd for UnixStream {
233    fn into_raw_fd(self) -> RawFd {
234        self.inner.into_inner().into_raw_fd()
235    }
236}
237
238impl AsRawFd for UnixStream {
239    fn as_raw_fd(&self) -> RawFd {
240        self.inner.as_raw_fd()
241    }
242}
243
244impl FromRawFd for UnixStream {
245    /// Converts a `RawFd` to a `UnixStream`.
246    ///
247    /// # Notes
248    ///
249    /// The caller is responsible for ensuring that the socket is in
250    /// non-blocking mode.
251    unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
252        UnixStream::from_std(FromRawFd::from_raw_fd(fd))
253    }
254}
255
256impl From<UnixStream> for net::UnixStream {
257    fn from(stream: UnixStream) -> Self {
258        // Safety: This is safe since we are extracting the raw fd from a well-constructed
259        // mio::net::uds::UnixStream which ensures that we actually pass in a valid file
260        // descriptor/socket
261        unsafe { net::UnixStream::from_raw_fd(stream.into_raw_fd()) }
262    }
263}
264
265impl From<UnixStream> for OwnedFd {
266    fn from(unix_stream: UnixStream) -> Self {
267        unix_stream.inner.into_inner().into()
268    }
269}
270
271impl AsFd for UnixStream {
272    fn as_fd(&self) -> BorrowedFd<'_> {
273        self.inner.as_fd()
274    }
275}
276
277impl From<OwnedFd> for UnixStream {
278    fn from(fd: OwnedFd) -> Self {
279        UnixStream::from_std(From::from(fd))
280    }
281}