tokio/fs/
file.rs

1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::fmt;
11use std::fs::{Metadata, Permissions};
12use std::future::Future;
13use std::io::{self, Seek, SeekFrom};
14use std::path::Path;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{ready, Context, Poll};
18
19#[cfg(test)]
20use super::mocks::JoinHandle;
21#[cfg(test)]
22use super::mocks::MockFile as StdFile;
23#[cfg(test)]
24use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
25#[cfg(not(test))]
26use crate::blocking::JoinHandle;
27#[cfg(not(test))]
28use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
29#[cfg(not(test))]
30use std::fs::File as StdFile;
31
32/// A reference to an open file on the filesystem.
33///
34/// This is a specialized version of [`std::fs::File`] for usage from the
35/// Tokio runtime.
36///
37/// An instance of a `File` can be read and/or written depending on what options
38/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
39/// cursor that the file contains internally.
40///
41/// A file will not be closed immediately when it goes out of scope if there
42/// are any IO operations that have not yet completed. To ensure that a file is
43/// closed immediately when it is dropped, you should call [`flush`] before
44/// dropping it. Note that this does not ensure that the file has been fully
45/// written to disk; the operating system might keep the changes around in an
46/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
47/// the data to disk.
48///
49/// Reading and writing to a `File` is usually done using the convenience
50/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
51///
52/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
53/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
54/// [`sync_all`]: fn@crate::fs::File::sync_all
55/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
56/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
57///
58/// # Examples
59///
60/// Create a new file and asynchronously write bytes to it:
61///
62/// ```no_run
63/// use tokio::fs::File;
64/// use tokio::io::AsyncWriteExt; // for write_all()
65///
66/// # async fn dox() -> std::io::Result<()> {
67/// let mut file = File::create("foo.txt").await?;
68/// file.write_all(b"hello, world!").await?;
69/// # Ok(())
70/// # }
71/// ```
72///
73/// Read the contents of a file into a buffer:
74///
75/// ```no_run
76/// use tokio::fs::File;
77/// use tokio::io::AsyncReadExt; // for read_to_end()
78///
79/// # async fn dox() -> std::io::Result<()> {
80/// let mut file = File::open("foo.txt").await?;
81///
82/// let mut contents = vec![];
83/// file.read_to_end(&mut contents).await?;
84///
85/// println!("len = {}", contents.len());
86/// # Ok(())
87/// # }
88/// ```
pub struct File {
    /// Handle to the underlying file. It is reference-counted so that a clone
    /// can be moved into each closure handed to the blocking pool.
    std: Arc<StdFile>,
    /// Buffer and in-flight operation state. `&self` methods lock it, while
    /// `&mut self` / owned paths reach it directly through `get_mut`.
    inner: Mutex<Inner>,
    /// Limit passed to the buffer whenever it is sized for a read or filled
    /// for a write; initialised to `DEFAULT_MAX_BUF_SIZE` by `from_std`.
    max_buf_size: usize,
}
94
/// Mutable bookkeeping for a [`File`]: the current operation state, a
/// deferred write error, and the last recorded seek position.
struct Inner {
    /// Either idle (holding the buffer) or busy with one blocking operation.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Position taken from the most recent successful `Operation::Seek`
    /// result; `poll_complete` returns it when no operation is pending.
    pos: u64,
}
105
/// Whether a blocking operation is currently in flight for a [`File`].
#[derive(Debug)]
enum State {
    /// No operation is pending. The buffer is stored here and is taken out
    /// (`take().unwrap()`) when a new operation is started.
    Idle(Option<Buf>),
    /// An operation is running on the blocking pool. Joining the handle
    /// yields the operation's result together with the buffer it borrowed.
    Busy(JoinHandle<(Operation, Buf)>),
}
111
/// Result of a completed blocking operation, tagged with its kind so that
/// whichever poll method observes the completion can react appropriately.
#[derive(Debug)]
enum Operation {
    /// Outcome of filling the buffer from the file.
    Read(io::Result<usize>),
    /// Outcome of writing the buffer to the file.
    Write(io::Result<()>),
    /// Outcome of a seek. `set_len` also reports its result through this
    /// variant.
    Seek(io::Result<u64>),
}
118
119impl File {
120    /// Attempts to open a file in read-only mode.
121    ///
122    /// See [`OpenOptions`] for more details.
123    ///
124    /// # Errors
125    ///
126    /// This function will return an error if called from outside of the Tokio
127    /// runtime or if path does not already exist. Other errors may also be
128    /// returned according to `OpenOptions::open`.
129    ///
130    /// # Examples
131    ///
132    /// ```no_run
133    /// use tokio::fs::File;
134    /// use tokio::io::AsyncReadExt;
135    ///
136    /// # async fn dox() -> std::io::Result<()> {
137    /// let mut file = File::open("foo.txt").await?;
138    ///
139    /// let mut contents = vec![];
140    /// file.read_to_end(&mut contents).await?;
141    ///
142    /// println!("len = {}", contents.len());
143    /// # Ok(())
144    /// # }
145    /// ```
146    ///
147    /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
148    ///
149    /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
150    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
151    pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
152        let path = path.as_ref().to_owned();
153        let std = asyncify(|| StdFile::open(path)).await?;
154
155        Ok(File::from_std(std))
156    }
157
158    /// Opens a file in write-only mode.
159    ///
160    /// This function will create a file if it does not exist, and will truncate
161    /// it if it does.
162    ///
163    /// See [`OpenOptions`] for more details.
164    ///
165    /// # Errors
166    ///
167    /// Results in an error if called from outside of the Tokio runtime or if
168    /// the underlying [`create`] call results in an error.
169    ///
170    /// [`create`]: std::fs::File::create
171    ///
172    /// # Examples
173    ///
174    /// ```no_run
175    /// use tokio::fs::File;
176    /// use tokio::io::AsyncWriteExt;
177    ///
178    /// # async fn dox() -> std::io::Result<()> {
179    /// let mut file = File::create("foo.txt").await?;
180    /// file.write_all(b"hello, world!").await?;
181    /// # Ok(())
182    /// # }
183    /// ```
184    ///
185    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
186    ///
187    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
188    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
189    pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
190        let path = path.as_ref().to_owned();
191        let std_file = asyncify(move || StdFile::create(path)).await?;
192        Ok(File::from_std(std_file))
193    }
194
195    /// Opens a file in read-write mode.
196    ///
197    /// This function will create a file if it does not exist, or return an error
198    /// if it does. This way, if the call succeeds, the file returned is guaranteed
199    /// to be new.
200    ///
201    /// This option is useful because it is atomic. Otherwise between checking
202    /// whether a file exists and creating a new one, the file may have been
203    /// created by another process (a TOCTOU race condition / attack).
204    ///
205    /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
206    ///
207    /// See [`OpenOptions`] for more details.
208    ///
209    /// # Examples
210    ///
211    /// ```no_run
212    /// use tokio::fs::File;
213    /// use tokio::io::AsyncWriteExt;
214    ///
215    /// # async fn dox() -> std::io::Result<()> {
216    /// let mut file = File::create_new("foo.txt").await?;
217    /// file.write_all(b"hello, world!").await?;
218    /// # Ok(())
219    /// # }
220    /// ```
221    ///
222    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
223    ///
224    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
225    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
    pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
        // Delegate to `OpenOptions` with read + write access; `create_new(true)`
        // is what makes the open fail when the path already exists.
        Self::options()
            .read(true)
            .write(true)
            .create_new(true)
            .open(path)
            .await
    }
