tokio/net/windows/named_pipe.rs
1//! Tokio support for [Windows named pipes].
2//!
3//! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4
5use std::ffi::c_void;
6use std::ffi::OsStr;
7use std::io::{self, Read, Write};
8use std::pin::Pin;
9use std::ptr;
10use std::task::{Context, Poll};
11
12use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
14
15cfg_io_util! {
16 use bytes::BufMut;
17}
18
19// Hide imports which are not used when generating documentation.
20#[cfg(windows)]
21mod doc {
22 pub(super) use crate::os::windows::ffi::OsStrExt;
23 pub(super) mod windows_sys {
24 pub(crate) use windows_sys::{
25 Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
26 Win32::System::SystemServices::*,
27 };
28 }
29 pub(super) use mio::windows as mio_windows;
30}
31
32// NB: none of these shows up in public API, so don't document them.
33#[cfg(not(windows))]
34mod doc {
35 pub(super) mod mio_windows {
36 pub type NamedPipe = crate::doc::NotDefinedHere;
37 }
38}
39
40use self::doc::*;
41
42/// A [Windows named pipe] server.
43///
44/// Accepting client connections involves creating a server with
45/// [`ServerOptions::create`] and waiting for clients to connect using
46/// [`NamedPipeServer::connect`].
47///
48/// To avoid having clients sporadically fail with
49/// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
50/// ensure that at least one server instance is available at all times. This
51/// means that the typical listen loop for a server is a bit involved, because
52/// we have to ensure that we never drop a server accidentally while a client
53/// might connect.
54///
55/// So a correctly implemented server looks like this:
56///
57/// ```no_run
58/// use std::io;
59/// use tokio::net::windows::named_pipe::ServerOptions;
60///
61/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
62///
63/// # #[tokio::main] async fn main() -> std::io::Result<()> {
64/// // The first server needs to be constructed early so that clients can
65/// // be correctly connected. Otherwise calling .wait will cause the client to
66/// // error.
67/// //
68/// // Here we also make use of `first_pipe_instance`, which will ensure that
69/// // there are no other servers up and running already.
70/// let mut server = ServerOptions::new()
71/// .first_pipe_instance(true)
72/// .create(PIPE_NAME)?;
73///
74/// // Spawn the server loop.
75/// let server = tokio::spawn(async move {
76/// loop {
77/// // Wait for a client to connect.
78/// server.connect().await?;
79/// let connected_client = server;
80///
81/// // Construct the next server to be connected before sending the one
82/// // we already have of onto a task. This ensures that the server
83/// // isn't closed (after it's done in the task) before a new one is
84/// // available. Otherwise the client might error with
85/// // `io::ErrorKind::NotFound`.
86/// server = ServerOptions::new().create(PIPE_NAME)?;
87///
88/// let client = tokio::spawn(async move {
89/// /* use the connected client */
90/// # Ok::<_, std::io::Error>(())
91/// });
92/// # if true { break } // needed for type inference to work
93/// }
94///
95/// Ok::<_, io::Error>(())
96/// });
97///
98/// /* do something else not server related here */
99/// # Ok(()) }
100/// ```
101///
102/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
103#[derive(Debug)]
104pub struct NamedPipeServer {
105 io: PollEvented<mio_windows::NamedPipe>,
106}
107
108impl NamedPipeServer {
109 /// Constructs a new named pipe server from the specified raw handle.
110 ///
111 /// This function will consume ownership of the handle given, passing
112 /// responsibility for closing the handle to the returned object.
113 ///
114 /// This function is also unsafe as the primitives currently returned have
115 /// the contract that they are the sole owner of the file descriptor they
116 /// are wrapping. Usage of this function could accidentally allow violating
117 /// this contract which can cause memory unsafety in code that relies on it
118 /// being true.
119 ///
120 /// # Errors
121 ///
122 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
123 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
124 ///
125 /// [Tokio Runtime]: crate::runtime::Runtime
126 /// [enabled I/O]: crate::runtime::Builder::enable_io
127 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
128 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
129
130 Ok(Self {
131 io: PollEvented::new(named_pipe)?,
132 })
133 }
134
135 /// Retrieves information about the named pipe the server is associated
136 /// with.
137 ///
138 /// ```no_run
139 /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
140 ///
141 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
142 ///
143 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
144 /// let server = ServerOptions::new()
145 /// .pipe_mode(PipeMode::Message)
146 /// .max_instances(5)
147 /// .create(PIPE_NAME)?;
148 ///
149 /// let server_info = server.info()?;
150 ///
151 /// assert_eq!(server_info.end, PipeEnd::Server);
152 /// assert_eq!(server_info.mode, PipeMode::Message);
153 /// assert_eq!(server_info.max_instances, 5);
154 /// # Ok(()) }
155 /// ```
156 pub fn info(&self) -> io::Result<PipeInfo> {
157 // Safety: we're ensuring the lifetime of the named pipe.
158 unsafe { named_pipe_info(self.io.as_raw_handle()) }
159 }
160
161 /// Enables a named pipe server process to wait for a client process to
162 /// connect to an instance of a named pipe. A client process connects by
163 /// creating a named pipe with the same name.
164 ///
165 /// This corresponds to the [`ConnectNamedPipe`] system call.
166 ///
167 /// # Cancel safety
168 ///
169 /// This method is cancellation safe in the sense that if it is used as the
170 /// event in a [`select!`](crate::select) statement and some other branch
171 /// completes first, then no connection events have been lost.
172 ///
173 /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
174 ///
175 /// # Example
176 ///
177 /// ```no_run
178 /// use tokio::net::windows::named_pipe::ServerOptions;
179 ///
180 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
181 ///
182 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
183 /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
184 ///
185 /// // Wait for a client to connect.
186 /// pipe.connect().await?;
187 ///
188 /// // Use the connected client...
189 /// # Ok(()) }
190 /// ```
191 pub async fn connect(&self) -> io::Result<()> {
192 match self.io.connect() {
193 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
194 self.io
195 .registration()
196 .async_io(Interest::WRITABLE, || self.io.connect())
197 .await
198 }
199 x => x,
200 }
201 }
202
203 /// Disconnects the server end of a named pipe instance from a client
204 /// process.
205 ///
206 /// ```
207 /// use tokio::io::AsyncWriteExt;
208 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
209 /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
210 ///
211 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
212 ///
213 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
214 /// let server = ServerOptions::new()
215 /// .create(PIPE_NAME)?;
216 ///
217 /// let mut client = ClientOptions::new()
218 /// .open(PIPE_NAME)?;
219 ///
220 /// // Wait for a client to become connected.
221 /// server.connect().await?;
222 ///
223 /// // Forcibly disconnect the client.
224 /// server.disconnect()?;
225 ///
226 /// // Write fails with an OS-specific error after client has been
227 /// // disconnected.
228 /// let e = client.write(b"ping").await.unwrap_err();
229 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
230 /// # Ok(()) }
231 /// ```
232 pub fn disconnect(&self) -> io::Result<()> {
233 self.io.disconnect()
234 }
235
236 /// Waits for any of the requested ready states.
237 ///
238 /// This function is usually paired with `try_read()` or `try_write()`. It
239 /// can be used to concurrently read / write to the same pipe on a single
240 /// task without splitting the pipe.
241 ///
242 /// The function may complete without the pipe being ready. This is a
243 /// false-positive and attempting an operation will return with
244 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
245 /// [`Ready`] set, so you should always check the returned value and possibly
246 /// wait again if the requested states are not set.
247 ///
248 /// # Examples
249 ///
250 /// Concurrently read and write to the pipe on the same task without
251 /// splitting.
252 ///
253 /// ```no_run
254 /// use tokio::io::Interest;
255 /// use tokio::net::windows::named_pipe;
256 /// use std::error::Error;
257 /// use std::io;
258 ///
259 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
260 ///
261 /// #[tokio::main]
262 /// async fn main() -> Result<(), Box<dyn Error>> {
263 /// let server = named_pipe::ServerOptions::new()
264 /// .create(PIPE_NAME)?;
265 ///
266 /// loop {
267 /// let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
268 ///
269 /// if ready.is_readable() {
270 /// let mut data = vec![0; 1024];
271 /// // Try to read data, this may still fail with `WouldBlock`
272 /// // if the readiness event is a false positive.
273 /// match server.try_read(&mut data) {
274 /// Ok(n) => {
275 /// println!("read {} bytes", n);
276 /// }
277 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
278 /// continue;
279 /// }
280 /// Err(e) => {
281 /// return Err(e.into());
282 /// }
283 /// }
284 /// }
285 ///
286 /// if ready.is_writable() {
287 /// // Try to write data, this may still fail with `WouldBlock`
288 /// // if the readiness event is a false positive.
289 /// match server.try_write(b"hello world") {
290 /// Ok(n) => {
291 /// println!("write {} bytes", n);
292 /// }
293 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
294 /// continue;
295 /// }
296 /// Err(e) => {
297 /// return Err(e.into());
298 /// }
299 /// }
300 /// }
301 /// }
302 /// }
303 /// ```
304 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
305 let event = self.io.registration().readiness(interest).await?;
306 Ok(event.ready)
307 }
308
309 /// Waits for the pipe to become readable.
310 ///
311 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
312 /// paired with `try_read()`.
313 ///
314 /// # Examples
315 ///
316 /// ```no_run
317 /// use tokio::net::windows::named_pipe;
318 /// use std::error::Error;
319 /// use std::io;
320 ///
321 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
322 ///
323 /// #[tokio::main]
324 /// async fn main() -> Result<(), Box<dyn Error>> {
325 /// let server = named_pipe::ServerOptions::new()
326 /// .create(PIPE_NAME)?;
327 ///
328 /// let mut msg = vec![0; 1024];
329 ///
330 /// loop {
331 /// // Wait for the pipe to be readable
332 /// server.readable().await?;
333 ///
334 /// // Try to read data, this may still fail with `WouldBlock`
335 /// // if the readiness event is a false positive.
336 /// match server.try_read(&mut msg) {
337 /// Ok(n) => {
338 /// msg.truncate(n);
339 /// break;
340 /// }
341 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
342 /// continue;
343 /// }
344 /// Err(e) => {
345 /// return Err(e.into());
346 /// }
347 /// }
348 /// }
349 ///
350 /// println!("GOT = {:?}", msg);
351 /// Ok(())
352 /// }
353 /// ```
354 pub async fn readable(&self) -> io::Result<()> {
355 self.ready(Interest::READABLE).await?;
356 Ok(())
357 }
358
359 /// Polls for read readiness.
360 ///
361 /// If the pipe is not currently ready for reading, this method will
362 /// store a clone of the `Waker` from the provided `Context`. When the pipe
363 /// becomes ready for reading, `Waker::wake` will be called on the waker.
364 ///
365 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
366 /// the `Waker` from the `Context` passed to the most recent call is
367 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
368 /// second, independent waker.)
369 ///
370 /// This function is intended for cases where creating and pinning a future
371 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
372 /// preferred, as this supports polling from multiple tasks at once.
373 ///
374 /// # Return value
375 ///
376 /// The function returns:
377 ///
378 /// * `Poll::Pending` if the pipe is not ready for reading.
379 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
380 /// * `Poll::Ready(Err(e))` if an error is encountered.
381 ///
382 /// # Errors
383 ///
384 /// This function may encounter any standard I/O error except `WouldBlock`.
385 ///
386 /// [`readable`]: method@Self::readable
387 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
388 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
389 }
390
391 /// Tries to read data from the pipe into the provided buffer, returning how
392 /// many bytes were read.
393 ///
394 /// Receives any pending data from the pipe but does not wait for new data
395 /// to arrive. On success, returns the number of bytes read. Because
396 /// `try_read()` is non-blocking, the buffer does not have to be stored by
397 /// the async task and can exist entirely on the stack.
398 ///
399 /// Usually, [`readable()`] or [`ready()`] is used with this function.
400 ///
401 /// [`readable()`]: NamedPipeServer::readable()
402 /// [`ready()`]: NamedPipeServer::ready()
403 ///
404 /// # Return
405 ///
406 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
407 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
408 ///
409 /// 1. The pipe's read half is closed and will no longer yield data.
410 /// 2. The specified buffer was 0 bytes in length.
411 ///
412 /// If the pipe is not ready to read data,
413 /// `Err(io::ErrorKind::WouldBlock)` is returned.
414 ///
415 /// # Examples
416 ///
417 /// ```no_run
418 /// use tokio::net::windows::named_pipe;
419 /// use std::error::Error;
420 /// use std::io;
421 ///
422 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
423 ///
424 /// #[tokio::main]
425 /// async fn main() -> Result<(), Box<dyn Error>> {
426 /// let server = named_pipe::ServerOptions::new()
427 /// .create(PIPE_NAME)?;
428 ///
429 /// loop {
430 /// // Wait for the pipe to be readable
431 /// server.readable().await?;
432 ///
433 /// // Creating the buffer **after** the `await` prevents it from
434 /// // being stored in the async task.
435 /// let mut buf = [0; 4096];
436 ///
437 /// // Try to read data, this may still fail with `WouldBlock`
438 /// // if the readiness event is a false positive.
439 /// match server.try_read(&mut buf) {
440 /// Ok(0) => break,
441 /// Ok(n) => {
442 /// println!("read {} bytes", n);
443 /// }
444 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
445 /// continue;
446 /// }
447 /// Err(e) => {
448 /// return Err(e.into());
449 /// }
450 /// }
451 /// }
452 ///
453 /// Ok(())
454 /// }
455 /// ```
456 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
457 self.io
458 .registration()
459 .try_io(Interest::READABLE, || (&*self.io).read(buf))
460 }
461
462 /// Tries to read data from the pipe into the provided buffers, returning
463 /// how many bytes were read.
464 ///
465 /// Data is copied to fill each buffer in order, with the final buffer
466 /// written to possibly being only partially filled. This method behaves
467 /// equivalently to a single call to [`try_read()`] with concatenated
468 /// buffers.
469 ///
470 /// Receives any pending data from the pipe but does not wait for new data
471 /// to arrive. On success, returns the number of bytes read. Because
472 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
473 /// stored by the async task and can exist entirely on the stack.
474 ///
475 /// Usually, [`readable()`] or [`ready()`] is used with this function.
476 ///
477 /// [`try_read()`]: NamedPipeServer::try_read()
478 /// [`readable()`]: NamedPipeServer::readable()
479 /// [`ready()`]: NamedPipeServer::ready()
480 ///
481 /// # Return
482 ///
483 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
484 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
485 /// and will no longer yield data. If the pipe is not ready to read data
486 /// `Err(io::ErrorKind::WouldBlock)` is returned.
487 ///
488 /// # Examples
489 ///
490 /// ```no_run
491 /// use tokio::net::windows::named_pipe;
492 /// use std::error::Error;
493 /// use std::io::{self, IoSliceMut};
494 ///
495 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
496 ///
497 /// #[tokio::main]
498 /// async fn main() -> Result<(), Box<dyn Error>> {
499 /// let server = named_pipe::ServerOptions::new()
500 /// .create(PIPE_NAME)?;
501 ///
502 /// loop {
503 /// // Wait for the pipe to be readable
504 /// server.readable().await?;
505 ///
506 /// // Creating the buffer **after** the `await` prevents it from
507 /// // being stored in the async task.
508 /// let mut buf_a = [0; 512];
509 /// let mut buf_b = [0; 1024];
510 /// let mut bufs = [
511 /// IoSliceMut::new(&mut buf_a),
512 /// IoSliceMut::new(&mut buf_b),
513 /// ];
514 ///
515 /// // Try to read data, this may still fail with `WouldBlock`
516 /// // if the readiness event is a false positive.
517 /// match server.try_read_vectored(&mut bufs) {
518 /// Ok(0) => break,
519 /// Ok(n) => {
520 /// println!("read {} bytes", n);
521 /// }
522 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
523 /// continue;
524 /// }
525 /// Err(e) => {
526 /// return Err(e.into());
527 /// }
528 /// }
529 /// }
530 ///
531 /// Ok(())
532 /// }
533 /// ```
534 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
535 self.io
536 .registration()
537 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
538 }
539
540 cfg_io_util! {
541 /// Tries to read data from the stream into the provided buffer, advancing the
542 /// buffer's internal cursor, returning how many bytes were read.
543 ///
544 /// Receives any pending data from the pipe but does not wait for new data
545 /// to arrive. On success, returns the number of bytes read. Because
546 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
547 /// the async task and can exist entirely on the stack.
548 ///
549 /// Usually, [`readable()`] or [`ready()`] is used with this function.
550 ///
551 /// [`readable()`]: NamedPipeServer::readable()
552 /// [`ready()`]: NamedPipeServer::ready()
553 ///
554 /// # Return
555 ///
556 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
557 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
558 /// and will no longer yield data. If the stream is not ready to read data
559 /// `Err(io::ErrorKind::WouldBlock)` is returned.
560 ///
561 /// # Examples
562 ///
563 /// ```no_run
564 /// use tokio::net::windows::named_pipe;
565 /// use std::error::Error;
566 /// use std::io;
567 ///
568 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
569 ///
570 /// #[tokio::main]
571 /// async fn main() -> Result<(), Box<dyn Error>> {
572 /// let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
573 ///
574 /// loop {
575 /// // Wait for the pipe to be readable
576 /// server.readable().await?;
577 ///
578 /// let mut buf = Vec::with_capacity(4096);
579 ///
580 /// // Try to read data, this may still fail with `WouldBlock`
581 /// // if the readiness event is a false positive.
582 /// match server.try_read_buf(&mut buf) {
583 /// Ok(0) => break,
584 /// Ok(n) => {
585 /// println!("read {} bytes", n);
586 /// }
587 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
588 /// continue;
589 /// }
590 /// Err(e) => {
591 /// return Err(e.into());
592 /// }
593 /// }
594 /// }
595 ///
596 /// Ok(())
597 /// }
598 /// ```
599 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
600 self.io.registration().try_io(Interest::READABLE, || {
601 use std::io::Read;
602
603 let dst = buf.chunk_mut();
604 let dst =
605 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
606
607 // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
608 // buffer.
609 let n = (&*self.io).read(dst)?;
610
611 unsafe {
612 buf.advance_mut(n);
613 }
614
615 Ok(n)
616 })
617 }
618 }
619
620 /// Waits for the pipe to become writable.
621 ///
622 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
623 /// paired with `try_write()`.
624 ///
625 /// # Examples
626 ///
627 /// ```no_run
628 /// use tokio::net::windows::named_pipe;
629 /// use std::error::Error;
630 /// use std::io;
631 ///
632 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
633 ///
634 /// #[tokio::main]
635 /// async fn main() -> Result<(), Box<dyn Error>> {
636 /// let server = named_pipe::ServerOptions::new()
637 /// .create(PIPE_NAME)?;
638 ///
639 /// loop {
640 /// // Wait for the pipe to be writable
641 /// server.writable().await?;
642 ///
643 /// // Try to write data, this may still fail with `WouldBlock`
644 /// // if the readiness event is a false positive.
645 /// match server.try_write(b"hello world") {
646 /// Ok(n) => {
647 /// break;
648 /// }
649 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650 /// continue;
651 /// }
652 /// Err(e) => {
653 /// return Err(e.into());
654 /// }
655 /// }
656 /// }
657 ///
658 /// Ok(())
659 /// }
660 /// ```
661 pub async fn writable(&self) -> io::Result<()> {
662 self.ready(Interest::WRITABLE).await?;
663 Ok(())
664 }
665
666 /// Polls for write readiness.
667 ///
668 /// If the pipe is not currently ready for writing, this method will
669 /// store a clone of the `Waker` from the provided `Context`. When the pipe
670 /// becomes ready for writing, `Waker::wake` will be called on the waker.
671 ///
672 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
673 /// the `Waker` from the `Context` passed to the most recent call is
674 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
675 /// second, independent waker.)
676 ///
677 /// This function is intended for cases where creating and pinning a future
678 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
679 /// preferred, as this supports polling from multiple tasks at once.
680 ///
681 /// # Return value
682 ///
683 /// The function returns:
684 ///
685 /// * `Poll::Pending` if the pipe is not ready for writing.
686 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
687 /// * `Poll::Ready(Err(e))` if an error is encountered.
688 ///
689 /// # Errors
690 ///
691 /// This function may encounter any standard I/O error except `WouldBlock`.
692 ///
693 /// [`writable`]: method@Self::writable
694 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
695 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
696 }
697
698 /// Tries to write a buffer to the pipe, returning how many bytes were
699 /// written.
700 ///
701 /// The function will attempt to write the entire contents of `buf`, but
702 /// only part of the buffer may be written.
703 ///
704 /// This function is usually paired with `writable()`.
705 ///
706 /// # Return
707 ///
708 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
709 /// number of bytes written. If the pipe is not ready to write data,
710 /// `Err(io::ErrorKind::WouldBlock)` is returned.
711 ///
712 /// # Examples
713 ///
714 /// ```no_run
715 /// use tokio::net::windows::named_pipe;
716 /// use std::error::Error;
717 /// use std::io;
718 ///
719 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
720 ///
721 /// #[tokio::main]
722 /// async fn main() -> Result<(), Box<dyn Error>> {
723 /// let server = named_pipe::ServerOptions::new()
724 /// .create(PIPE_NAME)?;
725 ///
726 /// loop {
727 /// // Wait for the pipe to be writable
728 /// server.writable().await?;
729 ///
730 /// // Try to write data, this may still fail with `WouldBlock`
731 /// // if the readiness event is a false positive.
732 /// match server.try_write(b"hello world") {
733 /// Ok(n) => {
734 /// break;
735 /// }
736 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
737 /// continue;
738 /// }
739 /// Err(e) => {
740 /// return Err(e.into());
741 /// }
742 /// }
743 /// }
744 ///
745 /// Ok(())
746 /// }
747 /// ```
748 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
749 self.io
750 .registration()
751 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
752 }
753
754 /// Tries to write several buffers to the pipe, returning how many bytes
755 /// were written.
756 ///
757 /// Data is written from each buffer in order, with the final buffer read
758 /// from possible being only partially consumed. This method behaves
759 /// equivalently to a single call to [`try_write()`] with concatenated
760 /// buffers.
761 ///
762 /// This function is usually paired with `writable()`.
763 ///
764 /// [`try_write()`]: NamedPipeServer::try_write()
765 ///
766 /// # Return
767 ///
768 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
769 /// number of bytes written. If the pipe is not ready to write data,
770 /// `Err(io::ErrorKind::WouldBlock)` is returned.
771 ///
772 /// # Examples
773 ///
774 /// ```no_run
775 /// use tokio::net::windows::named_pipe;
776 /// use std::error::Error;
777 /// use std::io;
778 ///
779 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
780 ///
781 /// #[tokio::main]
782 /// async fn main() -> Result<(), Box<dyn Error>> {
783 /// let server = named_pipe::ServerOptions::new()
784 /// .create(PIPE_NAME)?;
785 ///
786 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
787 ///
788 /// loop {
789 /// // Wait for the pipe to be writable
790 /// server.writable().await?;
791 ///
792 /// // Try to write data, this may still fail with `WouldBlock`
793 /// // if the readiness event is a false positive.
794 /// match server.try_write_vectored(&bufs) {
795 /// Ok(n) => {
796 /// break;
797 /// }
798 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
799 /// continue;
800 /// }
801 /// Err(e) => {
802 /// return Err(e.into());
803 /// }
804 /// }
805 /// }
806 ///
807 /// Ok(())
808 /// }
809 /// ```
810 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
811 self.io
812 .registration()
813 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
814 }
815
816 /// Tries to read or write from the pipe using a user-provided IO operation.
817 ///
818 /// If the pipe is ready, the provided closure is called. The closure
819 /// should attempt to perform IO operation from the pipe by manually
820 /// calling the appropriate syscall. If the operation fails because the
821 /// pipe is not actually ready, then the closure should return a
822 /// `WouldBlock` error and the readiness flag is cleared. The return value
823 /// of the closure is then returned by `try_io`.
824 ///
825 /// If the pipe is not ready, then the closure is not called
826 /// and a `WouldBlock` error is returned.
827 ///
828 /// The closure should only return a `WouldBlock` error if it has performed
829 /// an IO operation on the pipe that failed due to the pipe not being
830 /// ready. Returning a `WouldBlock` error in any other situation will
831 /// incorrectly clear the readiness flag, which can cause the pipe to
832 /// behave incorrectly.
833 ///
834 /// The closure should not perform the IO operation using any of the
835 /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
836 /// the readiness flag and can cause the pipe to behave incorrectly.
837 ///
838 /// This method is not intended to be used with combined interests.
839 /// The closure should perform only one type of IO operation, so it should not
840 /// require more than one ready state. This method may panic or sleep forever
841 /// if it is called with a combined interest.
842 ///
843 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
844 ///
845 /// [`readable()`]: NamedPipeServer::readable()
846 /// [`writable()`]: NamedPipeServer::writable()
847 /// [`ready()`]: NamedPipeServer::ready()
848 pub fn try_io<R>(
849 &self,
850 interest: Interest,
851 f: impl FnOnce() -> io::Result<R>,
852 ) -> io::Result<R> {
853 self.io.registration().try_io(interest, f)
854 }
855
856 /// Reads or writes from the pipe using a user-provided IO operation.
857 ///
858 /// The readiness of the pipe is awaited and when the pipe is ready,
859 /// the provided closure is called. The closure should attempt to perform
860 /// IO operation on the pipe by manually calling the appropriate syscall.
861 /// If the operation fails because the pipe is not actually ready,
862 /// then the closure should return a `WouldBlock` error. In such case the
863 /// readiness flag is cleared and the pipe readiness is awaited again.
864 /// This loop is repeated until the closure returns an `Ok` or an error
865 /// other than `WouldBlock`.
866 ///
867 /// The closure should only return a `WouldBlock` error if it has performed
868 /// an IO operation on the pipe that failed due to the pipe not being
869 /// ready. Returning a `WouldBlock` error in any other situation will
870 /// incorrectly clear the readiness flag, which can cause the pipe to
871 /// behave incorrectly.
872 ///
873 /// The closure should not perform the IO operation using any of the methods
874 /// defined on the Tokio `NamedPipeServer` type, as this will mess with the
875 /// readiness flag and can cause the pipe to behave incorrectly.
876 ///
877 /// This method is not intended to be used with combined interests.
878 /// The closure should perform only one type of IO operation, so it should not
879 /// require more than one ready state. This method may panic or sleep forever
880 /// if it is called with a combined interest.
881 pub async fn async_io<R>(
882 &self,
883 interest: Interest,
884 f: impl FnMut() -> io::Result<R>,
885 ) -> io::Result<R> {
886 self.io.registration().async_io(interest, f).await
887 }
888}
889
890impl AsyncRead for NamedPipeServer {
891 fn poll_read(
892 self: Pin<&mut Self>,
893 cx: &mut Context<'_>,
894 buf: &mut ReadBuf<'_>,
895 ) -> Poll<io::Result<()>> {
896 unsafe { self.io.poll_read(cx, buf) }
897 }
898}
899
900impl AsyncWrite for NamedPipeServer {
901 fn poll_write(
902 self: Pin<&mut Self>,
903 cx: &mut Context<'_>,
904 buf: &[u8],
905 ) -> Poll<io::Result<usize>> {
906 self.io.poll_write(cx, buf)
907 }
908
909 fn poll_write_vectored(
910 self: Pin<&mut Self>,
911 cx: &mut Context<'_>,
912 bufs: &[io::IoSlice<'_>],
913 ) -> Poll<io::Result<usize>> {
914 self.io.poll_write_vectored(cx, bufs)
915 }
916
917 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
918 Poll::Ready(Ok(()))
919 }
920
921 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
922 self.poll_flush(cx)
923 }
924}
925
926impl AsRawHandle for NamedPipeServer {
927 fn as_raw_handle(&self) -> RawHandle {
928 self.io.as_raw_handle()
929 }
930}
931
932impl AsHandle for NamedPipeServer {
933 fn as_handle(&self) -> BorrowedHandle<'_> {
934 unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
935 }
936}
937
938/// A [Windows named pipe] client.
939///
940/// Constructed using [`ClientOptions::open`].
941///
942/// Connecting a client correctly involves a few steps. When connecting through
943/// [`ClientOptions::open`], it might error indicating one of two things:
944///
945/// * [`std::io::ErrorKind::NotFound`] - There is no server available.
946/// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
947/// for a while and try again.
948///
949/// So a correctly implemented client looks like this:
950///
951/// ```no_run
952/// use std::time::Duration;
953/// use tokio::net::windows::named_pipe::ClientOptions;
954/// use tokio::time;
955/// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
956///
957/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
958///
959/// # #[tokio::main] async fn main() -> std::io::Result<()> {
960/// let client = loop {
961/// match ClientOptions::new().open(PIPE_NAME) {
962/// Ok(client) => break client,
963/// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
964/// Err(e) => return Err(e),
965/// }
966///
967/// time::sleep(Duration::from_millis(50)).await;
968/// };
969///
970/// /* use the connected client */
971/// # Ok(()) }
972/// ```
973///
974/// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
975/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
976#[derive(Debug)]
977pub struct NamedPipeClient {
978 io: PollEvented<mio_windows::NamedPipe>,
979}
980
981impl NamedPipeClient {
982 /// Constructs a new named pipe client from the specified raw handle.
983 ///
984 /// This function will consume ownership of the handle given, passing
985 /// responsibility for closing the handle to the returned object.
986 ///
987 /// This function is also unsafe as the primitives currently returned have
988 /// the contract that they are the sole owner of the file descriptor they
989 /// are wrapping. Usage of this function could accidentally allow violating
990 /// this contract which can cause memory unsafety in code that relies on it
991 /// being true.
992 ///
993 /// # Errors
994 ///
995 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
996 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
997 ///
998 /// [Tokio Runtime]: crate::runtime::Runtime
999 /// [enabled I/O]: crate::runtime::Builder::enable_io
1000 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
1001 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
1002
1003 Ok(Self {
1004 io: PollEvented::new(named_pipe)?,
1005 })
1006 }
1007
1008 /// Retrieves information about the named pipe the client is associated
1009 /// with.
1010 ///
1011 /// ```no_run
1012 /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
1013 ///
1014 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
1015 ///
1016 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1017 /// let client = ClientOptions::new()
1018 /// .open(PIPE_NAME)?;
1019 ///
1020 /// let client_info = client.info()?;
1021 ///
1022 /// assert_eq!(client_info.end, PipeEnd::Client);
1023 /// assert_eq!(client_info.mode, PipeMode::Message);
1024 /// assert_eq!(client_info.max_instances, 5);
1025 /// # Ok(()) }
1026 /// ```
1027 pub fn info(&self) -> io::Result<PipeInfo> {
1028 // Safety: we're ensuring the lifetime of the named pipe.
1029 unsafe { named_pipe_info(self.io.as_raw_handle()) }
1030 }
1031
1032 /// Waits for any of the requested ready states.
1033 ///
1034 /// This function is usually paired with `try_read()` or `try_write()`. It
1035 /// can be used to concurrently read / write to the same pipe on a single
1036 /// task without splitting the pipe.
1037 ///
1038 /// The function may complete without the pipe being ready. This is a
1039 /// false-positive and attempting an operation will return with
1040 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1041 /// [`Ready`] set, so you should always check the returned value and possibly
1042 /// wait again if the requested states are not set.
1043 ///
1044 /// # Examples
1045 ///
1046 /// Concurrently read and write to the pipe on the same task without
1047 /// splitting.
1048 ///
1049 /// ```no_run
1050 /// use tokio::io::Interest;
1051 /// use tokio::net::windows::named_pipe;
1052 /// use std::error::Error;
1053 /// use std::io;
1054 ///
1055 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1056 ///
1057 /// #[tokio::main]
1058 /// async fn main() -> Result<(), Box<dyn Error>> {
1059 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1060 ///
1061 /// loop {
1062 /// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1063 ///
1064 /// if ready.is_readable() {
1065 /// let mut data = vec![0; 1024];
1066 /// // Try to read data, this may still fail with `WouldBlock`
1067 /// // if the readiness event is a false positive.
1068 /// match client.try_read(&mut data) {
1069 /// Ok(n) => {
1070 /// println!("read {} bytes", n);
1071 /// }
1072 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1073 /// continue;
1074 /// }
1075 /// Err(e) => {
1076 /// return Err(e.into());
1077 /// }
1078 /// }
1079 /// }
1080 ///
1081 /// if ready.is_writable() {
1082 /// // Try to write data, this may still fail with `WouldBlock`
1083 /// // if the readiness event is a false positive.
1084 /// match client.try_write(b"hello world") {
1085 /// Ok(n) => {
1086 /// println!("write {} bytes", n);
1087 /// }
1088 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1089 /// continue;
1090 /// }
1091 /// Err(e) => {
1092 /// return Err(e.into());
1093 /// }
1094 /// }
1095 /// }
1096 /// }
1097 /// }
1098 /// ```
1099 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1100 let event = self.io.registration().readiness(interest).await?;
1101 Ok(event.ready)
1102 }
1103
1104 /// Waits for the pipe to become readable.
1105 ///
1106 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1107 /// paired with `try_read()`.
1108 ///
1109 /// # Examples
1110 ///
1111 /// ```no_run
1112 /// use tokio::net::windows::named_pipe;
1113 /// use std::error::Error;
1114 /// use std::io;
1115 ///
1116 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1117 ///
1118 /// #[tokio::main]
1119 /// async fn main() -> Result<(), Box<dyn Error>> {
1120 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1121 ///
1122 /// let mut msg = vec![0; 1024];
1123 ///
1124 /// loop {
1125 /// // Wait for the pipe to be readable
1126 /// client.readable().await?;
1127 ///
1128 /// // Try to read data, this may still fail with `WouldBlock`
1129 /// // if the readiness event is a false positive.
1130 /// match client.try_read(&mut msg) {
1131 /// Ok(n) => {
1132 /// msg.truncate(n);
1133 /// break;
1134 /// }
1135 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1136 /// continue;
1137 /// }
1138 /// Err(e) => {
1139 /// return Err(e.into());
1140 /// }
1141 /// }
1142 /// }
1143 ///
1144 /// println!("GOT = {:?}", msg);
1145 /// Ok(())
1146 /// }
1147 /// ```
1148 pub async fn readable(&self) -> io::Result<()> {
1149 self.ready(Interest::READABLE).await?;
1150 Ok(())
1151 }
1152
1153 /// Polls for read readiness.
1154 ///
1155 /// If the pipe is not currently ready for reading, this method will
1156 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1157 /// becomes ready for reading, `Waker::wake` will be called on the waker.
1158 ///
1159 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1160 /// the `Waker` from the `Context` passed to the most recent call is
1161 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1162 /// second, independent waker.)
1163 ///
1164 /// This function is intended for cases where creating and pinning a future
1165 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1166 /// preferred, as this supports polling from multiple tasks at once.
1167 ///
1168 /// # Return value
1169 ///
1170 /// The function returns:
1171 ///
1172 /// * `Poll::Pending` if the pipe is not ready for reading.
1173 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1174 /// * `Poll::Ready(Err(e))` if an error is encountered.
1175 ///
1176 /// # Errors
1177 ///
1178 /// This function may encounter any standard I/O error except `WouldBlock`.
1179 ///
1180 /// [`readable`]: method@Self::readable
1181 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1182 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1183 }
1184
1185 /// Tries to read data from the pipe into the provided buffer, returning how
1186 /// many bytes were read.
1187 ///
1188 /// Receives any pending data from the pipe but does not wait for new data
1189 /// to arrive. On success, returns the number of bytes read. Because
1190 /// `try_read()` is non-blocking, the buffer does not have to be stored by
1191 /// the async task and can exist entirely on the stack.
1192 ///
1193 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1194 ///
1195 /// [`readable()`]: NamedPipeClient::readable()
1196 /// [`ready()`]: NamedPipeClient::ready()
1197 ///
1198 /// # Return
1199 ///
1200 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1201 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1202 ///
1203 /// 1. The pipe's read half is closed and will no longer yield data.
1204 /// 2. The specified buffer was 0 bytes in length.
1205 ///
1206 /// If the pipe is not ready to read data,
1207 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1208 ///
1209 /// # Examples
1210 ///
1211 /// ```no_run
1212 /// use tokio::net::windows::named_pipe;
1213 /// use std::error::Error;
1214 /// use std::io;
1215 ///
1216 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1217 ///
1218 /// #[tokio::main]
1219 /// async fn main() -> Result<(), Box<dyn Error>> {
1220 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1221 ///
1222 /// loop {
1223 /// // Wait for the pipe to be readable
1224 /// client.readable().await?;
1225 ///
1226 /// // Creating the buffer **after** the `await` prevents it from
1227 /// // being stored in the async task.
1228 /// let mut buf = [0; 4096];
1229 ///
1230 /// // Try to read data, this may still fail with `WouldBlock`
1231 /// // if the readiness event is a false positive.
1232 /// match client.try_read(&mut buf) {
1233 /// Ok(0) => break,
1234 /// Ok(n) => {
1235 /// println!("read {} bytes", n);
1236 /// }
1237 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1238 /// continue;
1239 /// }
1240 /// Err(e) => {
1241 /// return Err(e.into());
1242 /// }
1243 /// }
1244 /// }
1245 ///
1246 /// Ok(())
1247 /// }
1248 /// ```
1249 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1250 self.io
1251 .registration()
1252 .try_io(Interest::READABLE, || (&*self.io).read(buf))
1253 }
1254
1255 /// Tries to read data from the pipe into the provided buffers, returning
1256 /// how many bytes were read.
1257 ///
1258 /// Data is copied to fill each buffer in order, with the final buffer
1259 /// written to possibly being only partially filled. This method behaves
1260 /// equivalently to a single call to [`try_read()`] with concatenated
1261 /// buffers.
1262 ///
1263 /// Receives any pending data from the pipe but does not wait for new data
1264 /// to arrive. On success, returns the number of bytes read. Because
1265 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1266 /// stored by the async task and can exist entirely on the stack.
1267 ///
1268 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1269 ///
1270 /// [`try_read()`]: NamedPipeClient::try_read()
1271 /// [`readable()`]: NamedPipeClient::readable()
1272 /// [`ready()`]: NamedPipeClient::ready()
1273 ///
1274 /// # Return
1275 ///
1276 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1277 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1278 /// and will no longer yield data. If the pipe is not ready to read data
1279 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1280 ///
1281 /// # Examples
1282 ///
1283 /// ```no_run
1284 /// use tokio::net::windows::named_pipe;
1285 /// use std::error::Error;
1286 /// use std::io::{self, IoSliceMut};
1287 ///
1288 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1289 ///
1290 /// #[tokio::main]
1291 /// async fn main() -> Result<(), Box<dyn Error>> {
1292 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1293 ///
1294 /// loop {
1295 /// // Wait for the pipe to be readable
1296 /// client.readable().await?;
1297 ///
1298 /// // Creating the buffer **after** the `await` prevents it from
1299 /// // being stored in the async task.
1300 /// let mut buf_a = [0; 512];
1301 /// let mut buf_b = [0; 1024];
1302 /// let mut bufs = [
1303 /// IoSliceMut::new(&mut buf_a),
1304 /// IoSliceMut::new(&mut buf_b),
1305 /// ];
1306 ///
1307 /// // Try to read data, this may still fail with `WouldBlock`
1308 /// // if the readiness event is a false positive.
1309 /// match client.try_read_vectored(&mut bufs) {
1310 /// Ok(0) => break,
1311 /// Ok(n) => {
1312 /// println!("read {} bytes", n);
1313 /// }
1314 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1315 /// continue;
1316 /// }
1317 /// Err(e) => {
1318 /// return Err(e.into());
1319 /// }
1320 /// }
1321 /// }
1322 ///
1323 /// Ok(())
1324 /// }
1325 /// ```
1326 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1327 self.io
1328 .registration()
1329 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1330 }
1331
1332 cfg_io_util! {
1333 /// Tries to read data from the stream into the provided buffer, advancing the
1334 /// buffer's internal cursor, returning how many bytes were read.
1335 ///
1336 /// Receives any pending data from the pipe but does not wait for new data
1337 /// to arrive. On success, returns the number of bytes read. Because
1338 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1339 /// the async task and can exist entirely on the stack.
1340 ///
1341 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1342 ///
1343 /// [`readable()`]: NamedPipeClient::readable()
1344 /// [`ready()`]: NamedPipeClient::ready()
1345 ///
1346 /// # Return
1347 ///
1348 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1349 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
1350 /// and will no longer yield data. If the stream is not ready to read data
1351 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1352 ///
1353 /// # Examples
1354 ///
1355 /// ```no_run
1356 /// use tokio::net::windows::named_pipe;
1357 /// use std::error::Error;
1358 /// use std::io;
1359 ///
1360 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1361 ///
1362 /// #[tokio::main]
1363 /// async fn main() -> Result<(), Box<dyn Error>> {
1364 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1365 ///
1366 /// loop {
1367 /// // Wait for the pipe to be readable
1368 /// client.readable().await?;
1369 ///
1370 /// let mut buf = Vec::with_capacity(4096);
1371 ///
1372 /// // Try to read data, this may still fail with `WouldBlock`
1373 /// // if the readiness event is a false positive.
1374 /// match client.try_read_buf(&mut buf) {
1375 /// Ok(0) => break,
1376 /// Ok(n) => {
1377 /// println!("read {} bytes", n);
1378 /// }
1379 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1380 /// continue;
1381 /// }
1382 /// Err(e) => {
1383 /// return Err(e.into());
1384 /// }
1385 /// }
1386 /// }
1387 ///
1388 /// Ok(())
1389 /// }
1390 /// ```
1391 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1392 self.io.registration().try_io(Interest::READABLE, || {
1393 use std::io::Read;
1394
1395 let dst = buf.chunk_mut();
1396 let dst =
1397 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1398
1399 // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
1400 // buffer.
1401 let n = (&*self.io).read(dst)?;
1402
1403 unsafe {
1404 buf.advance_mut(n);
1405 }
1406
1407 Ok(n)
1408 })
1409 }
1410 }
1411
1412 /// Waits for the pipe to become writable.
1413 ///
1414 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1415 /// paired with `try_write()`.
1416 ///
1417 /// # Examples
1418 ///
1419 /// ```no_run
1420 /// use tokio::net::windows::named_pipe;
1421 /// use std::error::Error;
1422 /// use std::io;
1423 ///
1424 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1425 ///
1426 /// #[tokio::main]
1427 /// async fn main() -> Result<(), Box<dyn Error>> {
1428 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1429 ///
1430 /// loop {
1431 /// // Wait for the pipe to be writable
1432 /// client.writable().await?;
1433 ///
1434 /// // Try to write data, this may still fail with `WouldBlock`
1435 /// // if the readiness event is a false positive.
1436 /// match client.try_write(b"hello world") {
1437 /// Ok(n) => {
1438 /// break;
1439 /// }
1440 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1441 /// continue;
1442 /// }
1443 /// Err(e) => {
1444 /// return Err(e.into());
1445 /// }
1446 /// }
1447 /// }
1448 ///
1449 /// Ok(())
1450 /// }
1451 /// ```
1452 pub async fn writable(&self) -> io::Result<()> {
1453 self.ready(Interest::WRITABLE).await?;
1454 Ok(())
1455 }
1456
1457 /// Polls for write readiness.
1458 ///
1459 /// If the pipe is not currently ready for writing, this method will
1460 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1461 /// becomes ready for writing, `Waker::wake` will be called on the waker.
1462 ///
1463 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1464 /// the `Waker` from the `Context` passed to the most recent call is
1465 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1466 /// second, independent waker.)
1467 ///
1468 /// This function is intended for cases where creating and pinning a future
1469 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1470 /// preferred, as this supports polling from multiple tasks at once.
1471 ///
1472 /// # Return value
1473 ///
1474 /// The function returns:
1475 ///
1476 /// * `Poll::Pending` if the pipe is not ready for writing.
1477 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1478 /// * `Poll::Ready(Err(e))` if an error is encountered.
1479 ///
1480 /// # Errors
1481 ///
1482 /// This function may encounter any standard I/O error except `WouldBlock`.
1483 ///
1484 /// [`writable`]: method@Self::writable
1485 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1486 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1487 }
1488
1489 /// Tries to write a buffer to the pipe, returning how many bytes were
1490 /// written.
1491 ///
1492 /// The function will attempt to write the entire contents of `buf`, but
1493 /// only part of the buffer may be written.
1494 ///
1495 /// This function is usually paired with `writable()`.
1496 ///
1497 /// # Return
1498 ///
1499 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1500 /// number of bytes written. If the pipe is not ready to write data,
1501 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1502 ///
1503 /// # Examples
1504 ///
1505 /// ```no_run
1506 /// use tokio::net::windows::named_pipe;
1507 /// use std::error::Error;
1508 /// use std::io;
1509 ///
1510 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1511 ///
1512 /// #[tokio::main]
1513 /// async fn main() -> Result<(), Box<dyn Error>> {
1514 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1515 ///
1516 /// loop {
1517 /// // Wait for the pipe to be writable
1518 /// client.writable().await?;
1519 ///
1520 /// // Try to write data, this may still fail with `WouldBlock`
1521 /// // if the readiness event is a false positive.
1522 /// match client.try_write(b"hello world") {
1523 /// Ok(n) => {
1524 /// break;
1525 /// }
1526 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1527 /// continue;
1528 /// }
1529 /// Err(e) => {
1530 /// return Err(e.into());
1531 /// }
1532 /// }
1533 /// }
1534 ///
1535 /// Ok(())
1536 /// }
1537 /// ```
1538 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1539 self.io
1540 .registration()
1541 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1542 }
1543
1544 /// Tries to write several buffers to the pipe, returning how many bytes
1545 /// were written.
1546 ///
1547 /// Data is written from each buffer in order, with the final buffer read
1548 /// from possible being only partially consumed. This method behaves
1549 /// equivalently to a single call to [`try_write()`] with concatenated
1550 /// buffers.
1551 ///
1552 /// This function is usually paired with `writable()`.
1553 ///
1554 /// [`try_write()`]: NamedPipeClient::try_write()
1555 ///
1556 /// # Return
1557 ///
1558 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1559 /// number of bytes written. If the pipe is not ready to write data,
1560 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1561 ///
1562 /// # Examples
1563 ///
1564 /// ```no_run
1565 /// use tokio::net::windows::named_pipe;
1566 /// use std::error::Error;
1567 /// use std::io;
1568 ///
1569 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1570 ///
1571 /// #[tokio::main]
1572 /// async fn main() -> Result<(), Box<dyn Error>> {
1573 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1574 ///
1575 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1576 ///
1577 /// loop {
1578 /// // Wait for the pipe to be writable
1579 /// client.writable().await?;
1580 ///
1581 /// // Try to write data, this may still fail with `WouldBlock`
1582 /// // if the readiness event is a false positive.
1583 /// match client.try_write_vectored(&bufs) {
1584 /// Ok(n) => {
1585 /// break;
1586 /// }
1587 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1588 /// continue;
1589 /// }
1590 /// Err(e) => {
1591 /// return Err(e.into());
1592 /// }
1593 /// }
1594 /// }
1595 ///
1596 /// Ok(())
1597 /// }
1598 /// ```
1599 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1600 self.io
1601 .registration()
1602 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1603 }
1604
1605 /// Tries to read or write from the pipe using a user-provided IO operation.
1606 ///
1607 /// If the pipe is ready, the provided closure is called. The closure
1608 /// should attempt to perform IO operation from the pipe by manually
1609 /// calling the appropriate syscall. If the operation fails because the
1610 /// pipe is not actually ready, then the closure should return a
1611 /// `WouldBlock` error and the readiness flag is cleared. The return value
1612 /// of the closure is then returned by `try_io`.
1613 ///
1614 /// If the pipe is not ready, then the closure is not called
1615 /// and a `WouldBlock` error is returned.
1616 ///
1617 /// The closure should only return a `WouldBlock` error if it has performed
1618 /// an IO operation on the pipe that failed due to the pipe not being
1619 /// ready. Returning a `WouldBlock` error in any other situation will
1620 /// incorrectly clear the readiness flag, which can cause the pipe to
1621 /// behave incorrectly.
1622 ///
1623 /// The closure should not perform the IO operation using any of the methods
1624 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1625 /// readiness flag and can cause the pipe to behave incorrectly.
1626 ///
1627 /// This method is not intended to be used with combined interests.
1628 /// The closure should perform only one type of IO operation, so it should not
1629 /// require more than one ready state. This method may panic or sleep forever
1630 /// if it is called with a combined interest.
1631 ///
1632 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1633 ///
1634 /// [`readable()`]: NamedPipeClient::readable()
1635 /// [`writable()`]: NamedPipeClient::writable()
1636 /// [`ready()`]: NamedPipeClient::ready()
1637 pub fn try_io<R>(
1638 &self,
1639 interest: Interest,
1640 f: impl FnOnce() -> io::Result<R>,
1641 ) -> io::Result<R> {
1642 self.io.registration().try_io(interest, f)
1643 }
1644
1645 /// Reads or writes from the pipe using a user-provided IO operation.
1646 ///
1647 /// The readiness of the pipe is awaited and when the pipe is ready,
1648 /// the provided closure is called. The closure should attempt to perform
1649 /// IO operation on the pipe by manually calling the appropriate syscall.
1650 /// If the operation fails because the pipe is not actually ready,
1651 /// then the closure should return a `WouldBlock` error. In such case the
1652 /// readiness flag is cleared and the pipe readiness is awaited again.
1653 /// This loop is repeated until the closure returns an `Ok` or an error
1654 /// other than `WouldBlock`.
1655 ///
1656 /// The closure should only return a `WouldBlock` error if it has performed
1657 /// an IO operation on the pipe that failed due to the pipe not being
1658 /// ready. Returning a `WouldBlock` error in any other situation will
1659 /// incorrectly clear the readiness flag, which can cause the pipe to
1660 /// behave incorrectly.
1661 ///
1662 /// The closure should not perform the IO operation using any of the methods
1663 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1664 /// readiness flag and can cause the pipe to behave incorrectly.
1665 ///
1666 /// This method is not intended to be used with combined interests.
1667 /// The closure should perform only one type of IO operation, so it should not
1668 /// require more than one ready state. This method may panic or sleep forever
1669 /// if it is called with a combined interest.
1670 pub async fn async_io<R>(
1671 &self,
1672 interest: Interest,
1673 f: impl FnMut() -> io::Result<R>,
1674 ) -> io::Result<R> {
1675 self.io.registration().async_io(interest, f).await
1676 }
1677}
1678
1679impl AsyncRead for NamedPipeClient {
1680 fn poll_read(
1681 self: Pin<&mut Self>,
1682 cx: &mut Context<'_>,
1683 buf: &mut ReadBuf<'_>,
1684 ) -> Poll<io::Result<()>> {
1685 unsafe { self.io.poll_read(cx, buf) }
1686 }
1687}
1688
1689impl AsyncWrite for NamedPipeClient {
1690 fn poll_write(
1691 self: Pin<&mut Self>,
1692 cx: &mut Context<'_>,
1693 buf: &[u8],
1694 ) -> Poll<io::Result<usize>> {
1695 self.io.poll_write(cx, buf)
1696 }
1697
1698 fn poll_write_vectored(
1699 self: Pin<&mut Self>,
1700 cx: &mut Context<'_>,
1701 bufs: &[io::IoSlice<'_>],
1702 ) -> Poll<io::Result<usize>> {
1703 self.io.poll_write_vectored(cx, bufs)
1704 }
1705
1706 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1707 Poll::Ready(Ok(()))
1708 }
1709
1710 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1711 self.poll_flush(cx)
1712 }
1713}
1714
1715impl AsRawHandle for NamedPipeClient {
1716 fn as_raw_handle(&self) -> RawHandle {
1717 self.io.as_raw_handle()
1718 }
1719}
1720
1721impl AsHandle for NamedPipeClient {
1722 fn as_handle(&self) -> BorrowedHandle<'_> {
1723 unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
1724 }
1725}
1726
1727/// A builder structure for construct a named pipe with named pipe-specific
1728/// options. This is required to use for named pipe servers who wants to modify
1729/// pipe-related options.
1730///
1731/// See [`ServerOptions::create`].
1732#[derive(Debug, Clone)]
1733pub struct ServerOptions {
1734 // dwOpenMode
1735 access_inbound: bool,
1736 access_outbound: bool,
1737 first_pipe_instance: bool,
1738 write_dac: bool,
1739 write_owner: bool,
1740 access_system_security: bool,
1741 // dwPipeMode
1742 pipe_mode: PipeMode,
1743 reject_remote_clients: bool,
1744 // other options
1745 max_instances: u32,
1746 out_buffer_size: u32,
1747 in_buffer_size: u32,
1748 default_timeout: u32,
1749}
1750
1751impl ServerOptions {
1752 /// Creates a new named pipe builder with the default settings.
1753 ///
1754 /// ```
1755 /// use tokio::net::windows::named_pipe::ServerOptions;
1756 ///
1757 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1758 ///
1759 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1760 /// let server = ServerOptions::new().create(PIPE_NAME)?;
1761 /// # Ok(()) }
1762 /// ```
1763 pub fn new() -> ServerOptions {
1764 ServerOptions {
1765 access_inbound: true,
1766 access_outbound: true,
1767 first_pipe_instance: false,
1768 write_dac: false,
1769 write_owner: false,
1770 access_system_security: false,
1771 pipe_mode: PipeMode::Byte,
1772 reject_remote_clients: true,
1773 max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1774 out_buffer_size: 65536,
1775 in_buffer_size: 65536,
1776 default_timeout: 0,
1777 }
1778 }
1779
1780 /// The pipe mode.
1781 ///
1782 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1783 /// documentation of what each mode means.
1784 ///
1785 /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in [`dwPipeMode`].
1786 ///
1787 /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1788 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1789 self.pipe_mode = pipe_mode;
1790 self
1791 }
1792
1793 /// The flow of data in the pipe goes from client to server only.
1794 ///
1795 /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1796 ///
1797 /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1798 ///
1799 /// # Errors
1800 ///
1801 /// Server side prevents connecting by denying inbound access, client errors
1802 /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1803 /// the connection.
1804 ///
1805 /// ```
1806 /// use std::io;
1807 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1808 ///
1809 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1810 ///
1811 /// # #[tokio::main] async fn main() -> io::Result<()> {
1812 /// let _server = ServerOptions::new()
1813 /// .access_inbound(false)
1814 /// .create(PIPE_NAME)?;
1815 ///
1816 /// let e = ClientOptions::new()
1817 /// .open(PIPE_NAME)
1818 /// .unwrap_err();
1819 ///
1820 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1821 /// # Ok(()) }
1822 /// ```
1823 ///
1824 /// Disabling writing allows a client to connect, but errors with
1825 /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1826 ///
1827 /// ```
1828 /// use std::io;
1829 /// use tokio::io::AsyncWriteExt;
1830 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1831 ///
1832 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1833 ///
1834 /// # #[tokio::main] async fn main() -> io::Result<()> {
1835 /// let server = ServerOptions::new()
1836 /// .access_inbound(false)
1837 /// .create(PIPE_NAME)?;
1838 ///
1839 /// let mut client = ClientOptions::new()
1840 /// .write(false)
1841 /// .open(PIPE_NAME)?;
1842 ///
1843 /// server.connect().await?;
1844 ///
1845 /// let e = client.write(b"ping").await.unwrap_err();
1846 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1847 /// # Ok(()) }
1848 /// ```
1849 ///
1850 /// # Examples
1851 ///
1852 /// A unidirectional named pipe that only supports server-to-client
1853 /// communication.
1854 ///
1855 /// ```
1856 /// use std::io;
1857 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1858 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1859 ///
1860 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1861 ///
1862 /// # #[tokio::main] async fn main() -> io::Result<()> {
1863 /// let mut server = ServerOptions::new()
1864 /// .access_inbound(false)
1865 /// .create(PIPE_NAME)?;
1866 ///
1867 /// let mut client = ClientOptions::new()
1868 /// .write(false)
1869 /// .open(PIPE_NAME)?;
1870 ///
1871 /// server.connect().await?;
1872 ///
1873 /// let write = server.write_all(b"ping");
1874 ///
1875 /// let mut buf = [0u8; 4];
1876 /// let read = client.read_exact(&mut buf);
1877 ///
1878 /// let ((), read) = tokio::try_join!(write, read)?;
1879 ///
1880 /// assert_eq!(read, 4);
1881 /// assert_eq!(&buf[..], b"ping");
1882 /// # Ok(()) }
1883 /// ```
1884 pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1885 self.access_inbound = allowed;
1886 self
1887 }
1888
1889 /// The flow of data in the pipe goes from server to client only.
1890 ///
1891 /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1892 ///
1893 /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1894 ///
1895 /// # Errors
1896 ///
1897 /// Server side prevents connecting by denying outbound access, client
1898 /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1899 /// create the connection.
1900 ///
1901 /// ```
1902 /// use std::io;
1903 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1904 ///
1905 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1906 ///
1907 /// # #[tokio::main] async fn main() -> io::Result<()> {
1908 /// let server = ServerOptions::new()
1909 /// .access_outbound(false)
1910 /// .create(PIPE_NAME)?;
1911 ///
1912 /// let e = ClientOptions::new()
1913 /// .open(PIPE_NAME)
1914 /// .unwrap_err();
1915 ///
1916 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1917 /// # Ok(()) }
1918 /// ```
1919 ///
1920 /// Disabling reading allows a client to connect, but attempting to read
1921 /// will error with [`std::io::ErrorKind::PermissionDenied`].
1922 ///
1923 /// ```
1924 /// use std::io;
1925 /// use tokio::io::AsyncReadExt;
1926 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1927 ///
1928 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1929 ///
1930 /// # #[tokio::main] async fn main() -> io::Result<()> {
1931 /// let server = ServerOptions::new()
1932 /// .access_outbound(false)
1933 /// .create(PIPE_NAME)?;
1934 ///
1935 /// let mut client = ClientOptions::new()
1936 /// .read(false)
1937 /// .open(PIPE_NAME)?;
1938 ///
1939 /// server.connect().await?;
1940 ///
1941 /// let mut buf = [0u8; 4];
1942 /// let e = client.read(&mut buf).await.unwrap_err();
1943 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1944 /// # Ok(()) }
1945 /// ```
1946 ///
1947 /// # Examples
1948 ///
1949 /// A unidirectional named pipe that only supports client-to-server
1950 /// communication.
1951 ///
1952 /// ```
1953 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1954 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1955 ///
1956 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1957 ///
1958 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1959 /// let mut server = ServerOptions::new()
1960 /// .access_outbound(false)
1961 /// .create(PIPE_NAME)?;
1962 ///
1963 /// let mut client = ClientOptions::new()
1964 /// .read(false)
1965 /// .open(PIPE_NAME)?;
1966 ///
1967 /// server.connect().await?;
1968 ///
1969 /// let write = client.write_all(b"ping");
1970 ///
1971 /// let mut buf = [0u8; 4];
1972 /// let read = server.read_exact(&mut buf);
1973 ///
1974 /// let ((), read) = tokio::try_join!(write, read)?;
1975 ///
1976 /// println!("done reading and writing");
1977 ///
1978 /// assert_eq!(read, 4);
1979 /// assert_eq!(&buf[..], b"ping");
1980 /// # Ok(()) }
1981 /// ```
1982 pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1983 self.access_outbound = allowed;
1984 self
1985 }
1986
1987 /// If you attempt to create multiple instances of a pipe with this flag
1988 /// set, creation of the first server instance succeeds, but creation of any
1989 /// subsequent instances will fail with
1990 /// [`std::io::ErrorKind::PermissionDenied`].
1991 ///
1992 /// This option is intended to be used with servers that want to ensure that
1993 /// they are the only process listening for clients on a given named pipe.
1994 /// This is accomplished by enabling it for the first server instance
1995 /// created in a process.
1996 ///
1997 /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1998 ///
1999 /// # Errors
2000 ///
2001 /// If this option is set and more than one instance of the server for a
2002 /// given named pipe exists, calling [`create`] will fail with
2003 /// [`std::io::ErrorKind::PermissionDenied`].
2004 ///
2005 /// ```
2006 /// use std::io;
2007 /// use tokio::net::windows::named_pipe::ServerOptions;
2008 ///
2009 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
2010 ///
2011 /// # #[tokio::main] async fn main() -> io::Result<()> {
2012 /// let server1 = ServerOptions::new()
2013 /// .first_pipe_instance(true)
2014 /// .create(PIPE_NAME)?;
2015 ///
2016 /// // Second server errs, since it's not the first instance.
2017 /// let e = ServerOptions::new()
2018 /// .first_pipe_instance(true)
2019 /// .create(PIPE_NAME)
2020 /// .unwrap_err();
2021 ///
2022 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2023 /// # Ok(()) }
2024 /// ```
2025 ///
2026 /// # Examples
2027 ///
2028 /// ```
2029 /// use std::io;
2030 /// use tokio::net::windows::named_pipe::ServerOptions;
2031 ///
2032 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
2033 ///
2034 /// # #[tokio::main] async fn main() -> io::Result<()> {
2035 /// let mut builder = ServerOptions::new();
2036 /// builder.first_pipe_instance(true);
2037 ///
2038 /// let server = builder.create(PIPE_NAME)?;
2039 /// let e = builder.create(PIPE_NAME).unwrap_err();
2040 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2041 /// drop(server);
2042 ///
2043 /// // OK: since, we've closed the other instance.
2044 /// let _server2 = builder.create(PIPE_NAME)?;
2045 /// # Ok(()) }
2046 /// ```
2047 ///
2048 /// [`create`]: ServerOptions::create
2049 /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
2050 pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
2051 self.first_pipe_instance = first;
2052 self
2053 }
2054
2055 /// Requests permission to modify the pipe's discretionary access control list.
2056 ///
2057 /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
2058 ///
2059 /// # Examples
2060 ///
2061 /// ```
2062 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2063 ///
2064 /// use tokio::net::windows::named_pipe::ServerOptions;
2065 /// use windows_sys::{
2066 /// Win32::Foundation::ERROR_SUCCESS,
2067 /// Win32::Security::DACL_SECURITY_INFORMATION,
2068 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2069 /// };
2070 ///
2071 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
2072 ///
2073 /// # #[tokio::main] async fn main() -> io::Result<()> {
2074 /// let mut pipe_template = ServerOptions::new();
2075 /// pipe_template.write_dac(true);
2076 /// let pipe = pipe_template.create(PIPE_NAME)?;
2077 ///
2078 /// unsafe {
2079 /// assert_eq!(
2080 /// ERROR_SUCCESS,
2081 /// SetSecurityInfo(
2082 /// pipe.as_raw_handle() as _,
2083 /// SE_KERNEL_OBJECT,
2084 /// DACL_SECURITY_INFORMATION,
2085 /// ptr::null_mut(),
2086 /// ptr::null_mut(),
2087 /// ptr::null_mut(),
2088 /// ptr::null_mut(),
2089 /// )
2090 /// );
2091 /// }
2092 ///
2093 /// # Ok(()) }
2094 /// ```
2095 ///
2096 /// ```
2097 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2098 ///
2099 /// use tokio::net::windows::named_pipe::ServerOptions;
2100 /// use windows_sys::{
2101 /// Win32::Foundation::ERROR_ACCESS_DENIED,
2102 /// Win32::Security::DACL_SECURITY_INFORMATION,
2103 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2104 /// };
2105 ///
2106 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2107 ///
2108 /// # #[tokio::main] async fn main() -> io::Result<()> {
2109 /// let mut pipe_template = ServerOptions::new();
2110 /// pipe_template.write_dac(false);
2111 /// let pipe = pipe_template.create(PIPE_NAME)?;
2112 ///
2113 /// unsafe {
2114 /// assert_eq!(
2115 /// ERROR_ACCESS_DENIED,
2116 /// SetSecurityInfo(
2117 /// pipe.as_raw_handle() as _,
2118 /// SE_KERNEL_OBJECT,
2119 /// DACL_SECURITY_INFORMATION,
2120 /// ptr::null_mut(),
2121 /// ptr::null_mut(),
2122 /// ptr::null_mut(),
2123 /// ptr::null_mut(),
2124 /// )
2125 /// );
2126 /// }
2127 ///
2128 /// # Ok(()) }
2129 /// ```
2130 ///
2131 /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2132 pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2133 self.write_dac = requested;
2134 self
2135 }
2136
2137 /// Requests permission to modify the pipe's owner.
2138 ///
2139 /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2140 ///
2141 /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2142 pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2143 self.write_owner = requested;
2144 self
2145 }
2146
2147 /// Requests permission to modify the pipe's system access control list.
2148 ///
2149 /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2150 ///
2151 /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2152 pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2153 self.access_system_security = requested;
2154 self
2155 }
2156
2157 /// Indicates whether this server can accept remote clients or not. Remote
2158 /// clients are disabled by default.
2159 ///
2160 /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2161 ///
2162 /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
2163 pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2164 self.reject_remote_clients = reject;
2165 self
2166 }
2167
2168 /// The maximum number of instances that can be created for this pipe. The
2169 /// first instance of the pipe can specify this value; the same number must
2170 /// be specified for other instances of the pipe. Acceptable values are in
2171 /// the range 1 through 254. The default value is unlimited.
2172 ///
2173 /// This corresponds to specifying [`nMaxInstances`].
2174 ///
2175 /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2176 ///
2177 /// # Errors
2178 ///
2179 /// The same numbers of `max_instances` have to be used by all servers. Any
2180 /// additional servers trying to be built which uses a mismatching value
2181 /// might error.
2182 ///
2183 /// ```
2184 /// use std::io;
2185 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2186 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2187 ///
2188 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2189 ///
2190 /// # #[tokio::main] async fn main() -> io::Result<()> {
2191 /// let mut server = ServerOptions::new();
2192 /// server.max_instances(2);
2193 ///
2194 /// let s1 = server.create(PIPE_NAME)?;
2195 /// let c1 = ClientOptions::new().open(PIPE_NAME);
2196 ///
2197 /// let s2 = server.create(PIPE_NAME)?;
2198 /// let c2 = ClientOptions::new().open(PIPE_NAME);
2199 ///
2200 /// // Too many servers!
2201 /// let e = server.create(PIPE_NAME).unwrap_err();
2202 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2203 ///
2204 /// // Still too many servers even if we specify a higher value!
2205 /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2206 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2207 /// # Ok(()) }
2208 /// ```
2209 ///
2210 /// # Panics
2211 ///
2212 /// This function will panic if more than 254 instances are specified. If
2213 /// you do not wish to set an instance limit, leave it unspecified.
2214 ///
2215 /// ```should_panic
2216 /// use tokio::net::windows::named_pipe::ServerOptions;
2217 ///
2218 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2219 /// let builder = ServerOptions::new().max_instances(255);
2220 /// # Ok(()) }
2221 /// ```
2222 #[track_caller]
2223 pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2224 assert!(instances < 255, "cannot specify more than 254 instances");
2225 self.max_instances = instances as u32;
2226 self
2227 }
2228
2229 /// The number of bytes to reserve for the output buffer.
2230 ///
2231 /// This corresponds to specifying [`nOutBufferSize`].
2232 ///
2233 /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2234 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2235 self.out_buffer_size = buffer;
2236 self
2237 }
2238
2239 /// The number of bytes to reserve for the input buffer.
2240 ///
2241 /// This corresponds to specifying [`nInBufferSize`].
2242 ///
2243 /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2244 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2245 self.in_buffer_size = buffer;
2246 self
2247 }
2248
2249 /// Creates the named pipe identified by `addr` for use as a server.
2250 ///
2251 /// This uses the [`CreateNamedPipe`] function.
2252 ///
2253 /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2254 ///
2255 /// # Errors
2256 ///
2257 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2258 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2259 ///
2260 /// [Tokio Runtime]: crate::runtime::Runtime
2261 /// [enabled I/O]: crate::runtime::Builder::enable_io
2262 ///
2263 /// # Examples
2264 ///
2265 /// ```
2266 /// use tokio::net::windows::named_pipe::ServerOptions;
2267 ///
2268 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2269 ///
2270 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2271 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2272 /// # Ok(()) }
2273 /// ```
2274 pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2275 // Safety: We're calling create_with_security_attributes_raw w/ a null
2276 // pointer which disables it.
2277 unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2278 }
2279
2280 /// Creates the named pipe identified by `addr` for use as a server.
2281 ///
2282 /// This is the same as [`create`] except that it supports providing the raw
2283 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2284 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2285 ///
2286 /// # Errors
2287 ///
2288 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2289 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2290 ///
2291 /// [Tokio Runtime]: crate::runtime::Runtime
2292 /// [enabled I/O]: crate::runtime::Builder::enable_io
2293 ///
2294 /// # Safety
2295 ///
2296 /// The `attrs` argument must either be null or point at a valid instance of
2297 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2298 /// behavior is identical to calling the [`create`] method.
2299 ///
2300 /// [`create`]: ServerOptions::create
2301 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2302 /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2303 pub unsafe fn create_with_security_attributes_raw(
2304 &self,
2305 addr: impl AsRef<OsStr>,
2306 attrs: *mut c_void,
2307 ) -> io::Result<NamedPipeServer> {
2308 let addr = encode_addr(addr);
2309
2310 let pipe_mode = {
2311 let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
2312 windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
2313 } else {
2314 windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE
2315 };
2316 if self.reject_remote_clients {
2317 mode |= windows_sys::PIPE_REJECT_REMOTE_CLIENTS;
2318 } else {
2319 mode |= windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS;
2320 }
2321 mode
2322 };
2323 let open_mode = {
2324 let mut mode = windows_sys::FILE_FLAG_OVERLAPPED;
2325 if self.access_inbound {
2326 mode |= windows_sys::PIPE_ACCESS_INBOUND;
2327 }
2328 if self.access_outbound {
2329 mode |= windows_sys::PIPE_ACCESS_OUTBOUND;
2330 }
2331 if self.first_pipe_instance {
2332 mode |= windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE;
2333 }
2334 if self.write_dac {
2335 mode |= windows_sys::WRITE_DAC;
2336 }
2337 if self.write_owner {
2338 mode |= windows_sys::WRITE_OWNER;
2339 }
2340 if self.access_system_security {
2341 mode |= windows_sys::ACCESS_SYSTEM_SECURITY;
2342 }
2343 mode
2344 };
2345
2346 let h = windows_sys::CreateNamedPipeW(
2347 addr.as_ptr(),
2348 open_mode,
2349 pipe_mode,
2350 self.max_instances,
2351 self.out_buffer_size,
2352 self.in_buffer_size,
2353 self.default_timeout,
2354 attrs as *mut _,
2355 );
2356
2357 if h == windows_sys::INVALID_HANDLE_VALUE {
2358 return Err(io::Error::last_os_error());
2359 }
2360
2361 NamedPipeServer::from_raw_handle(h as _)
2362 }
2363}
2364
2365/// A builder suitable for building and interacting with named pipes from the
2366/// client side.
2367///
2368/// See [`ClientOptions::open`].
2369#[derive(Debug, Clone)]
2370pub struct ClientOptions {
2371 generic_read: bool,
2372 generic_write: bool,
2373 security_qos_flags: u32,
2374 pipe_mode: PipeMode,
2375}
2376
2377impl ClientOptions {
2378 /// Creates a new named pipe builder with the default settings.
2379 ///
2380 /// ```
2381 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2382 ///
2383 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2384 ///
2385 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2386 /// // Server must be created in order for the client creation to succeed.
2387 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2388 /// let client = ClientOptions::new().open(PIPE_NAME)?;
2389 /// # Ok(()) }
2390 /// ```
2391 pub fn new() -> Self {
2392 Self {
2393 generic_read: true,
2394 generic_write: true,
2395 security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2396 | windows_sys::SECURITY_SQOS_PRESENT,
2397 pipe_mode: PipeMode::Byte,
2398 }
2399 }
2400
2401 /// If the client supports reading data. This is enabled by default.
2402 ///
2403 /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2404 ///
2405 /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2406 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2407 pub fn read(&mut self, allowed: bool) -> &mut Self {
2408 self.generic_read = allowed;
2409 self
2410 }
2411
2412 /// If the created pipe supports writing data. This is enabled by default.
2413 ///
2414 /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2415 ///
2416 /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2417 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2418 pub fn write(&mut self, allowed: bool) -> &mut Self {
2419 self.generic_write = allowed;
2420 self
2421 }
2422
2423 /// Sets qos flags which are combined with other flags and attributes in the
2424 /// call to [`CreateFile`].
2425 ///
2426 /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2427 /// calling this function would override that value completely with the
2428 /// argument specified.
2429 ///
2430 /// When `security_qos_flags` is not set, a malicious program can gain the
2431 /// elevated privileges of a privileged Rust process when it allows opening
2432 /// user-specified paths, by tricking it into opening a named pipe. So
2433 /// arguably `security_qos_flags` should also be set when opening arbitrary
2434 /// paths. However the bits can then conflict with other flags, specifically
2435 /// `FILE_FLAG_OPEN_NO_RECALL`.
2436 ///
2437 /// For information about possible values, see [Impersonation Levels] on the
2438 /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2439 /// automatically when using this method.
2440 ///
2441 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2442 /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2443 /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
2444 pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2445 // See: https://github.com/rust-lang/rust/pull/58216
2446 self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2447 self
2448 }
2449
2450 /// The pipe mode.
2451 ///
2452 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
2453 /// documentation of what each mode means.
2454 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
2455 self.pipe_mode = pipe_mode;
2456 self
2457 }
2458
2459 /// Opens the named pipe identified by `addr`.
2460 ///
2461 /// This opens the client using [`CreateFile`] with the
2462 /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2463 ///
2464 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2465 ///
2466 /// # Errors
2467 ///
2468 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2469 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2470 ///
2471 /// There are a few errors you need to take into account when creating a
2472 /// named pipe on the client side:
2473 ///
2474 /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2475 /// does not exist. Presumably the server is not up.
2476 /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2477 /// but the server is not currently waiting for a connection. Please see the
2478 /// examples for how to check for this error.
2479 ///
2480 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2481 /// [enabled I/O]: crate::runtime::Builder::enable_io
2482 /// [Tokio Runtime]: crate::runtime::Runtime
2483 ///
2484 /// A connect loop that waits until a pipe becomes available looks like
2485 /// this:
2486 ///
2487 /// ```no_run
2488 /// use std::time::Duration;
2489 /// use tokio::net::windows::named_pipe::ClientOptions;
2490 /// use tokio::time;
2491 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2492 ///
2493 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2494 ///
2495 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2496 /// let client = loop {
2497 /// match ClientOptions::new().open(PIPE_NAME) {
2498 /// Ok(client) => break client,
2499 /// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2500 /// Err(e) => return Err(e),
2501 /// }
2502 ///
2503 /// time::sleep(Duration::from_millis(50)).await;
2504 /// };
2505 ///
2506 /// // use the connected client.
2507 /// # Ok(()) }
2508 /// ```
2509 pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2510 // Safety: We're calling open_with_security_attributes_raw w/ a null
2511 // pointer which disables it.
2512 unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2513 }
2514
2515 /// Opens the named pipe identified by `addr`.
2516 ///
2517 /// This is the same as [`open`] except that it supports providing the raw
2518 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2519 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2520 ///
2521 /// # Safety
2522 ///
2523 /// The `attrs` argument must either be null or point at a valid instance of
2524 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2525 /// behavior is identical to calling the [`open`] method.
2526 ///
2527 /// [`open`]: ClientOptions::open
2528 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2529 /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2530 pub unsafe fn open_with_security_attributes_raw(
2531 &self,
2532 addr: impl AsRef<OsStr>,
2533 attrs: *mut c_void,
2534 ) -> io::Result<NamedPipeClient> {
2535 let addr = encode_addr(addr);
2536
2537 let desired_access = {
2538 let mut access = 0;
2539 if self.generic_read {
2540 access |= windows_sys::GENERIC_READ;
2541 }
2542 if self.generic_write {
2543 access |= windows_sys::GENERIC_WRITE;
2544 }
2545 access
2546 };
2547
2548 // NB: We could use a platform specialized `OpenOptions` here, but since
2549 // we have access to windows_sys it ultimately doesn't hurt to use
2550 // `CreateFile` explicitly since it allows the use of our already
2551 // well-structured wide `addr` to pass into CreateFileW.
2552 let h = windows_sys::CreateFileW(
2553 addr.as_ptr(),
2554 desired_access,
2555 0,
2556 attrs as *mut _,
2557 windows_sys::OPEN_EXISTING,
2558 self.get_flags(),
2559 0,
2560 );
2561
2562 if h == windows_sys::INVALID_HANDLE_VALUE {
2563 return Err(io::Error::last_os_error());
2564 }
2565
2566 if matches!(self.pipe_mode, PipeMode::Message) {
2567 let mode = windows_sys::PIPE_READMODE_MESSAGE;
2568 let result =
2569 windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut());
2570
2571 if result == 0 {
2572 return Err(io::Error::last_os_error());
2573 }
2574 }
2575
2576 NamedPipeClient::from_raw_handle(h as _)
2577 }
2578
2579 fn get_flags(&self) -> u32 {
2580 self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2581 }
2582}
2583
2584/// The pipe mode of a named pipe.
2585///
2586/// Set through [`ServerOptions::pipe_mode`].
2587#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2588#[non_exhaustive]
2589pub enum PipeMode {
2590 /// Data is written to the pipe as a stream of bytes. The pipe does not
2591 /// distinguish bytes written during different write operations.
2592 ///
2593 /// Corresponds to [`PIPE_TYPE_BYTE`].
2594 ///
2595 /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
2596 Byte,
2597 /// Data is written to the pipe as a stream of messages. The pipe treats the
2598 /// bytes written during each write operation as a message unit. Any reading
2599 /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
2600 /// completely.
2601 ///
2602 /// Corresponds to [`PIPE_TYPE_MESSAGE`].
2603 ///
2604 /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
2605 /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
2606 Message,
2607}
2608
2609/// Indicates the end of a named pipe.
2610#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2611#[non_exhaustive]
2612pub enum PipeEnd {
2613 /// The named pipe refers to the client end of a named pipe instance.
2614 ///
2615 /// Corresponds to [`PIPE_CLIENT_END`].
2616 ///
2617 /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
2618 Client,
2619 /// The named pipe refers to the server end of a named pipe instance.
2620 ///
2621 /// Corresponds to [`PIPE_SERVER_END`].
2622 ///
2623 /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
2624 Server,
2625}
2626
2627/// Information about a named pipe.
2628///
2629/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2630#[derive(Debug, Clone)]
2631#[non_exhaustive]
2632pub struct PipeInfo {
2633 /// Indicates the mode of a named pipe.
2634 pub mode: PipeMode,
2635 /// Indicates the end of a named pipe.
2636 pub end: PipeEnd,
2637 /// The maximum number of instances that can be created for this pipe.
2638 pub max_instances: u32,
2639 /// The number of bytes to reserve for the output buffer.
2640 pub out_buffer_size: u32,
2641 /// The number of bytes to reserve for the input buffer.
2642 pub in_buffer_size: u32,
2643}
2644
2645/// Encodes an address so that it is a null-terminated wide string.
2646fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2647 let len = addr.as_ref().encode_wide().count();
2648 let mut vec = Vec::with_capacity(len + 1);
2649 vec.extend(addr.as_ref().encode_wide());
2650 vec.push(0);
2651 vec.into_boxed_slice()
2652}
2653
2654/// Internal function to get the info out of a raw named pipe.
2655unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2656 let mut flags = 0;
2657 let mut out_buffer_size = 0;
2658 let mut in_buffer_size = 0;
2659 let mut max_instances = 0;
2660
2661 let result = windows_sys::GetNamedPipeInfo(
2662 handle as _,
2663 &mut flags,
2664 &mut out_buffer_size,
2665 &mut in_buffer_size,
2666 &mut max_instances,
2667 );
2668
2669 if result == 0 {
2670 return Err(io::Error::last_os_error());
2671 }
2672
2673 let mut end = PipeEnd::Client;
2674 let mut mode = PipeMode::Byte;
2675
2676 if flags & windows_sys::PIPE_SERVER_END != 0 {
2677 end = PipeEnd::Server;
2678 }
2679
2680 if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2681 mode = PipeMode::Message;
2682 }
2683
2684 Ok(PipeInfo {
2685 end,
2686 mode,
2687 out_buffer_size,
2688 in_buffer_size,
2689 max_instances,
2690 })
2691}