
use crate::error;
use crate::fmt;
use crate::io::{
self, ErrorKind, IntoInnerError, IoSlice, Seek, SeekFrom, Write, DEFAULT_BUF_SIZE,
};
use crate::mem;
use crate::ptr;
/// 包装一个 writer 并缓冲其输出。
///
/// 直接与实现 [`Write`] 的组件一起工作可能会非常低效。
/// 例如,对 [`TcpStream`] 上 [`write`][`TcpStream::write`] 的每次调用都会导致系统调用。
/// `BufWriter<W>` 保留数据的内存缓冲区,并以大批量、不频繁的方式将其写入底层 writer。
///
/// `BufWriter<W>` 可以提高对同一文件或网络套接字进行小规模重复写调用的程序的速度。
/// 一次写入大量或一次写入几次都无济于事。
/// 在写入内存中的目标时,它也没有提供任何优势,例如 <code>[Vec]\<u8></code>.
///
/// 在丢弃 `BufWriter<W>` 之前,调用 [`flush`] 至关重要。
/// 尽管丢弃将尝试刷新缓冲区的内容,但丢弃过程中发生的任何错误都将被忽略。
/// 调用 [`flush`] 可确保缓冲区为空,因此丢弃操作甚至不会尝试文件操作。
///
/// # Examples
///
/// 让我们将数字 1 到 10 写入 [`TcpStream`]:
///
/// ```no_run
/// use std::io::prelude::*;
/// use std::net::TcpStream;
///
/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
///
/// for i in 0..10 {
/// stream.write(&[i+1]).unwrap();
/// }
/// ```
///
/// 因为我们没有缓冲,所以我们依次写入每个字节,从而导致写入的每个字节占用系统调用的开销。我们可以用
/// `BufWriter<W>`:
///
/// ```no_run
/// use std::io::prelude::*;
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
///
/// for i in 0..10 {
/// stream.write(&[i+1]).unwrap();
/// }
/// stream.flush().unwrap();
/// ```
///
/// 通过用 `BufWriter<W>` 包装流,这十次写操作全部由缓冲区分组,并且在刷新 `stream` 时将全部写在一个系统调用中。
///
///
///
///
///
///
///
///
///
///
// HACK(#78696): 不能将 `crate` 用于关联的项
/// [`TcpStream::write`]: super::super::super::net::TcpStream::write
/// [`TcpStream`]: crate::net::TcpStream
/// [`flush`]: BufWriter::flush
#[stable(feature = "rust1", since = "1.0.0")]
pub struct BufWriter<W: Write> {
inner: W,
// 缓冲区。避免在公共代码路径中像普通 `Vec` 一样使用它。
// 也就是说,不要使用 `buf.push`、`buf.extend_from_slice` 或任何其他需要边界检查等的方法。
// 这对性能产生了巨大的影响 (我们可能希望完全停止使用 `Vec`)。
//
buf: Vec<u8>,
// #30888: 如果内部写入程序在调用 write 时 panics,我们不希望在 BufWriter 的析构函数中再次写入缓冲的数据。
// 该标志告诉 Drop impl 是否应跳过刷新。
//
panicked: bool,
}
impl<W: Write> BufWriter<W> {
/// 创建一个具有默认缓冲区容量的新 `BufWriter<W>`。
/// 当前默认值为 8 KB,但可能会在 future 中进行更改。
///
/// # Examples
///
/// ```no_run
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn new(inner: W) -> BufWriter<W> {
BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
}
/// 创建一个至少具有指定缓冲区容量的新 `BufWriter<W>`。
///
/// # Examples
///
/// 创建一个缓冲区,至少有一百字节的缓冲区。
///
/// ```no_run
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let stream = TcpStream::connect("127.0.0.1:34254").unwrap();
/// let mut buffer = BufWriter::with_capacity(100, stream);
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
BufWriter { inner, buf: Vec::with_capacity(capacity), panicked: false }
}
/// 将本地缓冲区中的数据发送到内部 writer 中,并根据需要进行循环,直到全部发送或发生错误为止。
///
/// 由于已将缓冲区中的所有数据报告为 "successfully written" (通过从 `write` 返回非零成功值) 通知给我们的所有者,因此,从 `inner` 进行任何 0 长度的写操作都必须通过此方法报告为 i/o 错误。
///
///
///
///
pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> {
/// Helper 结构体,以确保在所有写操作完成后更新缓冲区。
/// 它跟踪写入的字节数,并在丢弃时从缓冲区的前端排出所有字节。
///
struct BufGuard<'a> {
buffer: &'a mut Vec<u8>,
written: usize,
}
impl<'a> BufGuard<'a> {
fn new(buffer: &'a mut Vec<u8>) -> Self {
Self { buffer, written: 0 }
}
/// 缓冲区的未写部分
fn remaining(&self) -> &[u8] {
&self.buffer[self.written..]
}
/// 将某些字节标记为从缓冲区的最前面删除
fn consume(&mut self, amt: usize) {
self.written += amt;
}
/// 如果所有字节均已写入,则为 true
fn done(&self) -> bool {
self.written >= self.buffer.len()
}
}
impl Drop for BufGuard<'_> {
fn drop(&mut self) {
if self.written > 0 {
self.buffer.drain(..self.written);
}
}
}
let mut guard = BufGuard::new(&mut self.buf);
while !guard.done() {
self.panicked = true;
let r = self.inner.write(guard.remaining());
self.panicked = false;
match r {
Ok(0) => {
return Err(io::const_io_error!(
ErrorKind::WriteZero,
"failed to write the buffered data",
));
}
Ok(n) => guard.consume(n),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
/// 无论数据大小如何,都可以缓冲某些数据而不刷新它们。
/// 尽可能多地写入,而不会超出容量。
/// 返回写入的字节数。
pub(super) fn write_to_buf(&mut self, buf: &[u8]) -> usize {
let available = self.spare_capacity();
let amt_to_buffer = available.min(buf.len());
// SAFETY: `amt_to_buffer` 是 <= 缓冲区的构造备用容量。
unsafe {
self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
}
amt_to_buffer
}
/// 获取对底层 writer 的引用。
///
/// # Examples
///
/// ```no_run
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
///
/// // 我们可以像缓冲区一样使用引用
/// let reference = buffer.get_ref();
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn get_ref(&self) -> &W {
&self.inner
}
/// 获取底层 writer 的可变引用。
///
/// 直接写给底层的 writer 是不可取的。
///
/// # Examples
///
/// ```no_run
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
///
/// // 我们可以像缓冲区一样使用引用
/// let reference = buffer.get_mut();
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn get_mut(&mut self) -> &mut W {
&mut self.inner
}
/// 返回对内部缓冲数据的引用。
///
/// # Examples
///
/// ```no_run
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
///
/// // 查看当前缓冲了多少字节
/// let bytes_buffered = buf_writer.buffer().len();
/// ```
#[stable(feature = "bufreader_buffer", since = "1.37.0")]
pub fn buffer(&self) -> &[u8] {
&self.buf
}
/// 返回一个可变引用到内部缓冲区。
///
/// 这可用于将数据直接写入缓冲区,而无需触发 writers 到底层 writer。
///
/// 缓冲区是 `Vec` 是实现细节。
/// 调用者不应修改容量,因为当前没有公共 API 可以进行修改,因此,任何容量更改都将是用户无法预料的。
///
///
pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.buf
}
/// 返回内部缓冲区在不刷新的情况下可以容纳的字节数。
///
/// # Examples
///
/// ```no_run
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
///
/// // 检查内部缓冲区的容量
/// let capacity = buf_writer.capacity();
/// // 计算不刷新就可以写入多少个字节
/// let without_flush = capacity - buf_writer.buffer().len();
/// ```
#[stable(feature = "buffered_io_capacity", since = "1.46.0")]
pub fn capacity(&self) -> usize {
self.buf.capacity()
}
/// 解包此 `BufWriter<W>`,返回底层 writer。
///
/// 在返回 writer 之前将缓冲区写出。
///
/// # Errors
///
/// 如果刷新缓冲区时发生错误,将返回 [`Err`]。
///
/// # Examples
///
/// ```no_run
/// use std::io::BufWriter;
/// use std::net::TcpStream;
///
/// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
///
/// // 拆开 TcpStream 并刷新缓冲区
/// let stream = buffer.into_inner().unwrap();
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn into_inner(mut self) -> Result<W, IntoInnerError<BufWriter<W>>> {
match self.flush_buf() {
Err(e) => Err(IntoInnerError::new(self, e)),
Ok(()) => Ok(self.into_parts().0),
}
}
/// 反汇编这个 `BufWriter<W>`,返回底层 writer,以及任何缓冲但未写入的数据。
///
/// 如果底层的 writer 发生 panic,就不知道数据的哪一部分被写入了。
/// 在这种情况下,我们返回 `WriterPanicked` 作为缓冲数据 (仍然可以从中恢复缓冲内容)。
///
///
/// `into_parts` 不会尝试刷新数据,并且不会失败。
///
/// # Examples
///
/// ```
/// use std::io::{BufWriter, Write};
///
/// let mut buffer = [0u8; 10];
/// let mut stream = BufWriter::new(buffer.as_mut());
/// write!(stream, "too much data").unwrap();
/// stream.flush().expect_err("it doesn't fit");
/// let (recovered_writer, buffered_data) = stream.into_parts();
/// assert_eq!(recovered_writer.len(), 0);
/// assert_eq!(&buffered_data.unwrap(), b"ata");
/// ```
///
#[stable(feature = "bufwriter_into_parts", since = "1.56.0")]
pub fn into_parts(mut self) -> (W, Result<Vec<u8>, WriterPanicked>) {
let buf = mem::take(&mut self.buf);
let buf = if !self.panicked { Ok(buf) } else { Err(WriterPanicked { buf }) };
// SAFETY: forget(self) 防止双重丢弃内部
let inner = unsafe { ptr::read(&self.inner) };
mem::forget(self);
(inner, buf)
}
// 确保这个函数不会被内联到 `write` 中,这样它就可以保持内联并且它的公共路径尽可能地短。
// 如果这个函数最终相对于 `write` 被频繁调用,则可能表明客户端使用了大小不正确的缓冲区或他们的写入模式有点病态。
//
//
//
#[cold]
#[inline(never)]
fn write_cold(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.len() > self.spare_capacity() {
self.flush_buf()?;
}
// 为什么不是 len > capacity? 以避免在输入正好填满缓冲区时发生不必要的遍历。
// 无论如何,我们只需要将其刷新到底层 writer 即可。
if buf.len() >= self.buf.capacity() {
self.panicked = true;
let r = self.get_mut().write(buf);
self.panicked = false;
r
} else {
// 写入缓冲区。在这种情况下,即使缓冲区完全填满,我们也会写入缓冲区。
// 否则将意味着刷新缓冲区,然后将此输入写入内部 writer,这在许多情况下将是更糟糕的策略。
//
// SAFETY: 要么已经有足够的空闲容量,要么没有,我们刷新了缓冲区以确保有。
// 在后一种情况下,我们知道是因为 flush 确保了我们整个缓冲区都是空闲容量,我们进入这个块是因为输入缓冲区长度小于那个容量。
//
// 无论哪种情况,将输入缓冲区写入我们的缓冲区都是安全的。
//
unsafe {
self.write_to_buffer_unchecked(buf);
}
Ok(buf.len())
}
}
// 确保这个函数不会被内联到 `write_all` 中,这样它就可以保持内联并且它的公共路径尽可能地短。
// 如果这个函数最终相对于 `write_all` 被频繁调用,则可能表明客户端使用了大小不正确的缓冲区或他们的写入模式有点病态。
//
//
//
#[cold]
#[inline(never)]
fn write_all_cold(&mut self, buf: &[u8]) -> io::Result<()> {
// 通常,`write_all` 只是在循环中调用 `write`。
// 我们可以通过直接调用 `self.get_mut().write_all()` 来做得更好,这样可以避免在某些情况下发生一系列局部写操作时通过缓冲区的往返行程。
//
//
if buf.len() > self.spare_capacity() {
self.flush_buf()?;
}
// 为什么不是 len > capacity? 以避免在输入正好填满缓冲区时发生不必要的遍历。
// 无论如何,我们只需要将其刷新到底层 writer 即可。
if buf.len() >= self.buf.capacity() {
self.panicked = true;
let r = self.get_mut().write_all(buf);
self.panicked = false;
r
} else {
// 写入缓冲区。在这种情况下,即使缓冲区完全填满,我们也会写入缓冲区。
// 否则将意味着刷新缓冲区,然后将此输入写入内部 writer,这在许多情况下将是更糟糕的策略。
//
// SAFETY: 要么已经有足够的空闲容量,要么没有,我们刷新了缓冲区以确保有。
// 在后一种情况下,我们知道是因为 flush 确保了我们整个缓冲区都是空闲容量,我们进入这个块是因为输入缓冲区长度小于那个容量。
//
// 无论哪种情况,将输入缓冲区写入我们的缓冲区都是安全的。
//
unsafe {
self.write_to_buffer_unchecked(buf);
}
Ok(())
}
}
// SAFETY: 需要 `buf.len() <= self.buf.capacity() - self.buf.len()`,即输入缓冲区长度小于或等于备用容量。
//
#[inline]
unsafe fn write_to_buffer_unchecked(&mut self, buf: &[u8]) {
debug_assert!(buf.len() <= self.spare_capacity());
let old_len = self.buf.len();
let buf_len = buf.len();
let src = buf.as_ptr();
let dst = self.buf.as_mut_ptr().add(old_len);
ptr::copy_nonoverlapping(src, dst, buf_len);
self.buf.set_len(old_len + buf_len);
}
#[inline]
fn spare_capacity(&self) -> usize {
self.buf.capacity() - self.buf.len()
}
}
#[stable(feature = "bufwriter_into_parts", since = "1.56.0")]
/// 当底层 writer 之前有 panicked 时,为来自 `BufWriter::into_parts` 的缓冲数据返回错误。
/// 包含 (可能是部分写入的) 缓冲数据。
///
/// # Example
///
/// ```
/// use std::io::{self, BufWriter, Write};
/// use std::panic::{catch_unwind, AssertUnwindSafe};
///
/// struct PanickingWriter;
/// impl Write for PanickingWriter {
/// fn write(&mut self, buf: &[u8]) -> io::Result<usize> { panic!() }
/// fn flush(&mut self) -> io::Result<()> { panic!() }
/// }
///
/// let mut stream = BufWriter::new(PanickingWriter);
/// write!(stream, "some data").unwrap();
/// let result = catch_unwind(AssertUnwindSafe(|| {
/// stream.flush().unwrap()
/// }));
/// assert!(result.is_err());
/// let (recovered_writer, buffered_data) = stream.into_parts();
/// assert!(matches!(recovered_writer, PanickingWriter));
/// assert_eq!(buffered_data.unwrap_err().into_inner(), b"some data");
/// ```
pub struct WriterPanicked {
buf: Vec<u8>,
}
impl WriterPanicked {
/// 返回可能未写入的数据。
/// 其中一些数据可能是由 panic 调用写入底层编写器的,因此简单地再次编写它并不是一个好主意。
#[must_use = "`self` will be dropped if the result is not used"]
#[stable(feature = "bufwriter_into_parts", since = "1.56.0")]
pub fn into_inner(self) -> Vec<u8> {
self.buf
}
const DESCRIPTION: &'static str =
"BufWriter inner writer panicked, what data remains unwritten is not known";
}
#[stable(feature = "bufwriter_into_parts", since = "1.56.0")]
impl error::Error for WriterPanicked {
#[allow(deprecated, deprecated_in_future)]
fn description(&self) -> &str {
Self::DESCRIPTION
}
}
#[stable(feature = "bufwriter_into_parts", since = "1.56.0")]
impl fmt::Display for WriterPanicked {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", Self::DESCRIPTION)
}
}
#[stable(feature = "bufwriter_into_parts", since = "1.56.0")]
impl fmt::Debug for WriterPanicked {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WriterPanicked")
.field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
.finish()
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> Write for BufWriter<W> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// 使用 < 而不是 <= 以避免在某些情况下通过缓冲区进行不必要的缓冲行程。
// 有关详细信息,请参见 `write_cold`。
if buf.len() < self.spare_capacity() {
// SAFETY: 以上条件安全。
unsafe {
self.write_to_buffer_unchecked(buf);
}
Ok(buf.len())
} else {
self.write_cold(buf)
}
}
#[inline]
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
// 使用 < 而不是 <= 以避免在某些情况下通过缓冲区进行不必要的缓冲行程。
// 有关详细信息,请参见 `write_all_cold`。
if buf.len() < self.spare_capacity() {
// SAFETY: 以上条件安全。
unsafe {
self.write_to_buffer_unchecked(buf);
}
Ok(())
} else {
self.write_all_cold(buf)
}
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
// FIXME: 考虑应用已应用于 `write` 和 `write_all` 的 `#[inline]`/`#[inline(never)]` 优化。性能优势可能非常显着。
// 请参见 #79930。
if self.get_ref().is_write_vectored() {
// 我们必须处理缓冲区总长度溢出 `usize` 的可能性 (即使只有在多个 IoSlice 引用同一个底层缓冲区时才会发生这种情况,否则缓冲区将无法放入内存中)。
// 如果计算溢出,那么输入肯定无法放入我们的缓冲区,因此我们转发到内部 writer 的 `write_vectored` 方法让它适当地处理它。
//
//
//
let saturated_total_len =
bufs.iter().fold(0usize, |acc, b| acc.saturating_add(b.len()));
if saturated_total_len > self.spare_capacity() {
// 如果输入的总长度超过缓冲区的空闲容量,则刷新。
// 如果我们已经溢出了,这种情况也成立,我们需要刷新。
self.flush_buf()?;
}
if saturated_total_len >= self.buf.capacity() {
// 如果输入的总长度大于或等于我们的缓冲区容量,则转发到我们的内部 writer。
// 如果我们会溢出,则此条件也成立,我们将推送到内部 writer。
//
self.panicked = true;
let r = self.get_mut().write_vectored(bufs);
self.panicked = false;
r
} else {
// `saturated_total_len < self.buf.capacity()` 意味着我们没有饱和。
// SAFETY: 我们在上面检查了备用容量是否足够大。
// 如果是这样,那么我们已经安全了。
// 如果不是,我们刷新,为任何输入 <= 缓冲区大小 (包括此输入) 留出足够的空间。
unsafe {
bufs.iter().for_each(|b| self.write_to_buffer_unchecked(b));
};
Ok(saturated_total_len)
}
} else {
let mut iter = bufs.iter();
let mut total_written = if let Some(buf) = iter.by_ref().find(|&buf| !buf.is_empty()) {
// 这是第一个要写入的非空切片,因此,如果它不适合缓冲区,我们仍然可以刷新并继续。
//
if buf.len() > self.spare_capacity() {
self.flush_buf()?;
}
if buf.len() >= self.buf.capacity() {
// 切片至少与缓冲容量一样大,因此最好绕过缓冲区直接将其写入。
//
self.panicked = true;
let r = self.get_mut().write(buf);
self.panicked = false;
return r;
} else {
// SAFETY: 我们在上面检查了备用容量是否足够大。
// 如果是这样,那么我们已经安全了。
// 如果不是,我们刷新,为任何输入 <= 缓冲区大小 (包括此输入) 留出足够的空间。
unsafe {
self.write_to_buffer_unchecked(buf);
}
buf.len()
}
} else {
return Ok(0);
};
debug_assert!(total_written != 0);
for buf in iter {
if buf.len() <= self.spare_capacity() {
// SAFETY: 以上条件安全。
unsafe {
self.write_to_buffer_unchecked(buf);
}
// 这不能溢出 `usize`。
// 如果我们在这里,到目前为止我们已经将所有字节写入我们的缓冲区,并且我们确保我们永远不会超过缓冲区的容量。
// 所以,`total_written` <= `self.buf.capacity()` <= `usize::MAX`。
total_written += buf.len();
} else {
break;
}
}
Ok(total_written)
}
}
fn is_write_vectored(&self) -> bool {
true
}
fn flush(&mut self) -> io::Result<()> {
self.flush_buf().and_then(|()| self.get_mut().flush())
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> fmt::Debug for BufWriter<W>
where
W: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BufWriter")
.field("writer", &self.inner)
.field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
.finish()
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write + Seek> Seek for BufWriter<W> {
/// 寻找底层 writer 中的偏移量(以字节为单位)。
///
/// 寻找总是在寻找之前写出内部缓冲区。
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.flush_buf()?;
self.get_mut().seek(pos)
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> Drop for BufWriter<W> {
fn drop(&mut self) {
if !self.panicked {
// dtors 不应该出现 panic,所以我们忽略失败的刷新
let _r = self.flush_buf();
}
}
}