234
235    /// Returns a new [`OpenOptions`] object.
236    ///
237    /// This function returns a new `OpenOptions` object that you can use to
238    /// open or create a file with specific options if `open()` or `create()`
239    /// are not appropriate.
240    ///
241    /// It is equivalent to `OpenOptions::new()`, but allows you to write more
242    /// readable code. Instead of
243    /// `OpenOptions::new().append(true).open("example.log")`,
244    /// you can write `File::options().append(true).open("example.log")`. This
245    /// also avoids the need to import `OpenOptions`.
246    ///
247    /// See the [`OpenOptions::new`] function for more details.
248    ///
249    /// # Examples
250    ///
251    /// ```no_run
252    /// use tokio::fs::File;
253    /// use tokio::io::AsyncWriteExt;
254    ///
255    /// # async fn dox() -> std::io::Result<()> {
256    /// let mut f = File::options().append(true).open("example.log").await?;
257    /// f.write_all(b"new line\n").await?;
258    /// # Ok(())
259    /// # }
260    /// ```
    #[must_use]
    pub fn options() -> OpenOptions {
        // Thin alias for `OpenOptions::new()` so callers need not import the
        // builder type themselves.
        OpenOptions::new()
    }
265
266    /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
267    ///
268    /// # Examples
269    ///
270    /// ```no_run
271    /// // This line could block. It is not recommended to do this on the Tokio
272    /// // runtime.
273    /// let std_file = std::fs::File::open("foo.txt").unwrap();
274    /// let file = tokio::fs::File::from_std(std_file);
275    /// ```
276    pub fn from_std(std: StdFile) -> File {
277        File {
278            std: Arc::new(std),
279            inner: Mutex::new(Inner {
280                state: State::Idle(Some(Buf::with_capacity(0))),
281                last_write_err: None,
282                pos: 0,
283            }),
284            max_buf_size: DEFAULT_MAX_BUF_SIZE,
285        }
286    }
287
288    /// Attempts to sync all OS-internal metadata to disk.
289    ///
290    /// This function will attempt to ensure that all in-core data reaches the
291    /// filesystem before returning.
292    ///
293    /// # Examples
294    ///
295    /// ```no_run
296    /// use tokio::fs::File;
297    /// use tokio::io::AsyncWriteExt;
298    ///
299    /// # async fn dox() -> std::io::Result<()> {
300    /// let mut file = File::create("foo.txt").await?;
301    /// file.write_all(b"hello, world!").await?;
302    /// file.sync_all().await?;
303    /// # Ok(())
304    /// # }
305    /// ```
306    ///
307    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
308    ///
309    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
310    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
311    pub async fn sync_all(&self) -> io::Result<()> {
312        let mut inner = self.inner.lock().await;
313        inner.complete_inflight().await;
314
315        let std = self.std.clone();
316        asyncify(move || std.sync_all()).await
317    }
318
319    /// This function is similar to `sync_all`, except that it may not
320    /// synchronize file metadata to the filesystem.
321    ///
322    /// This is intended for use cases that must synchronize content, but don't
323    /// need the metadata on disk. The goal of this method is to reduce disk
324    /// operations.
325    ///
326    /// Note that some platforms may simply implement this in terms of `sync_all`.
327    ///
328    /// # Examples
329    ///
330    /// ```no_run
331    /// use tokio::fs::File;
332    /// use tokio::io::AsyncWriteExt;
333    ///
334    /// # async fn dox() -> std::io::Result<()> {
335    /// let mut file = File::create("foo.txt").await?;
336    /// file.write_all(b"hello, world!").await?;
337    /// file.sync_data().await?;
338    /// # Ok(())
339    /// # }
340    /// ```
341    ///
342    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
343    ///
344    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
345    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
346    pub async fn sync_data(&self) -> io::Result<()> {
347        let mut inner = self.inner.lock().await;
348        inner.complete_inflight().await;
349
350        let std = self.std.clone();
351        asyncify(move || std.sync_data()).await
352    }
353
354    /// Truncates or extends the underlying file, updating the size of this file to become size.
355    ///
356    /// If the size is less than the current file's size, then the file will be
357    /// shrunk. If it is greater than the current file's size, then the file
358    /// will be extended to size and have all of the intermediate data filled in
359    /// with 0s.
360    ///
361    /// # Errors
362    ///
363    /// This function will return an error if the file is not opened for
364    /// writing.
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// use tokio::fs::File;
370    /// use tokio::io::AsyncWriteExt;
371    ///
372    /// # async fn dox() -> std::io::Result<()> {
373    /// let mut file = File::create("foo.txt").await?;
374    /// file.write_all(b"hello, world!").await?;
375    /// file.set_len(10).await?;
376    /// # Ok(())
377    /// # }
378    /// ```
379    ///
380    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
381    ///
382    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
383    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
    pub async fn set_len(&self, size: u64) -> io::Result<()> {
        // Wait for any pending operation; the code below expects the state to
        // be `Idle` afterwards.
        let mut inner = self.inner.lock().await;
        inner.complete_inflight().await;

        // Take the buffer out of the idle state. It travels with the blocking
        // task and is handed back when the task completes.
        let mut buf = match inner.state {
            State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
            _ => unreachable!(),
        };

        // If the buffer still holds unconsumed read data, drop it and seek by
        // the offset `discard_read` reports before resizing the file.
        let seek = if !buf.is_empty() {
            Some(SeekFrom::Current(buf.discard_read()))
        } else {
            None
        };

        let std = self.std.clone();

        inner.state = State::Busy(spawn_blocking(move || {
            let res = if let Some(seek) = seek {
                (&*std).seek(seek).and_then(|_| std.set_len(size))
            } else {
                std.set_len(size)
            }
            // Placeholder position so the result fits `Operation::Seek`.
            // NOTE(review): this 0 is not discarded; on success it is stored
            // into `inner.pos` below (or by whichever poll method observes the
            // completion). Confirm that is intended.
            .map(|()| 0);

            // Return the result as a seek
            (Operation::Seek(res), buf)
        }));

        // Join the task that was just spawned. If this future is dropped while
        // waiting, the state stays `Busy` and a later operation picks up the
        // result.
        let (op, buf) = match inner.state {
            State::Idle(_) => unreachable!(),
            State::Busy(ref mut rx) => rx.await?,
        };

        inner.state = State::Idle(Some(buf));

        match op {
            Operation::Seek(res) => res.map(|pos| {
                inner.pos = pos;
            }),
            _ => unreachable!(),
        }
    }
