tokio/net/unix/datagram/
socket.rs

1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3
4use std::fmt;
5use std::io;
6use std::net::Shutdown;
7use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
8use std::os::unix::net;
9use std::path::Path;
10use std::task::{ready, Context, Poll};
11
12cfg_io_util! {
13    use bytes::BufMut;
14}
15
16cfg_net_unix! {
17    /// An I/O object representing a Unix datagram socket.
18    ///
19    /// A socket can be either named (associated with a filesystem path) or
20    /// unnamed.
21    ///
22    /// This type does not provide a `split` method, because this functionality
23    /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
24    /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
25    /// is enough. This is because all of the methods take `&self` instead of
26    /// `&mut self`.
27    ///
28    /// **Note:** named sockets are persisted even after the object is dropped
29    /// and the program has exited, and cannot be reconnected. It is advised
30    /// that you either check for and unlink the existing socket if it exists,
31    /// or use a temporary file that is guaranteed to not already exist.
32    ///
33    /// [`Arc`]: std::sync::Arc
34    ///
35    /// # Examples
36    /// Using named sockets, associated with a filesystem path:
37    /// ```
38    /// # use std::error::Error;
39    /// # #[tokio::main]
40    /// # async fn main() -> Result<(), Box<dyn Error>> {
41    /// use tokio::net::UnixDatagram;
42    /// use tempfile::tempdir;
43    ///
44    /// // We use a temporary directory so that the socket
45    /// // files left by the bound sockets will get cleaned up.
46    /// let tmp = tempdir()?;
47    ///
48    /// // Bind each socket to a filesystem path
49    /// let tx_path = tmp.path().join("tx");
50    /// let tx = UnixDatagram::bind(&tx_path)?;
51    /// let rx_path = tmp.path().join("rx");
52    /// let rx = UnixDatagram::bind(&rx_path)?;
53    ///
54    /// let bytes = b"hello world";
55    /// tx.send_to(bytes, &rx_path).await?;
56    ///
57    /// let mut buf = vec![0u8; 24];
58    /// let (size, addr) = rx.recv_from(&mut buf).await?;
59    ///
60    /// let dgram = &buf[..size];
61    /// assert_eq!(dgram, bytes);
62    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
63    ///
64    /// # Ok(())
65    /// # }
66    /// ```
67    ///
68    /// Using unnamed sockets, created as a pair
69    /// ```
70    /// # use std::error::Error;
71    /// # #[tokio::main]
72    /// # async fn main() -> Result<(), Box<dyn Error>> {
73    /// use tokio::net::UnixDatagram;
74    ///
75    /// // Create the pair of sockets
76    /// let (sock1, sock2) = UnixDatagram::pair()?;
77    ///
78    /// // Since the sockets are paired, the paired send/recv
79    /// // functions can be used
80    /// let bytes = b"hello world";
81    /// sock1.send(bytes).await?;
82    ///
83    /// let mut buff = vec![0u8; 24];
84    /// let size = sock2.recv(&mut buff).await?;
85    ///
86    /// let dgram = &buff[..size];
87    /// assert_eq!(dgram, bytes);
88    ///
89    /// # Ok(())
90    /// # }
91    /// ```
92    #[cfg_attr(docsrs, doc(alias = "uds"))]
93    pub struct UnixDatagram {
94        io: PollEvented<mio::net::UnixDatagram>,
95    }
96}
97
98impl UnixDatagram {
99    pub(crate) fn from_mio(sys: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
100        let datagram = UnixDatagram::new(sys)?;
101
102        if let Some(e) = datagram.io.take_error()? {
103            return Err(e);
104        }
105
106        Ok(datagram)
107    }
108
109    /// Waits for any of the requested ready states.
110    ///
111    /// This function is usually paired with `try_recv()` or `try_send()`. It
112    /// can be used to concurrently `recv` / `send` to the same socket on a single
113    /// task without splitting the socket.
114    ///
115    /// The function may complete without the socket being ready. This is a
116    /// false-positive and attempting an operation will return with
117    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
118    /// [`Ready`] set, so you should always check the returned value and possibly
119    /// wait again if the requested states are not set.
120    ///
121    /// # Cancel safety
122    ///
123    /// This method is cancel safe. Once a readiness event occurs, the method
124    /// will continue to return immediately until the readiness event is
125    /// consumed by an attempt to read or write that fails with `WouldBlock` or
126    /// `Poll::Pending`.
127    ///
128    /// # Examples
129    ///
130    /// Concurrently receive from and send to the socket on the same task
131    /// without splitting.
132    ///
133    /// ```no_run
134    /// use tokio::io::Interest;
135    /// use tokio::net::UnixDatagram;
136    /// use std::io;
137    ///
138    /// #[tokio::main]
139    /// async fn main() -> io::Result<()> {
140    ///     let dir = tempfile::tempdir().unwrap();
141    ///     let client_path = dir.path().join("client.sock");
142    ///     let server_path = dir.path().join("server.sock");
143    ///     let socket = UnixDatagram::bind(&client_path)?;
144    ///     socket.connect(&server_path)?;
145    ///
146    ///     loop {
147    ///         let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
148    ///
149    ///         if ready.is_readable() {
150    ///             let mut data = [0; 1024];
151    ///             match socket.try_recv(&mut data[..]) {
152    ///                 Ok(n) => {
153    ///                     println!("received {:?}", &data[..n]);
154    ///                 }
155    ///                 // False-positive, continue
156    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
157    ///                 Err(e) => {
158    ///                     return Err(e);
159    ///                 }
160    ///             }
161    ///         }
162    ///
163    ///         if ready.is_writable() {
164    ///             // Write some data
165    ///             match socket.try_send(b"hello world") {
166    ///                 Ok(n) => {
167    ///                     println!("sent {} bytes", n);
168    ///                 }
169    ///                 // False-positive, continue
170    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
171    ///                 Err(e) => {
172    ///                     return Err(e);
173    ///                 }
174    ///             }
175    ///         }
176    ///     }
177    /// }
178    /// ```
179    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
180        let event = self.io.registration().readiness(interest).await?;
181        Ok(event.ready)
182    }
183
184    /// Waits for the socket to become writable.
185    ///
186    /// This function is equivalent to `ready(Interest::WRITABLE)` and is
187    /// usually paired with `try_send()` or `try_send_to()`.
188    ///
189    /// The function may complete without the socket being writable. This is a
190    /// false-positive and attempting a `try_send()` will return with
191    /// `io::ErrorKind::WouldBlock`.
192    ///
193    /// # Cancel safety
194    ///
195    /// This method is cancel safe. Once a readiness event occurs, the method
196    /// will continue to return immediately until the readiness event is
197    /// consumed by an attempt to write that fails with `WouldBlock` or
198    /// `Poll::Pending`.
199    ///
200    /// # Examples
201    ///
202    /// ```no_run
203    /// use tokio::net::UnixDatagram;
204    /// use std::io;
205    ///
206    /// #[tokio::main]
207    /// async fn main() -> io::Result<()> {
208    ///     let dir = tempfile::tempdir().unwrap();
209    ///     let client_path = dir.path().join("client.sock");
210    ///     let server_path = dir.path().join("server.sock");
211    ///     let socket = UnixDatagram::bind(&client_path)?;
212    ///     socket.connect(&server_path)?;
213    ///
214    ///     loop {
215    ///         // Wait for the socket to be writable
216    ///         socket.writable().await?;
217    ///
218    ///         // Try to send data, this may still fail with `WouldBlock`
219    ///         // if the readiness event is a false positive.
220    ///         match socket.try_send(b"hello world") {
221    ///             Ok(n) => {
222    ///                 break;
223    ///             }
224    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
225    ///                 continue;
226    ///             }
227    ///             Err(e) => {
228    ///                 return Err(e);
229    ///             }
230    ///         }
231    ///     }
232    ///
233    ///     Ok(())
234    /// }
235    /// ```
236    pub async fn writable(&self) -> io::Result<()> {
237        self.ready(Interest::WRITABLE).await?;
238        Ok(())
239    }
240
241    /// Polls for write/send readiness.
242    ///
243    /// If the socket is not currently ready for sending, this method will
244    /// store a clone of the `Waker` from the provided `Context`. When the socket
245    /// becomes ready for sending, `Waker::wake` will be called on the
246    /// waker.
247    ///
248    /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
249    /// the `Waker` from the `Context` passed to the most recent call is
250    /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
251    /// second, independent waker.)
252    ///
253    /// This function is intended for cases where creating and pinning a future
254    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
255    /// preferred, as this supports polling from multiple tasks at once.
256    ///
257    /// # Return value
258    ///
259    /// The function returns:
260    ///
261    /// * `Poll::Pending` if the socket is not ready for writing.
262    /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
263    /// * `Poll::Ready(Err(e))` if an error is encountered.
264    ///
265    /// # Errors
266    ///
267    /// This function may encounter any standard I/O error except `WouldBlock`.
268    ///
269    /// [`writable`]: method@Self::writable
270    pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
271        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
272    }
273
274    /// Waits for the socket to become readable.
275    ///
276    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
277    /// paired with `try_recv()`.
278    ///
279    /// The function may complete without the socket being readable. This is a
280    /// false-positive and attempting a `try_recv()` will return with
281    /// `io::ErrorKind::WouldBlock`.
282    ///
283    /// # Cancel safety
284    ///
285    /// This method is cancel safe. Once a readiness event occurs, the method
286    /// will continue to return immediately until the readiness event is
287    /// consumed by an attempt to read that fails with `WouldBlock` or
288    /// `Poll::Pending`.
289    ///
290    /// # Examples
291    ///
292    /// ```no_run
293    /// use tokio::net::UnixDatagram;
294    /// use std::io;
295    ///
296    /// #[tokio::main]
297    /// async fn main() -> io::Result<()> {
298    ///     // Connect to a peer
299    ///     let dir = tempfile::tempdir().unwrap();
300    ///     let client_path = dir.path().join("client.sock");
301    ///     let server_path = dir.path().join("server.sock");
302    ///     let socket = UnixDatagram::bind(&client_path)?;
303    ///     socket.connect(&server_path)?;
304    ///
305    ///     loop {
306    ///         // Wait for the socket to be readable
307    ///         socket.readable().await?;
308    ///
309    ///         // The buffer is **not** included in the async task and will
310    ///         // only exist on the stack.
311    ///         let mut buf = [0; 1024];
312    ///
313    ///         // Try to recv data, this may still fail with `WouldBlock`
314    ///         // if the readiness event is a false positive.
315    ///         match socket.try_recv(&mut buf) {
316    ///             Ok(n) => {
317    ///                 println!("GOT {:?}", &buf[..n]);
318    ///                 break;
319    ///             }
320    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
321    ///                 continue;
322    ///             }
323    ///             Err(e) => {
324    ///                 return Err(e);
325    ///             }
326    ///         }
327    ///     }
328    ///
329    ///     Ok(())
330    /// }
331    /// ```
332    pub async fn readable(&self) -> io::Result<()> {
333        self.ready(Interest::READABLE).await?;
334        Ok(())
335    }
336
337    /// Polls for read/receive readiness.
338    ///
339    /// If the socket is not currently ready for receiving, this method will
340    /// store a clone of the `Waker` from the provided `Context`. When the
341    /// socket becomes ready for reading, `Waker::wake` will be called on the
342    /// waker.
343    ///
344    /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
345    /// `poll_peek`, only the `Waker` from the `Context` passed to the most
346    /// recent call is scheduled to receive a wakeup. (However,
347    /// `poll_send_ready` retains a second, independent waker.)
348    ///
349    /// This function is intended for cases where creating and pinning a future
350    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
351    /// preferred, as this supports polling from multiple tasks at once.
352    ///
353    /// # Return value
354    ///
355    /// The function returns:
356    ///
357    /// * `Poll::Pending` if the socket is not ready for reading.
358    /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
359    /// * `Poll::Ready(Err(e))` if an error is encountered.
360    ///
361    /// # Errors
362    ///
363    /// This function may encounter any standard I/O error except `WouldBlock`.
364    ///
365    /// [`readable`]: method@Self::readable
366    pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
367        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
368    }
369
370    /// Creates a new `UnixDatagram` bound to the specified path.
371    ///
372    /// # Examples
373    /// ```
374    /// # use std::error::Error;
375    /// # #[tokio::main]
376    /// # async fn main() -> Result<(), Box<dyn Error>> {
377    /// use tokio::net::UnixDatagram;
378    /// use tempfile::tempdir;
379    ///
380    /// // We use a temporary directory so that the socket
381    /// // files left by the bound sockets will get cleaned up.
382    /// let tmp = tempdir()?;
383    ///
384    /// // Bind the socket to a filesystem path
385    /// let socket_path = tmp.path().join("socket");
386    /// let socket = UnixDatagram::bind(&socket_path)?;
387    ///
388    /// # Ok(())
389    /// # }
390    /// ```
391    pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
392    where
393        P: AsRef<Path>,
394    {
395        let socket = mio::net::UnixDatagram::bind(path)?;
396        UnixDatagram::new(socket)
397    }
398
399    /// Creates an unnamed pair of connected sockets.
400    ///
401    /// This function will create a pair of interconnected Unix sockets for
402    /// communicating back and forth between one another.
403    ///
404    /// # Examples
405    /// ```
406    /// # use std::error::Error;
407    /// # #[tokio::main]
408    /// # async fn main() -> Result<(), Box<dyn Error>> {
409    /// use tokio::net::UnixDatagram;
410    ///
411    /// // Create the pair of sockets
412    /// let (sock1, sock2) = UnixDatagram::pair()?;
413    ///
414    /// // Since the sockets are paired, the paired send/recv
415    /// // functions can be used
416    /// let bytes = b"hail eris";
417    /// sock1.send(bytes).await?;
418    ///
419    /// let mut buff = vec![0u8; 24];
420    /// let size = sock2.recv(&mut buff).await?;
421    ///
422    /// let dgram = &buff[..size];
423    /// assert_eq!(dgram, bytes);
424    ///
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
429        let (a, b) = mio::net::UnixDatagram::pair()?;
430        let a = UnixDatagram::new(a)?;
431        let b = UnixDatagram::new(b)?;
432
433        Ok((a, b))
434    }
435
436    /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`].
437    ///
438    /// This function is intended to be used to wrap a `UnixDatagram` from the
439    /// standard library in the Tokio equivalent.
440    ///
441    /// # Notes
442    ///
443    /// The caller is responsible for ensuring that the socket is in
444    /// non-blocking mode. Otherwise all I/O operations on the socket
445    /// will block the thread, which will cause unexpected behavior.
446    /// Non-blocking mode can be set using [`set_nonblocking`].
447    ///
448    /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
449    ///
450    /// # Panics
451    ///
452    /// This function panics if it is not called from within a runtime with
453    /// IO enabled.
454    ///
455    /// The runtime is usually set implicitly when this function is called
456    /// from a future driven by a Tokio runtime, otherwise runtime can be set
457    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
458    /// # Examples
459    /// ```
460    /// # use std::error::Error;
461    /// # #[tokio::main]
462    /// # async fn main() -> Result<(), Box<dyn Error>> {
463    /// use tokio::net::UnixDatagram;
464    /// use std::os::unix::net::UnixDatagram as StdUDS;
465    /// use tempfile::tempdir;
466    ///
467    /// // We use a temporary directory so that the socket
468    /// // files left by the bound sockets will get cleaned up.
469    /// let tmp = tempdir()?;
470    ///
471    /// // Bind the socket to a filesystem path
472    /// let socket_path = tmp.path().join("socket");
473    /// let std_socket = StdUDS::bind(&socket_path)?;
474    /// std_socket.set_nonblocking(true)?;
475    /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
476    ///
477    /// # Ok(())
478    /// # }
479    /// ```
480    #[track_caller]
481    pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
482        let socket = mio::net::UnixDatagram::from_std(datagram);
483        let io = PollEvented::new(socket)?;
484        Ok(UnixDatagram { io })
485    }
486
487    /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
488    ///
489    /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
490    /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
491    /// if needed.
492    ///
493    /// # Examples
494    ///
495    /// ```rust,no_run
496    /// # use std::error::Error;
497    /// # async fn dox() -> Result<(), Box<dyn Error>> {
498    /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
499    /// let std_socket = tokio_socket.into_std()?;
500    /// std_socket.set_nonblocking(false)?;
501    /// # Ok(())
502    /// # }
503    /// ```
504    ///
505    /// [`tokio::net::UnixDatagram`]: UnixDatagram
506    /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
507    /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
508    pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
509        self.io
510            .into_inner()
511            .map(IntoRawFd::into_raw_fd)
512            .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
513    }
514
515    fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
516        let io = PollEvented::new(socket)?;
517        Ok(UnixDatagram { io })
518    }
519
520    /// Creates a new `UnixDatagram` which is not bound to any address.
521    ///
522    /// # Examples
523    /// ```
524    /// # use std::error::Error;
525    /// # #[tokio::main]
526    /// # async fn main() -> Result<(), Box<dyn Error>> {
527    /// use tokio::net::UnixDatagram;
528    /// use tempfile::tempdir;
529    ///
530    /// // Create an unbound socket
531    /// let tx = UnixDatagram::unbound()?;
532    ///
533    /// // Create another, bound socket
534    /// let tmp = tempdir()?;
535    /// let rx_path = tmp.path().join("rx");
536    /// let rx = UnixDatagram::bind(&rx_path)?;
537    ///
538    /// // Send to the bound socket
539    /// let bytes = b"hello world";
540    /// tx.send_to(bytes, &rx_path).await?;
541    ///
542    /// let mut buf = vec![0u8; 24];
543    /// let (size, addr) = rx.recv_from(&mut buf).await?;
544    ///
545    /// let dgram = &buf[..size];
546    /// assert_eq!(dgram, bytes);
547    ///
548    /// # Ok(())
549    /// # }
550    /// ```
551    pub fn unbound() -> io::Result<UnixDatagram> {
552        let socket = mio::net::UnixDatagram::unbound()?;
553        UnixDatagram::new(socket)
554    }
555
556    /// Connects the socket to the specified address.
557    ///
558    /// The `send` method may be used to send data to the specified address.
559    /// `recv` and `recv_from` will only receive data from that address.
560    ///
561    /// # Examples
562    /// ```
563    /// # use std::error::Error;
564    /// # #[tokio::main]
565    /// # async fn main() -> Result<(), Box<dyn Error>> {
566    /// use tokio::net::UnixDatagram;
567    /// use tempfile::tempdir;
568    ///
569    /// // Create an unbound socket
570    /// let tx = UnixDatagram::unbound()?;
571    ///
572    /// // Create another, bound socket
573    /// let tmp = tempdir()?;
574    /// let rx_path = tmp.path().join("rx");
575    /// let rx = UnixDatagram::bind(&rx_path)?;
576    ///
577    /// // Connect to the bound socket
578    /// tx.connect(&rx_path)?;
579    ///
580    /// // Send to the bound socket
581    /// let bytes = b"hello world";
582    /// tx.send(bytes).await?;
583    ///
584    /// let mut buf = vec![0u8; 24];
585    /// let (size, addr) = rx.recv_from(&mut buf).await?;
586    ///
587    /// let dgram = &buf[..size];
588    /// assert_eq!(dgram, bytes);
589    ///
590    /// # Ok(())
591    /// # }
592    /// ```
593    pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
594        self.io.connect(path)
595    }
596
597    /// Sends data on the socket to the socket's peer.
598    ///
599    /// # Cancel safety
600    ///
601    /// This method is cancel safe. If `send` is used as the event in a
602    /// [`tokio::select!`](crate::select) statement and some other branch
603    /// completes first, then it is guaranteed that the message was not sent.
604    ///
605    /// # Examples
606    /// ```
607    /// # use std::error::Error;
608    /// # #[tokio::main]
609    /// # async fn main() -> Result<(), Box<dyn Error>> {
610    /// use tokio::net::UnixDatagram;
611    ///
612    /// // Create the pair of sockets
613    /// let (sock1, sock2) = UnixDatagram::pair()?;
614    ///
615    /// // Since the sockets are paired, the paired send/recv
616    /// // functions can be used
617    /// let bytes = b"hello world";
618    /// sock1.send(bytes).await?;
619    ///
620    /// let mut buff = vec![0u8; 24];
621    /// let size = sock2.recv(&mut buff).await?;
622    ///
623    /// let dgram = &buff[..size];
624    /// assert_eq!(dgram, bytes);
625    ///
626    /// # Ok(())
627    /// # }
628    /// ```
629    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
630        self.io
631            .registration()
632            .async_io(Interest::WRITABLE, || self.io.send(buf))
633            .await
634    }
635
636    /// Tries to send a datagram to the peer without waiting.
637    ///
638    /// # Examples
639    ///
640    /// ```no_run
641    /// use tokio::net::UnixDatagram;
642    /// use std::io;
643    ///
644    /// #[tokio::main]
645    /// async fn main() -> io::Result<()> {
646    ///     let dir = tempfile::tempdir().unwrap();
647    ///     let client_path = dir.path().join("client.sock");
648    ///     let server_path = dir.path().join("server.sock");
649    ///     let socket = UnixDatagram::bind(&client_path)?;
650    ///     socket.connect(&server_path)?;
651    ///
652    ///     loop {
653    ///         // Wait for the socket to be writable
654    ///         socket.writable().await?;
655    ///
656    ///         // Try to send data, this may still fail with `WouldBlock`
657    ///         // if the readiness event is a false positive.
658    ///         match socket.try_send(b"hello world") {
659    ///             Ok(n) => {
660    ///                 break;
661    ///             }
662    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
663    ///                 continue;
664    ///             }
665    ///             Err(e) => {
666    ///                 return Err(e);
667    ///             }
668    ///         }
669    ///     }
670    ///
671    ///     Ok(())
672    /// }
673    /// ```
674    pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
675        self.io
676            .registration()
677            .try_io(Interest::WRITABLE, || self.io.send(buf))
678    }
679
    /// Tries to send a datagram to the socket at the specified path without waiting.
