tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::fmt;
11use std::fs::{Metadata, Permissions};
12use std::future::Future;
13use std::io::{self, Seek, SeekFrom};
14use std::path::Path;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{ready, Context, Poll};
18
19#[cfg(test)]
20use super::mocks::JoinHandle;
21#[cfg(test)]
22use super::mocks::MockFile as StdFile;
23#[cfg(test)]
24use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
25#[cfg(not(test))]
26use crate::blocking::JoinHandle;
27#[cfg(not(test))]
28use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
29#[cfg(not(test))]
30use std::fs::File as StdFile;
31
32/// A reference to an open file on the filesystem.
33///
34/// This is a specialized version of [`std::fs::File`] for usage from the
35/// Tokio runtime.
36///
37/// An instance of a `File` can be read and/or written depending on what options
38/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
39/// cursor that the file contains internally.
40///
41/// A file will not be closed immediately when it goes out of scope if there
42/// are any IO operations that have not yet completed. To ensure that a file is
43/// closed immediately when it is dropped, you should call [`flush`] before
44/// dropping it. Note that this does not ensure that the file has been fully
45/// written to disk; the operating system might keep the changes around in an
46/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
47/// the data to disk.
48///
49/// Reading and writing to a `File` is usually done using the convenience
50/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
51///
52/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
53/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
54/// [`sync_all`]: fn@crate::fs::File::sync_all
55/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
56/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
57///
58/// # Examples
59///
60/// Create a new file and asynchronously write bytes to it:
61///
62/// ```no_run
63/// use tokio::fs::File;
64/// use tokio::io::AsyncWriteExt; // for write_all()
65///
66/// # async fn dox() -> std::io::Result<()> {
67/// let mut file = File::create("foo.txt").await?;
68/// file.write_all(b"hello, world!").await?;
69/// # Ok(())
70/// # }
71/// ```
72///
73/// Read the contents of a file into a buffer:
74///
75/// ```no_run
76/// use tokio::fs::File;
77/// use tokio::io::AsyncReadExt; // for read_to_end()
78///
79/// # async fn dox() -> std::io::Result<()> {
80/// let mut file = File::open("foo.txt").await?;
81///
82/// let mut contents = vec![];
83/// file.read_to_end(&mut contents).await?;
84///
85/// println!("len = {}", contents.len());
86/// # Ok(())
87/// # }
88/// ```
89pub struct File {
90 std: Arc<StdFile>,
91 inner: Mutex<Inner>,
92 max_buf_size: usize,
93}
94
95struct Inner {
96 state: State,
97
98 /// Errors from writes/flushes are returned in write/flush calls. If a write
99 /// error is observed while performing a read, it is saved until the next
100 /// write / flush call.
101 last_write_err: Option<io::ErrorKind>,
102
103 pos: u64,
104}
105
106#[derive(Debug)]
107enum State {
108 Idle(Option<Buf>),
109 Busy(JoinHandle<(Operation, Buf)>),
110}
111
112#[derive(Debug)]
113enum Operation {
114 Read(io::Result<usize>),
115 Write(io::Result<()>),
116 Seek(io::Result<u64>),
117}
118
119impl File {
120 /// Attempts to open a file in read-only mode.
121 ///
122 /// See [`OpenOptions`] for more details.
123 ///
124 /// # Errors
125 ///
126 /// This function will return an error if called from outside of the Tokio
127 /// runtime or if path does not already exist. Other errors may also be
128 /// returned according to `OpenOptions::open`.
129 ///
130 /// # Examples
131 ///
132 /// ```no_run
133 /// use tokio::fs::File;
134 /// use tokio::io::AsyncReadExt;
135 ///
136 /// # async fn dox() -> std::io::Result<()> {
137 /// let mut file = File::open("foo.txt").await?;
138 ///
139 /// let mut contents = vec![];
140 /// file.read_to_end(&mut contents).await?;
141 ///
142 /// println!("len = {}", contents.len());
143 /// # Ok(())
144 /// # }
145 /// ```
146 ///
147 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
148 ///
149 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
150 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
151 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
152 let path = path.as_ref().to_owned();
153 let std = asyncify(|| StdFile::open(path)).await?;
154
155 Ok(File::from_std(std))
156 }
157
158 /// Opens a file in write-only mode.
159 ///
160 /// This function will create a file if it does not exist, and will truncate
161 /// it if it does.
162 ///
163 /// See [`OpenOptions`] for more details.
164 ///
165 /// # Errors
166 ///
167 /// Results in an error if called from outside of the Tokio runtime or if
168 /// the underlying [`create`] call results in an error.
169 ///
170 /// [`create`]: std::fs::File::create
171 ///
172 /// # Examples
173 ///
174 /// ```no_run
175 /// use tokio::fs::File;
176 /// use tokio::io::AsyncWriteExt;
177 ///
178 /// # async fn dox() -> std::io::Result<()> {
179 /// let mut file = File::create("foo.txt").await?;
180 /// file.write_all(b"hello, world!").await?;
181 /// # Ok(())
182 /// # }
183 /// ```
184 ///
185 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
186 ///
187 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
188 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
189 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
190 let path = path.as_ref().to_owned();
191 let std_file = asyncify(move || StdFile::create(path)).await?;
192 Ok(File::from_std(std_file))
193 }
194
195 /// Opens a file in read-write mode.
196 ///
197 /// This function will create a file if it does not exist, or return an error
198 /// if it does. This way, if the call succeeds, the file returned is guaranteed
199 /// to be new.
200 ///
201 /// This option is useful because it is atomic. Otherwise between checking
202 /// whether a file exists and creating a new one, the file may have been
203 /// created by another process (a TOCTOU race condition / attack).
204 ///
205 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
206 ///
207 /// See [`OpenOptions`] for more details.
208 ///
209 /// # Examples
210 ///
211 /// ```no_run
212 /// use tokio::fs::File;
213 /// use tokio::io::AsyncWriteExt;
214 ///
215 /// # async fn dox() -> std::io::Result<()> {
216 /// let mut file = File::create_new("foo.txt").await?;
217 /// file.write_all(b"hello, world!").await?;
218 /// # Ok(())
219 /// # }
220 /// ```
221 ///
222 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
223 ///
224 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
225 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
226 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
227 Self::options()
228 .read(true)
229 .write(true)
230 .create_new(true)
231 .open(path)
232 .await
233 }
234
235 /// Returns a new [`OpenOptions`] object.
236 ///
237 /// This function returns a new `OpenOptions` object that you can use to
238 /// open or create a file with specific options if `open()` or `create()`
239 /// are not appropriate.
240 ///
241 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
242 /// readable code. Instead of
243 /// `OpenOptions::new().append(true).open("example.log")`,
244 /// you can write `File::options().append(true).open("example.log")`. This
245 /// also avoids the need to import `OpenOptions`.
246 ///
247 /// See the [`OpenOptions::new`] function for more details.
248 ///
249 /// # Examples
250 ///
251 /// ```no_run
252 /// use tokio::fs::File;
253 /// use tokio::io::AsyncWriteExt;
254 ///
255 /// # async fn dox() -> std::io::Result<()> {
256 /// let mut f = File::options().append(true).open("example.log").await?;
257 /// f.write_all(b"new line\n").await?;
258 /// # Ok(())
259 /// # }
260 /// ```
261 #[must_use]
262 pub fn options() -> OpenOptions {
263 OpenOptions::new()
264 }
265
266 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
267 ///
268 /// # Examples
269 ///
270 /// ```no_run
271 /// // This line could block. It is not recommended to do this on the Tokio
272 /// // runtime.
273 /// let std_file = std::fs::File::open("foo.txt").unwrap();
274 /// let file = tokio::fs::File::from_std(std_file);
275 /// ```
276 pub fn from_std(std: StdFile) -> File {
277 File {
278 std: Arc::new(std),
279 inner: Mutex::new(Inner {
280 state: State::Idle(Some(Buf::with_capacity(0))),
281 last_write_err: None,
282 pos: 0,
283 }),
284 max_buf_size: DEFAULT_MAX_BUF_SIZE,
285 }
286 }
287
288 /// Attempts to sync all OS-internal metadata to disk.
289 ///
290 /// This function will attempt to ensure that all in-core data reaches the
291 /// filesystem before returning.
292 ///
293 /// # Examples
294 ///
295 /// ```no_run
296 /// use tokio::fs::File;
297 /// use tokio::io::AsyncWriteExt;
298 ///
299 /// # async fn dox() -> std::io::Result<()> {
300 /// let mut file = File::create("foo.txt").await?;
301 /// file.write_all(b"hello, world!").await?;
302 /// file.sync_all().await?;
303 /// # Ok(())
304 /// # }
305 /// ```
306 ///
307 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
308 ///
309 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
310 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
311 pub async fn sync_all(&self) -> io::Result<()> {
312 let mut inner = self.inner.lock().await;
313 inner.complete_inflight().await;
314
315 let std = self.std.clone();
316 asyncify(move || std.sync_all()).await
317 }
318
319 /// This function is similar to `sync_all`, except that it may not
320 /// synchronize file metadata to the filesystem.
321 ///
322 /// This is intended for use cases that must synchronize content, but don't
323 /// need the metadata on disk. The goal of this method is to reduce disk
324 /// operations.
325 ///
326 /// Note that some platforms may simply implement this in terms of `sync_all`.
327 ///
328 /// # Examples
329 ///
330 /// ```no_run
331 /// use tokio::fs::File;
332 /// use tokio::io::AsyncWriteExt;
333 ///
334 /// # async fn dox() -> std::io::Result<()> {
335 /// let mut file = File::create("foo.txt").await?;
336 /// file.write_all(b"hello, world!").await?;
337 /// file.sync_data().await?;
338 /// # Ok(())
339 /// # }
340 /// ```
341 ///
342 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
343 ///
344 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
345 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
346 pub async fn sync_data(&self) -> io::Result<()> {
347 let mut inner = self.inner.lock().await;
348 inner.complete_inflight().await;
349
350 let std = self.std.clone();
351 asyncify(move || std.sync_data()).await
352 }
353
354 /// Truncates or extends the underlying file, updating the size of this file to become size.
355 ///
356 /// If the size is less than the current file's size, then the file will be
357 /// shrunk. If it is greater than the current file's size, then the file
358 /// will be extended to size and have all of the intermediate data filled in
359 /// with 0s.
360 ///
361 /// # Errors
362 ///
363 /// This function will return an error if the file is not opened for
364 /// writing.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use tokio::fs::File;
370 /// use tokio::io::AsyncWriteExt;
371 ///
372 /// # async fn dox() -> std::io::Result<()> {
373 /// let mut file = File::create("foo.txt").await?;
374 /// file.write_all(b"hello, world!").await?;
375 /// file.set_len(10).await?;
376 /// # Ok(())
377 /// # }
378 /// ```
379 ///
380 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
381 ///
382 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
383 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
384 pub async fn set_len(&self, size: u64) -> io::Result<()> {
385 let mut inner = self.inner.lock().await;
386 inner.complete_inflight().await;
387
388 let mut buf = match inner.state {
389 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
390 _ => unreachable!(),
391 };
392
393 let seek = if !buf.is_empty() {
394 Some(SeekFrom::Current(buf.discard_read()))
395 } else {
396 None
397 };
398
399 let std = self.std.clone();
400
401 inner.state = State::Busy(spawn_blocking(move || {
402 let res = if let Some(seek) = seek {
403 (&*std).seek(seek).and_then(|_| std.set_len(size))
404 } else {
405 std.set_len(size)
406 }
407 .map(|()| 0); // the value is discarded later
408
409 // Return the result as a seek
410 (Operation::Seek(res), buf)
411 }));
412
413 let (op, buf) = match inner.state {
414 State::Idle(_) => unreachable!(),
415 State::Busy(ref mut rx) => rx.await?,
416 };
417
418 inner.state = State::Idle(Some(buf));
419
420 match op {
421 Operation::Seek(res) => res.map(|pos| {
422 inner.pos = pos;
423 }),
424 _ => unreachable!(),
425 }
426 }
427
428 /// Queries metadata about the underlying file.
429 ///
430 /// # Examples
431 ///
432 /// ```no_run
433 /// use tokio::fs::File;
434 ///
435 /// # async fn dox() -> std::io::Result<()> {
436 /// let file = File::open("foo.txt").await?;
437 /// let metadata = file.metadata().await?;
438 ///
439 /// println!("{:?}", metadata);
440 /// # Ok(())
441 /// # }
442 /// ```
443 pub async fn metadata(&self) -> io::Result<Metadata> {
444 let std = self.std.clone();
445 asyncify(move || std.metadata()).await
446 }
447
448 /// Creates a new `File` instance that shares the same underlying file handle
449 /// as the existing `File` instance. Reads, writes, and seeks will affect both
450 /// File instances simultaneously.
451 ///
452 /// # Examples
453 ///
454 /// ```no_run
455 /// use tokio::fs::File;
456 ///
457 /// # async fn dox() -> std::io::Result<()> {
458 /// let file = File::open("foo.txt").await?;
459 /// let file_clone = file.try_clone().await?;
460 /// # Ok(())
461 /// # }
462 /// ```
463 pub async fn try_clone(&self) -> io::Result<File> {
464 self.inner.lock().await.complete_inflight().await;
465 let std = self.std.clone();
466 let std_file = asyncify(move || std.try_clone()).await?;
467 Ok(File::from_std(std_file))
468 }
469
470 /// Destructures `File` into a [`std::fs::File`]. This function is
471 /// async to allow any in-flight operations to complete.
472 ///
473 /// Use `File::try_into_std` to attempt conversion immediately.
474 ///
475 /// # Examples
476 ///
477 /// ```no_run
478 /// use tokio::fs::File;
479 ///
480 /// # async fn dox() -> std::io::Result<()> {
481 /// let tokio_file = File::open("foo.txt").await?;
482 /// let std_file = tokio_file.into_std().await;
483 /// # Ok(())
484 /// # }
485 /// ```
486 pub async fn into_std(mut self) -> StdFile {
487 self.inner.get_mut().complete_inflight().await;
488 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
489 }
490
491 /// Tries to immediately destructure `File` into a [`std::fs::File`].
492 ///
493 /// # Errors
494 ///
495 /// This function will return an error containing the file if some
496 /// operation is in-flight.
497 ///
498 /// # Examples
499 ///
500 /// ```no_run
501 /// use tokio::fs::File;
502 ///
503 /// # async fn dox() -> std::io::Result<()> {
504 /// let tokio_file = File::open("foo.txt").await?;
505 /// let std_file = tokio_file.try_into_std().unwrap();
506 /// # Ok(())
507 /// # }
508 /// ```
509 pub fn try_into_std(mut self) -> Result<StdFile, Self> {
510 match Arc::try_unwrap(self.std) {
511 Ok(file) => Ok(file),
512 Err(std_file_arc) => {
513 self.std = std_file_arc;
514 Err(self)
515 }
516 }
517 }
518
519 /// Changes the permissions on the underlying file.
520 ///
521 /// # Platform-specific behavior
522 ///
523 /// This function currently corresponds to the `fchmod` function on Unix and
524 /// the `SetFileInformationByHandle` function on Windows. Note that, this
525 /// [may change in the future][changes].
526 ///
527 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
528 ///
529 /// # Errors
530 ///
531 /// This function will return an error if the user lacks permission change
532 /// attributes on the underlying file. It may also return an error in other
533 /// os-specific unspecified cases.
534 ///
535 /// # Examples
536 ///
537 /// ```no_run
538 /// use tokio::fs::File;
539 ///
540 /// # async fn dox() -> std::io::Result<()> {
541 /// let file = File::open("foo.txt").await?;
542 /// let mut perms = file.metadata().await?.permissions();
543 /// perms.set_readonly(true);
544 /// file.set_permissions(perms).await?;
545 /// # Ok(())
546 /// # }
547 /// ```
548 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
549 let std = self.std.clone();
550 asyncify(move || std.set_permissions(perm)).await
551 }
552
553 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
554 ///
555 /// Although Tokio uses a sensible default value for this buffer size, this function would be
556 /// useful for changing that default depending on the situation.
557 ///
558 /// # Examples
559 ///
560 /// ```no_run
561 /// use tokio::fs::File;
562 /// use tokio::io::AsyncWriteExt;
563 ///
564 /// # async fn dox() -> std::io::Result<()> {
565 /// let mut file = File::open("foo.txt").await?;
566 ///
567 /// // Set maximum buffer size to 8 MiB
568 /// file.set_max_buf_size(8 * 1024 * 1024);
569 ///
570 /// let mut buf = vec![1; 1024 * 1024 * 1024];
571 ///
572 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
573 /// file.write_all(&mut buf).await?;
574 /// # Ok(())
575 /// # }
576 /// ```
577 pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
578 self.max_buf_size = max_buf_size;
579 }
580}
581
582impl AsyncRead for File {
583 fn poll_read(
584 self: Pin<&mut Self>,
585 cx: &mut Context<'_>,
586 dst: &mut ReadBuf<'_>,
587 ) -> Poll<io::Result<()>> {
588 ready!(crate::trace::trace_leaf(cx));
589 let me = self.get_mut();
590 let inner = me.inner.get_mut();
591
592 loop {
593 match inner.state {
594 State::Idle(ref mut buf_cell) => {
595 let mut buf = buf_cell.take().unwrap();
596
597 if !buf.is_empty() {
598 buf.copy_to(dst);
599 *buf_cell = Some(buf);
600 return Poll::Ready(Ok(()));
601 }
602
603 buf.ensure_capacity_for(dst, me.max_buf_size);
604 let std = me.std.clone();
605
606 inner.state = State::Busy(spawn_blocking(move || {
607 let res = buf.read_from(&mut &*std);
608 (Operation::Read(res), buf)
609 }));
610 }
611 State::Busy(ref mut rx) => {
612 let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
613
614 match op {
615 Operation::Read(Ok(_)) => {
616 buf.copy_to(dst);
617 inner.state = State::Idle(Some(buf));
618 return Poll::Ready(Ok(()));
619 }
620 Operation::Read(Err(e)) => {
621 assert!(buf.is_empty());
622
623 inner.state = State::Idle(Some(buf));
624 return Poll::Ready(Err(e));
625 }
626 Operation::Write(Ok(())) => {
627 assert!(buf.is_empty());
628 inner.state = State::Idle(Some(buf));
629 continue;
630 }
631 Operation::Write(Err(e)) => {
632 assert!(inner.last_write_err.is_none());
633 inner.last_write_err = Some(e.kind());
634 inner.state = State::Idle(Some(buf));
635 }
636 Operation::Seek(result) => {
637 assert!(buf.is_empty());
638 inner.state = State::Idle(Some(buf));
639 if let Ok(pos) = result {
640 inner.pos = pos;
641 }
642 continue;
643 }
644 }
645 }
646 }
647 }
648 }
649}
650
651impl AsyncSeek for File {
652 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
653 let me = self.get_mut();
654 let inner = me.inner.get_mut();
655
656 match inner.state {
657 State::Busy(_) => Err(io::Error::new(
658 io::ErrorKind::Other,
659 "other file operation is pending, call poll_complete before start_seek",
660 )),
661 State::Idle(ref mut buf_cell) => {
662 let mut buf = buf_cell.take().unwrap();
663
664 // Factor in any unread data from the buf
665 if !buf.is_empty() {
666 let n = buf.discard_read();
667
668 if let SeekFrom::Current(ref mut offset) = pos {
669 *offset += n;
670 }
671 }
672
673 let std = me.std.clone();
674
675 inner.state = State::Busy(spawn_blocking(move || {
676 let res = (&*std).seek(pos);
677 (Operation::Seek(res), buf)
678 }));
679 Ok(())
680 }
681 }
682 }
683
684 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
685 ready!(crate::trace::trace_leaf(cx));
686 let inner = self.inner.get_mut();
687
688 loop {
689 match inner.state {
690 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
691 State::Busy(ref mut rx) => {
692 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
693 inner.state = State::Idle(Some(buf));
694
695 match op {
696 Operation::Read(_) => {}
697 Operation::Write(Err(e)) => {
698 assert!(inner.last_write_err.is_none());
699 inner.last_write_err = Some(e.kind());
700 }
701 Operation::Write(_) => {}
702 Operation::Seek(res) => {
703 if let Ok(pos) = res {
704 inner.pos = pos;
705 }
706 return Poll::Ready(res);
707 }
708 }
709 }
710 }
711 }
712 }
713}
714
715impl AsyncWrite for File {
716 fn poll_write(
717 self: Pin<&mut Self>,
718 cx: &mut Context<'_>,
719 src: &[u8],
720 ) -> Poll<io::Result<usize>> {
721 ready!(crate::trace::trace_leaf(cx));
722 let me = self.get_mut();
723 let inner = me.inner.get_mut();
724
725 if let Some(e) = inner.last_write_err.take() {
726 return Poll::Ready(Err(e.into()));
727 }
728
729 loop {
730 match inner.state {
731 State::Idle(ref mut buf_cell) => {
732 let mut buf = buf_cell.take().unwrap();
733
734 let seek = if !buf.is_empty() {
735 Some(SeekFrom::Current(buf.discard_read()))
736 } else {
737 None
738 };
739
740 let n = buf.copy_from(src, me.max_buf_size);
741 let std = me.std.clone();
742
743 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
744 let res = if let Some(seek) = seek {
745 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
746 } else {
747 buf.write_to(&mut &*std)
748 };
749
750 (Operation::Write(res), buf)
751 })
752 .ok_or_else(|| {
753 io::Error::new(io::ErrorKind::Other, "background task failed")
754 })?;
755
756 inner.state = State::Busy(blocking_task_join_handle);
757
758 return Poll::Ready(Ok(n));
759 }
760 State::Busy(ref mut rx) => {
761 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
762 inner.state = State::Idle(Some(buf));
763
764 match op {
765 Operation::Read(_) => {
766 // We don't care about the result here. The fact
767 // that the cursor has advanced will be reflected in
768 // the next iteration of the loop
769 continue;
770 }
771 Operation::Write(res) => {
772 // If the previous write was successful, continue.
773 // Otherwise, error.
774 res?;
775 continue;
776 }
777 Operation::Seek(_) => {
778 // Ignore the seek
779 continue;
780 }
781 }
782 }
783 }
784 }
785 }
786
787 fn poll_write_vectored(
788 self: Pin<&mut Self>,
789 cx: &mut Context<'_>,
790 bufs: &[io::IoSlice<'_>],
791 ) -> Poll<Result<usize, io::Error>> {
792 ready!(crate::trace::trace_leaf(cx));
793 let me = self.get_mut();
794 let inner = me.inner.get_mut();
795
796 if let Some(e) = inner.last_write_err.take() {
797 return Poll::Ready(Err(e.into()));
798 }
799
800 loop {
801 match inner.state {
802 State::Idle(ref mut buf_cell) => {
803 let mut buf = buf_cell.take().unwrap();
804
805 let seek = if !buf.is_empty() {
806 Some(SeekFrom::Current(buf.discard_read()))
807 } else {
808 None
809 };
810
811 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
812 let std = me.std.clone();
813
814 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
815 let res = if let Some(seek) = seek {
816 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
817 } else {
818 buf.write_to(&mut &*std)
819 };
820
821 (Operation::Write(res), buf)
822 })
823 .ok_or_else(|| {
824 io::Error::new(io::ErrorKind::Other, "background task failed")
825 })?;
826
827 inner.state = State::Busy(blocking_task_join_handle);
828
829 return Poll::Ready(Ok(n));
830 }
831 State::Busy(ref mut rx) => {
832 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
833 inner.state = State::Idle(Some(buf));
834
835 match op {
836 Operation::Read(_) => {
837 // We don't care about the result here. The fact
838 // that the cursor has advanced will be reflected in
839 // the next iteration of the loop
840 continue;
841 }
842 Operation::Write(res) => {
843 // If the previous write was successful, continue.
844 // Otherwise, error.
845 res?;
846 continue;
847 }
848 Operation::Seek(_) => {
849 // Ignore the seek
850 continue;
851 }
852 }
853 }
854 }
855 }
856 }
857
858 fn is_write_vectored(&self) -> bool {
859 true
860 }
861
862 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
863 ready!(crate::trace::trace_leaf(cx));
864 let inner = self.inner.get_mut();
865 inner.poll_flush(cx)
866 }
867
868 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
869 ready!(crate::trace::trace_leaf(cx));
870 self.poll_flush(cx)
871 }
872}
873
874impl From<StdFile> for File {
875 fn from(std: StdFile) -> Self {
876 Self::from_std(std)
877 }
878}
879
880impl fmt::Debug for File {
881 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
882 fmt.debug_struct("tokio::fs::File")
883 .field("std", &self.std)
884 .finish()
885 }
886}
887
888#[cfg(unix)]
889impl std::os::unix::io::AsRawFd for File {
890 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
891 self.std.as_raw_fd()
892 }
893}
894
895#[cfg(unix)]
896impl std::os::unix::io::AsFd for File {
897 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
898 unsafe {
899 std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
900 }
901 }
902}
903
904#[cfg(unix)]
905impl std::os::unix::io::FromRawFd for File {
906 unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
907 StdFile::from_raw_fd(fd).into()
908 }
909}
910
911cfg_windows! {
912 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
913
914 impl AsRawHandle for File {
915 fn as_raw_handle(&self) -> RawHandle {
916 self.std.as_raw_handle()
917 }
918 }
919
920 impl AsHandle for File {
921 fn as_handle(&self) -> BorrowedHandle<'_> {
922 unsafe {
923 BorrowedHandle::borrow_raw(
924 AsRawHandle::as_raw_handle(self),
925 )
926 }
927 }
928 }
929
930 impl FromRawHandle for File {
931 unsafe fn from_raw_handle(handle: RawHandle) -> Self {
932 StdFile::from_raw_handle(handle).into()
933 }
934 }
935}
936
937impl Inner {
938 async fn complete_inflight(&mut self) {
939 use std::future::poll_fn;
940
941 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
942 }
943
944 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
945 ready!(crate::trace::trace_leaf(cx));
946 match self.poll_flush(cx) {
947 Poll::Ready(Err(e)) => {
948 self.last_write_err = Some(e.kind());
949 Poll::Ready(())
950 }
951 Poll::Ready(Ok(())) => Poll::Ready(()),
952 Poll::Pending => Poll::Pending,
953 }
954 }
955
956 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
957 if let Some(e) = self.last_write_err.take() {
958 return Poll::Ready(Err(e.into()));
959 }
960
961 let (op, buf) = match self.state {
962 State::Idle(_) => return Poll::Ready(Ok(())),
963 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
964 };
965
966 // The buffer is not used here
967 self.state = State::Idle(Some(buf));
968
969 match op {
970 Operation::Read(_) => Poll::Ready(Ok(())),
971 Operation::Write(res) => Poll::Ready(res),
972 Operation::Seek(_) => Poll::Ready(Ok(())),
973 }
974 }
975}
976
977#[cfg(test)]
978mod tests;