427
428    /// Queries metadata about the underlying file.
429    ///
430    /// # Examples
431    ///
432    /// ```no_run
433    /// use tokio::fs::File;
434    ///
435    /// # async fn dox() -> std::io::Result<()> {
436    /// let file = File::open("foo.txt").await?;
437    /// let metadata = file.metadata().await?;
438    ///
439    /// println!("{:?}", metadata);
440    /// # Ok(())
441    /// # }
442    /// ```
443    pub async fn metadata(&self) -> io::Result<Metadata> {
444        let std = self.std.clone();
445        asyncify(move || std.metadata()).await
446    }
447
448    /// Creates a new `File` instance that shares the same underlying file handle
449    /// as the existing `File` instance. Reads, writes, and seeks will affect both
450    /// File instances simultaneously.
451    ///
452    /// # Examples
453    ///
454    /// ```no_run
455    /// use tokio::fs::File;
456    ///
457    /// # async fn dox() -> std::io::Result<()> {
458    /// let file = File::open("foo.txt").await?;
459    /// let file_clone = file.try_clone().await?;
460    /// # Ok(())
461    /// # }
462    /// ```
463    pub async fn try_clone(&self) -> io::Result<File> {
464        self.inner.lock().await.complete_inflight().await;
465        let std = self.std.clone();
466        let std_file = asyncify(move || std.try_clone()).await?;
467        Ok(File::from_std(std_file))
468    }
469
470    /// Destructures `File` into a [`std::fs::File`]. This function is
471    /// async to allow any in-flight operations to complete.
472    ///
473    /// Use `File::try_into_std` to attempt conversion immediately.
474    ///
475    /// # Examples
476    ///
477    /// ```no_run
478    /// use tokio::fs::File;
479    ///
480    /// # async fn dox() -> std::io::Result<()> {
481    /// let tokio_file = File::open("foo.txt").await?;
482    /// let std_file = tokio_file.into_std().await;
483    /// # Ok(())
484    /// # }
485    /// ```
    pub async fn into_std(mut self) -> StdFile {
        // `self` is owned here, so the mutex can be bypassed with `get_mut`.
        self.inner.get_mut().complete_inflight().await;
        // Panics if another `Arc` clone of the handle is still alive.
        // NOTE(review): presumably the blocking task's clone has been dropped
        // once the in-flight operation has been joined — confirm.
        Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
    }