681    ///
682    /// # Examples
683    ///
684    /// ```no_run
685    /// use tokio::net::UnixDatagram;
686    /// use std::io;
687    ///
688    /// #[tokio::main]
689    /// async fn main() -> io::Result<()> {
690    ///     let dir = tempfile::tempdir().unwrap();
691    ///     let client_path = dir.path().join("client.sock");
692    ///     let server_path = dir.path().join("server.sock");
693    ///     let socket = UnixDatagram::bind(&client_path)?;
694    ///
695    ///     loop {
696    ///         // Wait for the socket to be writable
697    ///         socket.writable().await?;
698    ///
699    ///         // Try to send data, this may still fail with `WouldBlock`
700    ///         // if the readiness event is a false positive.
701    ///         match socket.try_send_to(b"hello world", &server_path) {
702    ///             Ok(n) => {
703    ///                 break;
704    ///             }
705    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
706    ///                 continue;
707    ///             }
708    ///             Err(e) => {
709    ///                 return Err(e);
710    ///             }
711    ///         }
712    ///     }
713    ///
714    ///     Ok(())
715    /// }
716    /// ```
717    pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
718    where
719        P: AsRef<Path>,
720    {
721        self.io
722            .registration()
723            .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
724    }
725
726    /// Receives data from the socket.
727    ///
728    /// # Cancel safety
729    ///
730    /// This method is cancel safe. If `recv` is used as the event in a
731    /// [`tokio::select!`](crate::select) statement and some other branch
732    /// completes first, it is guaranteed that no messages were received on this
733    /// socket.
734    ///
735    /// # Examples
736    /// ```
737    /// # use std::error::Error;
738    /// # #[tokio::main]
739    /// # async fn main() -> Result<(), Box<dyn Error>> {
740    /// use tokio::net::UnixDatagram;
741    ///
742    /// // Create the pair of sockets
743    /// let (sock1, sock2) = UnixDatagram::pair()?;
744    ///
745    /// // Since the sockets are paired, the paired send/recv
746    /// // functions can be used
747    /// let bytes = b"hello world";
748    /// sock1.send(bytes).await?;
749    ///
750    /// let mut buff = vec![0u8; 24];
751    /// let size = sock2.recv(&mut buff).await?;
752    ///
753    /// let dgram = &buff[..size];
754    /// assert_eq!(dgram, bytes);
755    ///
756    /// # Ok(())
757    /// # }
758    /// ```
759    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
760        self.io
761            .registration()
762            .async_io(Interest::READABLE, || self.io.recv(buf))
763            .await
764    }
765
766    /// Tries to receive a datagram from the peer without waiting.
767    ///
768    /// # Examples
769    ///
770    /// ```no_run
771    /// use tokio::net::UnixDatagram;
772    /// use std::io;
773    ///
774    /// #[tokio::main]
775    /// async fn main() -> io::Result<()> {
776    ///     // Connect to a peer
777    ///     let dir = tempfile::tempdir().unwrap();
778    ///     let client_path = dir.path().join("client.sock");
779    ///     let server_path = dir.path().join("server.sock");
780    ///     let socket = UnixDatagram::bind(&client_path)?;
781    ///     socket.connect(&server_path)?;
782    ///
783    ///     loop {
784    ///         // Wait for the socket to be readable
785    ///         socket.readable().await?;
786    ///
787    ///         // The buffer is **not** included in the async task and will
788    ///         // only exist on the stack.
789    ///         let mut buf = [0; 1024];
790    ///
791    ///         // Try to recv data, this may still fail with `WouldBlock`
792    ///         // if the readiness event is a false positive.
793    ///         match socket.try_recv(&mut buf) {
794    ///             Ok(n) => {
795    ///                 println!("GOT {:?}", &buf[..n]);
796    ///                 break;
797    ///             }
798    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
799    ///                 continue;
800    ///             }
801    ///             Err(e) => {
802    ///                 return Err(e);
803    ///             }
804    ///         }
805    ///     }
806    ///
807    ///     Ok(())
808    /// }
809    /// ```
810    pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
811        self.io
812            .registration()
813            .try_io(Interest::READABLE, || self.io.recv(buf))
814    }
815
816    cfg_io_util! {
817        /// Tries to receive data from the socket without waiting.
818        ///
819        /// This method can be used even if `buf` is uninitialized.
820        ///
821        /// # Examples
822        ///
823        /// ```no_run
824        /// use tokio::net::UnixDatagram;
825        /// use std::io;
826        ///
827        /// #[tokio::main]
828        /// async fn main() -> io::Result<()> {
829        ///     // Connect to a peer
830        ///     let dir = tempfile::tempdir().unwrap();
831        ///     let client_path = dir.path().join("client.sock");
832        ///     let server_path = dir.path().join("server.sock");
833        ///     let socket = UnixDatagram::bind(&client_path)?;
834        ///
835        ///     loop {
836        ///         // Wait for the socket to be readable
837        ///         socket.readable().await?;
838        ///
839        ///         let mut buf = Vec::with_capacity(1024);
840        ///
841        ///         // Try to recv data, this may still fail with `WouldBlock`
842        ///         // if the readiness event is a false positive.
843        ///         match socket.try_recv_buf_from(&mut buf) {
844        ///             Ok((n, _addr)) => {
845        ///                 println!("GOT {:?}", &buf[..n]);
846        ///                 break;
847        ///             }
848        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
849        ///                 continue;
850        ///             }
851        ///             Err(e) => {
852        ///                 return Err(e);
853        ///             }
854        ///         }
855        ///     }
856        ///
857        ///     Ok(())
858        /// }
859        /// ```
860        pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
861            let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
862                let dst = buf.chunk_mut();
863                let dst =
864                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
865
866                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
867                // buffer.
868                let (n, addr) = (*self.io).recv_from(dst)?;
869
870                unsafe {
871                    buf.advance_mut(n);
872                }
873
874                Ok((n, addr))
875            })?;
876
877            Ok((n, SocketAddr(addr)))
878        }
879
880        /// Receives from the socket, advances the
881        /// buffer's internal cursor and returns how many bytes were read and the origin.
882        ///
883        /// This method can be used even if `buf` is uninitialized.
884        ///
885        /// # Examples
886        /// ```
887        /// # use std::error::Error;
888        /// # #[tokio::main]
889        /// # async fn main() -> Result<(), Box<dyn Error>> {
890        /// use tokio::net::UnixDatagram;
891        /// use tempfile::tempdir;
892        ///
893        /// // We use a temporary directory so that the socket
894        /// // files left by the bound sockets will get cleaned up.
895        /// let tmp = tempdir()?;
896        ///
897        /// // Bind each socket to a filesystem path
898        /// let tx_path = tmp.path().join("tx");
899        /// let tx = UnixDatagram::bind(&tx_path)?;
900        /// let rx_path = tmp.path().join("rx");
901        /// let rx = UnixDatagram::bind(&rx_path)?;
902        ///
903        /// let bytes = b"hello world";
904        /// tx.send_to(bytes, &rx_path).await?;
905        ///
906        /// let mut buf = Vec::with_capacity(24);
907        /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
908        ///
909        /// let dgram = &buf[..size];
910        /// assert_eq!(dgram, bytes);
911        /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
912        ///
913        /// # Ok(())
914        /// # }
915        /// ```
916        pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
917            self.io.registration().async_io(Interest::READABLE, || {
918                let dst = buf.chunk_mut();
919                let dst =
920                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
921
922                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
923                // buffer.
924                let (n, addr) = (*self.io).recv_from(dst)?;
925
926                unsafe {
927                    buf.advance_mut(n);
928                }
929                Ok((n,SocketAddr(addr)))
930            }).await
931        }
932
933        /// Tries to read data from the stream into the provided buffer, advancing the
934        /// buffer's internal cursor, returning how many bytes were read.
935        ///
936        /// This method can be used even if `buf` is uninitialized.
937        ///
938        /// # Examples
939        ///
940        /// ```no_run
941        /// use tokio::net::UnixDatagram;
942        /// use std::io;
943        ///
944        /// #[tokio::main]
945        /// async fn main() -> io::Result<()> {
946        ///     // Connect to a peer
947        ///     let dir = tempfile::tempdir().unwrap();
948        ///     let client_path = dir.path().join("client.sock");
949        ///     let server_path = dir.path().join("server.sock");
950        ///     let socket = UnixDatagram::bind(&client_path)?;
951        ///     socket.connect(&server_path)?;
952        ///
953        ///     loop {
954        ///         // Wait for the socket to be readable
955        ///         socket.readable().await?;
956        ///
957        ///         let mut buf = Vec::with_capacity(1024);
958        ///
959        ///         // Try to recv data, this may still fail with `WouldBlock`
960        ///         // if the readiness event is a false positive.
961        ///         match socket.try_recv_buf(&mut buf) {
962        ///             Ok(n) => {
963        ///                 println!("GOT {:?}", &buf[..n]);
964        ///                 break;
965        ///             }
966        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
967        ///                 continue;
968        ///             }
969        ///             Err(e) => {
970        ///                 return Err(e);
971        ///             }
972        ///         }
973        ///     }
974        ///
975        ///     Ok(())
976        /// }
977        /// ```
978        pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
979            self.io.registration().try_io(Interest::READABLE, || {
980                let dst = buf.chunk_mut();
981                let dst =
982                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
983
984                // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
985                // buffer.
986                let n = (*self.io).recv(dst)?;
987
988                unsafe {
989                    buf.advance_mut(n);
990                }
991
992                Ok(n)
993            })
994        }
995
996        /// Receives data from the socket from the address to which it is connected,
997        /// advancing the buffer's internal cursor, returning how many bytes were read.
998        ///
999        /// This method can be used even if `buf` is uninitialized.
1000        ///
1001        /// # Examples
1002        /// ```
1003        /// # use std::error::Error;
1004        /// # #[tokio::main]
1005        /// # async fn main() -> Result<(), Box<dyn Error>> {
1006        /// use tokio::net::UnixDatagram;
1007        ///
1008        /// // Create the pair of sockets
1009        /// let (sock1, sock2) = UnixDatagram::pair()?;
1010        ///
1011        /// // Since the sockets are paired, the paired send/recv
1012        /// // functions can be used
1013        /// let bytes = b"hello world";
1014        /// sock1.send(bytes).await?;
1015        ///
1016        /// let mut buff = Vec::with_capacity(24);
1017        /// let size = sock2.recv_buf(&mut buff).await?;
1018        ///
1019        /// let dgram = &buff[..size];
1020        /// assert_eq!(dgram, bytes);
1021        ///
1022        /// # Ok(())
1023        /// # }
1024        /// ```
1025        pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1026            self.io.registration().async_io(Interest::READABLE, || {
1027                let dst = buf.chunk_mut();
1028                let dst =
1029                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1030
1031                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1032                // buffer.
1033                let n = (*self.io).recv(dst)?;
1034
1035                unsafe {
1036                    buf.advance_mut(n);
1037                }
1038                Ok(n)
1039            }).await
1040        }
1041    }
1042
1043    /// Sends data on the socket to the specified address.
1044    ///
1045    /// # Cancel safety
1046    ///
1047    /// This method is cancel safe. If `send_to` is used as the event in a
1048    /// [`tokio::select!`](crate::select) statement and some other branch
1049    /// completes first, then it is guaranteed that the message was not sent.
1050    ///
1051    /// # Examples
1052    /// ```
1053    /// # use std::error::Error;
1054    /// # #[tokio::main]
1055    /// # async fn main() -> Result<(), Box<dyn Error>> {
1056    /// use tokio::net::UnixDatagram;
1057    /// use tempfile::tempdir;
1058    ///
1059    /// // We use a temporary directory so that the socket
1060    /// // files left by the bound sockets will get cleaned up.
1061    /// let tmp = tempdir()?;
1062    ///
1063    /// // Bind each socket to a filesystem path
1064    /// let tx_path = tmp.path().join("tx");
1065    /// let tx = UnixDatagram::bind(&tx_path)?;
1066    /// let rx_path = tmp.path().join("rx");
1067    /// let rx = UnixDatagram::bind(&rx_path)?;
1068    ///
1069    /// let bytes = b"hello world";
1070    /// tx.send_to(bytes, &rx_path).await?;
1071    ///
1072    /// let mut buf = vec![0u8; 24];
1073    /// let (size, addr) = rx.recv_from(&mut buf).await?;
1074    ///
1075    /// let dgram = &buf[..size];
1076    /// assert_eq!(dgram, bytes);
1077    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1078    ///
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1083    where
1084        P: AsRef<Path>,
1085    {
1086        self.io
1087            .registration()
1088            .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1089            .await
1090    }
1091
1092    /// Receives data from the socket.
1093    ///
1094    /// # Cancel safety
1095    ///
1096    /// This method is cancel safe. If `recv_from` is used as the event in a
1097    /// [`tokio::select!`](crate::select) statement and some other branch
1098    /// completes first, it is guaranteed that no messages were received on this
1099    /// socket.
1100    ///
1101    /// # Examples
1102    /// ```
1103    /// # use std::error::Error;
1104    /// # #[tokio::main]
1105    /// # async fn main() -> Result<(), Box<dyn Error>> {
1106    /// use tokio::net::UnixDatagram;
1107    /// use tempfile::tempdir;
1108    ///
1109    /// // We use a temporary directory so that the socket
1110    /// // files left by the bound sockets will get cleaned up.
1111    /// let tmp = tempdir()?;
1112    ///
1113    /// // Bind each socket to a filesystem path
1114    /// let tx_path = tmp.path().join("tx");
1115    /// let tx = UnixDatagram::bind(&tx_path)?;
1116    /// let rx_path = tmp.path().join("rx");
1117    /// let rx = UnixDatagram::bind(&rx_path)?;
1118    ///
1119    /// let bytes = b"hello world";
1120    /// tx.send_to(bytes, &rx_path).await?;
1121    ///
1122    /// let mut buf = vec![0u8; 24];
1123    /// let (size, addr) = rx.recv_from(&mut buf).await?;
1124    ///
1125    /// let dgram = &buf[..size];
1126    /// assert_eq!(dgram, bytes);
1127    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1128    ///
1129    /// # Ok(())
1130    /// # }
1131    /// ```
1132    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1133        let (n, addr) = self
1134            .io
1135            .registration()
1136            .async_io(Interest::READABLE, || self.io.recv_from(buf))
1137            .await?;
1138
1139        Ok((n, SocketAddr(addr)))
1140    }
1141
1142    /// Attempts to receive a single datagram on the specified address.
1143    ///
1144    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1145    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1146    /// receive a wakeup.
1147    ///
1148    /// # Return value
1149    ///
1150    /// The function returns:
1151    ///
1152    /// * `Poll::Pending` if the socket is not ready to read
1153    /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1154    /// * `Poll::Ready(Err(e))` if an error is encountered.
1155    ///
1156    /// # Errors
1157    ///
1158    /// This function may encounter any standard I/O error except `WouldBlock`.
1159    pub fn poll_recv_from(
1160        &self,
1161        cx: &mut Context<'_>,
1162        buf: &mut ReadBuf<'_>,
1163    ) -> Poll<io::Result<SocketAddr>> {
1164        #[allow(clippy::blocks_in_conditions)]
1165        let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1166            // Safety: will not read the maybe uninitialized bytes.
1167            let b = unsafe {
1168                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1169            };
1170
1171            self.io.recv_from(b)
1172        }))?;
1173
1174        // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1175        unsafe {
1176            buf.assume_init(n);
1177        }
1178        buf.advance(n);
1179        Poll::Ready(Ok(SocketAddr(addr)))
1180    }
1181
1182    /// Attempts to send data to the specified address.
1183    ///
1184    /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1185    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1186    /// receive a wakeup.
1187    ///
1188    /// # Return value
1189    ///
1190    /// The function returns:
1191    ///
1192    /// * `Poll::Pending` if the socket is not ready to write
1193    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1194    /// * `Poll::Ready(Err(e))` if an error is encountered.
1195    ///
1196    /// # Errors
1197    ///
1198    /// This function may encounter any standard I/O error except `WouldBlock`.
1199    pub fn poll_send_to<P>(
1200        &self,
1201        cx: &mut Context<'_>,
1202        buf: &[u8],
1203        target: P,
1204    ) -> Poll<io::Result<usize>>
1205    where
1206        P: AsRef<Path>,
1207    {
1208        self.io
1209            .registration()
1210            .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1211    }
1212
1213    /// Attempts to send data on the socket to the remote address to which it
1214    /// was previously `connect`ed.
1215    ///
1216    /// The [`connect`] method will connect this socket to a remote address.
1217    /// This method will fail if the socket is not connected.
1218    ///
1219    /// Note that on multiple calls to a `poll_*` method in the send direction,
1220    /// only the `Waker` from the `Context` passed to the most recent call will
1221    /// be scheduled to receive a wakeup.
1222    ///
1223    /// # Return value
1224    ///
1225    /// The function returns:
1226    ///
1227    /// * `Poll::Pending` if the socket is not available to write
1228    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1229    /// * `Poll::Ready(Err(e))` if an error is encountered.
1230    ///
1231    /// # Errors
1232    ///
1233    /// This function may encounter any standard I/O error except `WouldBlock`.
1234    ///
1235    /// [`connect`]: method@Self::connect
1236    pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1237        self.io
1238            .registration()
1239            .poll_write_io(cx, || self.io.send(buf))
1240    }
1241
1242    /// Attempts to receive a single datagram message on the socket from the remote
1243    /// address to which it is `connect`ed.
1244    ///
1245    /// The [`connect`] method will connect this socket to a remote address. This method
1246    /// resolves to an error if the socket is not connected.
1247    ///
1248    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1249    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1250    /// receive a wakeup.
1251    ///
1252    /// # Return value
1253    ///
1254    /// The function returns:
1255    ///
1256    /// * `Poll::Pending` if the socket is not ready to read
1257    /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
1258    /// * `Poll::Ready(Err(e))` if an error is encountered.
1259    ///
1260    /// # Errors
1261    ///
1262    /// This function may encounter any standard I/O error except `WouldBlock`.
1263    ///
1264    /// [`connect`]: method@Self::connect
1265    pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1266        #[allow(clippy::blocks_in_conditions)]
1267        let n = ready!(self.io.registration().poll_read_io(cx, || {
1268            // Safety: will not read the maybe uninitialized bytes.
1269            let b = unsafe {
1270                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1271            };
1272
1273            self.io.recv(b)
1274        }))?;
1275
1276        // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1277        unsafe {
1278            buf.assume_init(n);
1279        }
1280        buf.advance(n);
1281        Poll::Ready(Ok(()))
1282    }
1283
1284    /// Tries to receive data from the socket without waiting.
1285    ///
1286    /// # Examples
1287    ///
1288    /// ```no_run
1289    /// use tokio::net::UnixDatagram;
1290    /// use std::io;
1291    ///
1292    /// #[tokio::main]
1293    /// async fn main() -> io::Result<()> {
1294    ///     // Connect to a peer
1295    ///     let dir = tempfile::tempdir().unwrap();
1296    ///     let client_path = dir.path().join("client.sock");
1297    ///     let server_path = dir.path().join("server.sock");
1298    ///     let socket = UnixDatagram::bind(&client_path)?;
1299    ///
1300    ///     loop {
1301    ///         // Wait for the socket to be readable
1302    ///         socket.readable().await?;
1303    ///
1304    ///         // The buffer is **not** included in the async task and will
1305    ///         // only exist on the stack.
1306    ///         let mut buf = [0; 1024];
1307    ///
1308    ///         // Try to recv data, this may still fail with `WouldBlock`
1309    ///         // if the readiness event is a false positive.
1310    ///         match socket.try_recv_from(&mut buf) {
1311    ///             Ok((n, _addr)) => {
1312    ///                 println!("GOT {:?}", &buf[..n]);
1313    ///                 break;
1314    ///             }
1315    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1316    ///                 continue;
1317    ///             }
1318    ///             Err(e) => {
1319    ///                 return Err(e);
1320    ///             }
1321    ///         }
1322    ///     }
1323    ///
1324    ///     Ok(())
1325    /// }
1326    /// ```
1327    pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1328        let (n, addr) = self
1329            .io
1330            .registration()
1331            .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1332
1333        Ok((n, SocketAddr(addr)))
1334    }
1335
1336    /// Tries to read or write from the socket using a user-provided IO operation.
1337    ///
1338    /// If the socket is ready, the provided closure is called. The closure
1339    /// should attempt to perform IO operation on the socket by manually
1340    /// calling the appropriate syscall. If the operation fails because the
1341    /// socket is not actually ready, then the closure should return a
1342    /// `WouldBlock` error and the readiness flag is cleared. The return value
1343    /// of the closure is then returned by `try_io`.
1344    ///
1345    /// If the socket is not ready, then the closure is not called
1346    /// and a `WouldBlock` error is returned.
1347    ///
1348    /// The closure should only return a `WouldBlock` error if it has performed
1349    /// an IO operation on the socket that failed due to the socket not being
1350    /// ready. Returning a `WouldBlock` error in any other situation will
1351    /// incorrectly clear the readiness flag, which can cause the socket to
1352    /// behave incorrectly.
1353    ///
1354    /// The closure should not perform the IO operation using any of the methods
1355    /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1356    /// readiness flag and can cause the socket to behave incorrectly.
1357    ///
1358    /// This method is not intended to be used with combined interests.
1359    /// The closure should perform only one type of IO operation, so it should not
1360    /// require more than one ready state. This method may panic or sleep forever
1361    /// if it is called with a combined interest.
1362    ///
1363    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1364    ///
1365    /// [`readable()`]: UnixDatagram::readable()
1366    /// [`writable()`]: UnixDatagram::writable()
1367    /// [`ready()`]: UnixDatagram::ready()
1368    pub fn try_io<R>(
1369        &self,
1370        interest: Interest,
1371        f: impl FnOnce() -> io::Result<R>,
1372    ) -> io::Result<R> {
1373        self.io
1374            .registration()
1375            .try_io(interest, || self.io.try_io(f))
1376    }
1377
1378    /// Reads or writes from the socket using a user-provided IO operation.
1379    ///
1380    /// The readiness of the socket is awaited and when the socket is ready,
1381    /// the provided closure is called. The closure should attempt to perform
1382    /// IO operation on the socket by manually calling the appropriate syscall.
1383    /// If the operation fails because the socket is not actually ready,
1384    /// then the closure should return a `WouldBlock` error. In such case the
1385    /// readiness flag is cleared and the socket readiness is awaited again.
1386    /// This loop is repeated until the closure returns an `Ok` or an error
1387    /// other than `WouldBlock`.
1388    ///
1389    /// The closure should only return a `WouldBlock` error if it has performed
1390    /// an IO operation on the socket that failed due to the socket not being
1391    /// ready. Returning a `WouldBlock` error in any other situation will
1392    /// incorrectly clear the readiness flag, which can cause the socket to
1393    /// behave incorrectly.
1394    ///
1395    /// The closure should not perform the IO operation using any of the methods
1396    /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1397    /// readiness flag and can cause the socket to behave incorrectly.
1398    ///
1399    /// This method is not intended to be used with combined interests.
1400    /// The closure should perform only one type of IO operation, so it should not
1401    /// require more than one ready state. This method may panic or sleep forever
1402    /// if it is called with a combined interest.
1403    pub async fn async_io<R>(
1404        &self,
1405        interest: Interest,
1406        mut f: impl FnMut() -> io::Result<R>,
1407    ) -> io::Result<R> {
1408        self.io
1409            .registration()
1410            .async_io(interest, || self.io.try_io(&mut f))
1411            .await
1412    }
1413
1414    /// Returns the local address that this socket is bound to.
1415    ///
1416    /// # Examples
1417    /// For a socket bound to a local path
1418    /// ```
1419    /// # use std::error::Error;
1420    /// # #[tokio::main]
1421    /// # async fn main() -> Result<(), Box<dyn Error>> {
1422    /// use tokio::net::UnixDatagram;
1423    /// use tempfile::tempdir;
1424    ///
1425    /// // We use a temporary directory so that the socket
1426    /// // files left by the bound sockets will get cleaned up.
1427    /// let tmp = tempdir()?;
1428    ///
1429    /// // Bind socket to a filesystem path
1430    /// let socket_path = tmp.path().join("socket");
1431    /// let socket = UnixDatagram::bind(&socket_path)?;
1432    ///
1433    /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1434    ///
1435    /// # Ok(())
1436    /// # }
1437    /// ```
1438    ///
1439    /// For an unbound socket
1440    /// ```
1441    /// # use std::error::Error;
1442    /// # #[tokio::main]
1443    /// # async fn main() -> Result<(), Box<dyn Error>> {
1444    /// use tokio::net::UnixDatagram;
1445    ///
1446    /// // Create an unbound socket
1447    /// let socket = UnixDatagram::unbound()?;
1448    ///
1449    /// assert!(socket.local_addr()?.is_unnamed());
1450    ///
1451    /// # Ok(())
1452    /// # }
1453    /// ```
1454    pub fn local_addr(&self) -> io::Result<SocketAddr> {
1455        self.io.local_addr().map(SocketAddr)
1456    }
1457
1458    /// Returns the address of this socket's peer.
1459    ///
1460    /// The `connect` method will connect the socket to a peer.
1461    ///
1462    /// # Examples
1463    /// For a peer with a local path
1464    /// ```
1465    /// # use std::error::Error;
1466    /// # #[tokio::main]
1467    /// # async fn main() -> Result<(), Box<dyn Error>> {
1468    /// use tokio::net::UnixDatagram;
1469    /// use tempfile::tempdir;
1470    ///
1471    /// // Create an unbound socket
1472    /// let tx = UnixDatagram::unbound()?;
1473    ///
1474    /// // Create another, bound socket
1475    /// let tmp = tempdir()?;
1476    /// let rx_path = tmp.path().join("rx");
1477    /// let rx = UnixDatagram::bind(&rx_path)?;
1478    ///
1479    /// // Connect to the bound socket
1480    /// tx.connect(&rx_path)?;
1481    ///
1482    /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1483    ///
1484    /// # Ok(())
1485    /// # }
1486    /// ```
1487    ///
1488    /// For an unbound peer
1489    /// ```
1490    /// # use std::error::Error;
1491    /// # #[tokio::main]
1492    /// # async fn main() -> Result<(), Box<dyn Error>> {
1493    /// use tokio::net::UnixDatagram;
1494    ///
1495    /// // Create the pair of sockets
1496    /// let (sock1, sock2) = UnixDatagram::pair()?;
1497    ///
1498    /// assert!(sock1.peer_addr()?.is_unnamed());
1499    ///
1500    /// # Ok(())
1501    /// # }
1502    /// ```
1503    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1504        self.io.peer_addr().map(SocketAddr)
1505    }
1506
1507    /// Returns the value of the `SO_ERROR` option.
1508    ///
1509    /// # Examples
1510    /// ```
1511    /// # use std::error::Error;
1512    /// # #[tokio::main]
1513    /// # async fn main() -> Result<(), Box<dyn Error>> {
1514    /// use tokio::net::UnixDatagram;
1515    ///
1516    /// // Create an unbound socket
1517    /// let socket = UnixDatagram::unbound()?;
1518    ///
1519    /// if let Ok(Some(err)) = socket.take_error() {
1520    ///     println!("Got error: {:?}", err);
1521    /// }
1522    ///
1523    /// # Ok(())
1524    /// # }
1525    /// ```
1526    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1527        self.io.take_error()
1528    }
1529
1530    /// Shuts down the read, write, or both halves of this connection.
1531    ///
1532    /// This function will cause all pending and future I/O calls on the
1533    /// specified portions to immediately return with an appropriate value
1534    /// (see the documentation of `Shutdown`).
1535    ///
1536    /// # Examples
1537    /// ```
1538    /// # use std::error::Error;
1539    /// # #[tokio::main]
1540    /// # async fn main() -> Result<(), Box<dyn Error>> {
1541    /// use tokio::net::UnixDatagram;
1542    /// use std::net::Shutdown;
1543    ///
1544    /// // Create an unbound socket
1545    /// let (socket, other) = UnixDatagram::pair()?;
1546    ///
1547    /// socket.shutdown(Shutdown::Both)?;
1548    ///
1549    /// // NOTE: the following commented out code does NOT work as expected.
1550    /// // Due to an underlying issue, the recv call will block indefinitely.
1551    /// // See: https://github.com/tokio-rs/tokio/issues/1679
1552    /// //let mut buff = vec![0u8; 24];
1553    /// //let size = socket.recv(&mut buff).await?;
1554    /// //assert_eq!(size, 0);
1555    ///
1556    /// let send_result = socket.send(b"hello world").await;
1557    /// assert!(send_result.is_err());
1558    ///
1559    /// # Ok(())
1560    /// # }
1561    /// ```
1562    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1563        self.io.shutdown(how)
1564    }
1565}
1566
1567impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1568    type Error = io::Error;
1569
1570    /// Consumes stream, returning the Tokio I/O object.
1571    ///
1572    /// This is equivalent to
1573    /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1574    fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1575        Self::from_std(stream)
1576    }
1577}
1578
1579impl fmt::Debug for UnixDatagram {
1580    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1581        self.io.fmt(f)
1582    }
1583}
1584
1585impl AsRawFd for UnixDatagram {
1586    fn as_raw_fd(&self) -> RawFd {
1587        self.io.as_raw_fd()
1588    }
1589}
1590
1591impl AsFd for UnixDatagram {
1592    fn as_fd(&self) -> BorrowedFd<'_> {
1593        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1594    }
1595}