490
491    /// Tries to immediately destructure `File` into a [`std::fs::File`].
492    ///
493    /// # Errors
494    ///
495    /// This function will return an error containing the file if some
496    /// operation is in-flight.
497    ///
498    /// # Examples
499    ///
500    /// ```no_run
501    /// use tokio::fs::File;
502    ///
503    /// # async fn dox() -> std::io::Result<()> {
504    /// let tokio_file = File::open("foo.txt").await?;
505    /// let std_file = tokio_file.try_into_std().unwrap();
506    /// # Ok(())
507    /// # }
508    /// ```
509    pub fn try_into_std(mut self) -> Result<StdFile, Self> {
510        match Arc::try_unwrap(self.std) {
511            Ok(file) => Ok(file),
512            Err(std_file_arc) => {
513                self.std = std_file_arc;
514                Err(self)
515            }
516        }
517    }
518
519    /// Changes the permissions on the underlying file.
520    ///
521    /// # Platform-specific behavior
522    ///
523    /// This function currently corresponds to the `fchmod` function on Unix and
524    /// the `SetFileInformationByHandle` function on Windows. Note that, this
525    /// [may change in the future][changes].
526    ///
527    /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
528    ///
529    /// # Errors
530    ///
    /// This function will return an error if the user lacks permission to change
    /// attributes on the underlying file. It may also return an error in other
    /// OS-specific unspecified cases.
534    ///
535    /// # Examples
536    ///
537    /// ```no_run
538    /// use tokio::fs::File;
539    ///
540    /// # async fn dox() -> std::io::Result<()> {
541    /// let file = File::open("foo.txt").await?;
542    /// let mut perms = file.metadata().await?.permissions();
543    /// perms.set_readonly(true);
544    /// file.set_permissions(perms).await?;
545    /// # Ok(())
546    /// # }
547    /// ```
548    pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
549        let std = self.std.clone();
550        asyncify(move || std.set_permissions(perm)).await
551    }
552
553    /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
554    ///
555    /// Although Tokio uses a sensible default value for this buffer size, this function would be
556    /// useful for changing that default depending on the situation.
557    ///
558    /// # Examples
559    ///
560    /// ```no_run
561    /// use tokio::fs::File;
562    /// use tokio::io::AsyncWriteExt;
563    ///
564    /// # async fn dox() -> std::io::Result<()> {
    /// let mut file = File::create("foo.txt").await?;
566    ///
567    /// // Set maximum buffer size to 8 MiB
568    /// file.set_max_buf_size(8 * 1024 * 1024);
569    ///
570    /// let mut buf = vec![1; 1024 * 1024 * 1024];
571    ///
572    /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
573    /// file.write_all(&mut buf).await?;
574    /// # Ok(())
575    /// # }
576    /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        // Only stored here; the read and write paths pass it to the buffer
        // when sizing it for the next operation.
        self.max_buf_size = max_buf_size;
    }
580}
581
impl AsyncRead for File {
    /// Reads into `dst`, serving buffered bytes first and otherwise starting
    /// a read on the blocking pool and waiting for it.
    ///
    /// If the pending operation turns out to be a write or a seek, its
    /// completion is absorbed and the loop goes around again to start the
    /// read. A failed write is not returned from here; its kind is stored in
    /// `last_write_err` for the next write / flush call.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(crate::trace::trace_leaf(cx));
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Data left over from an earlier read: hand it out without
                    // touching the file.
                    if !buf.is_empty() {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Poll::Ready(Ok(()));
                    }

                    // Nothing buffered: size the buffer for `dst` (bounded by
                    // `max_buf_size`) and fill it on the blocking pool. The
                    // next loop iteration polls the new `Busy` state.
                    buf.ensure_capacity_for(dst, me.max_buf_size);
                    let std = me.std.clone();

                    inner.state = State::Busy(spawn_blocking(move || {
                        let res = buf.read_from(&mut &*std);
                        (Operation::Read(res), buf)
                    }));
                }
                State::Busy(ref mut rx) => {
                    // `?` propagates a failure to join the blocking task.
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Err(e));
                        }
                        Operation::Write(Ok(())) => {
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            // Defer the write error to the next write / flush.
                            // There is no explicit `continue`; control falls
                            // out of the match and the loop repeats.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = State::Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            // A failed seek is ignored here; only a successful
                            // one updates the recorded position.
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}
650
651impl AsyncSeek for File {
652    fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
653        let me = self.get_mut();
654        let inner = me.inner.get_mut();
655
656        match inner.state {
657            State::Busy(_) => Err(io::Error::new(
658                io::ErrorKind::Other,
659                "other file operation is pending, call poll_complete before start_seek",
660            )),
661            State::Idle(ref mut buf_cell) => {
662                let mut buf = buf_cell.take().unwrap();
663
664                // Factor in any unread data from the buf
665                if !buf.is_empty() {
666                    let n = buf.discard_read();
667
668                    if let SeekFrom::Current(ref mut offset) = pos {
669                        *offset += n;
670                    }
671                }
672
673                let std = me.std.clone();
674
675                inner.state = State::Busy(spawn_blocking(move || {
676                    let res = (&*std).seek(pos);
677                    (Operation::Seek(res), buf)
678                }));
679                Ok(())
680            }
681        }
682    }
683
684    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
685        ready!(crate::trace::trace_leaf(cx));
686        let inner = self.inner.get_mut();
687
688        loop {
689            match inner.state {
690                State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
691                State::Busy(ref mut rx) => {
692                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
693                    inner.state = State::Idle(Some(buf));
694
695                    match op {
696                        Operation::Read(_) => {}
697                        Operation::Write(Err(e)) => {
698                            assert!(inner.last_write_err.is_none());
699                            inner.last_write_err = Some(e.kind());
700                        }
701                        Operation::Write(_) => {}
702                        Operation::Seek(res) => {
703                            if let Ok(pos) = res {
704                                inner.pos = pos;
705                            }
706                            return Poll::Ready(res);
707                        }
708                    }
709                }
710            }
711        }
712    }
713}
714
715impl AsyncWrite for File {
716    fn poll_write(
717        self: Pin<&mut Self>,
718        cx: &mut Context<'_>,
719        src: &[u8],
720    ) -> Poll<io::Result<usize>> {
721        ready!(crate::trace::trace_leaf(cx));
722        let me = self.get_mut();
723        let inner = me.inner.get_mut();
724
725        if let Some(e) = inner.last_write_err.take() {
726            return Poll::Ready(Err(e.into()));
727        }
728
729        loop {
730            match inner.state {
731                State::Idle(ref mut buf_cell) => {
732                    let mut buf = buf_cell.take().unwrap();
733
734                    let seek = if !buf.is_empty() {
735                        Some(SeekFrom::Current(buf.discard_read()))
736                    } else {
737                        None
738                    };
739
740                    let n = buf.copy_from(src, me.max_buf_size);
741                    let std = me.std.clone();
742
743                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
744                        let res = if let Some(seek) = seek {
745                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
746                        } else {
747                            buf.write_to(&mut &*std)
748                        };
749
750                        (Operation::Write(res), buf)
751                    })
752                    .ok_or_else(|| {
753                        io::Error::new(io::ErrorKind::Other, "background task failed")
754                    })?;
755
756                    inner.state = State::Busy(blocking_task_join_handle);
757
758                    return Poll::Ready(Ok(n));
759                }
760                State::Busy(ref mut rx) => {
761                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
762                    inner.state = State::Idle(Some(buf));
763
764                    match op {
765                        Operation::Read(_) => {
766                            // We don't care about the result here. The fact
767                            // that the cursor has advanced will be reflected in
768                            // the next iteration of the loop
769                            continue;
770                        }
771                        Operation::Write(res) => {
772                            // If the previous write was successful, continue.
773                            // Otherwise, error.
774                            res?;
775                            continue;
776                        }
777                        Operation::Seek(_) => {
778                            // Ignore the seek
779                            continue;
780                        }
781                    }
782                }
783            }
784        }
785    }
786
787    fn poll_write_vectored(
788        self: Pin<&mut Self>,
789        cx: &mut Context<'_>,
790        bufs: &[io::IoSlice<'_>],
791    ) -> Poll<Result<usize, io::Error>> {
792        ready!(crate::trace::trace_leaf(cx));
793        let me = self.get_mut();
794        let inner = me.inner.get_mut();
795
796        if let Some(e) = inner.last_write_err.take() {
797            return Poll::Ready(Err(e.into()));
798        }
799
800        loop {
801            match inner.state {
802                State::Idle(ref mut buf_cell) => {
803                    let mut buf = buf_cell.take().unwrap();
804
805                    let seek = if !buf.is_empty() {
806                        Some(SeekFrom::Current(buf.discard_read()))
807                    } else {
808                        None
809                    };
810
811                    let n = buf.copy_from_bufs(bufs, me.max_buf_size);
812                    let std = me.std.clone();
813
814                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
815                        let res = if let Some(seek) = seek {
816                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
817                        } else {
818                            buf.write_to(&mut &*std)
819                        };
820
821                        (Operation::Write(res), buf)
822                    })
823                    .ok_or_else(|| {
824                        io::Error::new(io::ErrorKind::Other, "background task failed")
825                    })?;
826
827                    inner.state = State::Busy(blocking_task_join_handle);
828
829                    return Poll::Ready(Ok(n));
830                }
831                State::Busy(ref mut rx) => {
832                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
833                    inner.state = State::Idle(Some(buf));
834
835                    match op {
836                        Operation::Read(_) => {
837                            // We don't care about the result here. The fact
838                            // that the cursor has advanced will be reflected in
839                            // the next iteration of the loop
840                            continue;
841                        }
842                        Operation::Write(res) => {
843                            // If the previous write was successful, continue.
844                            // Otherwise, error.
845                            res?;
846                            continue;
847                        }
848                        Operation::Seek(_) => {
849                            // Ignore the seek
850                            continue;
851                        }
852                    }
853                }
854            }
855        }
856    }
857
858    fn is_write_vectored(&self) -> bool {
859        true
860    }
861
862    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
863        ready!(crate::trace::trace_leaf(cx));
864        let inner = self.inner.get_mut();
865        inner.poll_flush(cx)
866    }
867
868    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
869        ready!(crate::trace::trace_leaf(cx));
870        self.poll_flush(cx)
871    }
872}
873
874impl From<StdFile> for File {
875    fn from(std: StdFile) -> Self {
876        Self::from_std(std)
877    }
878}
879
880impl fmt::Debug for File {
881    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
882        fmt.debug_struct("tokio::fs::File")
883            .field("std", &self.std)
884            .finish()
885    }
886}
887
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
    /// Returns the raw descriptor of the underlying handle. Ownership is not
    /// transferred; the descriptor still belongs to this `File`.
    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
        self.std.as_raw_fd()
    }
}
894
#[cfg(unix)]
impl std::os::unix::io::AsFd for File {
    /// Borrows the underlying descriptor for as long as `self` is borrowed.
    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
        // SAFETY: the raw descriptor comes from `self.std`, which `self` keeps
        // alive, and the returned `BorrowedFd` is tied to the `&self` borrow,
        // so the descriptor should stay open for the lifetime of the result.
        unsafe {
            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
        }
    }
}
903
904#[cfg(unix)]
905impl std::os::unix::io::FromRawFd for File {
906    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
907        StdFile::from_raw_fd(fd).into()
908    }
909}
910
cfg_windows! {
    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};

    impl AsRawHandle for File {
        /// Returns the raw handle of the underlying file. Ownership is not
        /// transferred; the handle still belongs to this `File`.
        fn as_raw_handle(&self) -> RawHandle {
            self.std.as_raw_handle()
        }
    }

    impl AsHandle for File {
        /// Borrows the underlying handle for as long as `self` is borrowed.
        fn as_handle(&self) -> BorrowedHandle<'_> {
            // SAFETY: the raw handle comes from `self.std`, which `self` keeps
            // alive, and the returned `BorrowedHandle` is tied to the `&self`
            // borrow, so the handle should stay open for the lifetime of the
            // result.
            unsafe {
                BorrowedHandle::borrow_raw(
                    AsRawHandle::as_raw_handle(self),
                )
            }
        }
    }

    impl FromRawHandle for File {
        /// Wraps a raw handle in a `File`.
        ///
        /// # Safety
        ///
        /// `handle` is forwarded unchanged to `StdFile::from_raw_handle`, so
        /// the caller must uphold that function's requirements.
        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
            StdFile::from_raw_handle(handle).into()
        }
    }
}
936
937impl Inner {
938    async fn complete_inflight(&mut self) {
939        use std::future::poll_fn;
940
941        poll_fn(|cx| self.poll_complete_inflight(cx)).await;
942    }
943
944    fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
945        ready!(crate::trace::trace_leaf(cx));
946        match self.poll_flush(cx) {
947            Poll::Ready(Err(e)) => {
948                self.last_write_err = Some(e.kind());
949                Poll::Ready(())
950            }
951            Poll::Ready(Ok(())) => Poll::Ready(()),
952            Poll::Pending => Poll::Pending,
953        }
954    }
955
956    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
957        if let Some(e) = self.last_write_err.take() {
958            return Poll::Ready(Err(e.into()));
959        }
960
961        let (op, buf) = match self.state {
962            State::Idle(_) => return Poll::Ready(Ok(())),
963            State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
964        };
965
966        // The buffer is not used here
967        self.state = State::Idle(Some(buf));
968
969        match op {
970            Operation::Read(_) => Poll::Ready(Ok(())),
971            Operation::Write(res) => Poll::Ready(res),
972            Operation::Seek(_) => Poll::Ready(Ok(())),
973        }
974    }
975}
976
977#[cfg(test)]
978mod tests;