//! Utilities for working with the PostgreSQL binary copy format.

use crate::types::{ToSql, Type};
use crate::{CopyInWriter, CopyOutReader, Error, Rt};
use fallible_iterator::FallibleIterator;
use futures::StreamExt;
use std::pin::Pin;
#[doc(inline)]
pub use tokio_postgres::binary_copy::BinaryCopyOutRow;
use tokio_postgres::binary_copy::{self, BinaryCopyOutStream};

/// A type which serializes rows into the PostgreSQL binary copy format.
///
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
pub struct BinaryCopyInWriter<'a> {
    // Handle used to drive the async sink to completion from blocking code.
    runtime: Rt<'a>,
    // The async writer being wrapped; boxed and pinned so its `Pin<&mut Self>`
    // methods can be reached through `as_mut()`.
    sink: Pin<Box<binary_copy::BinaryCopyInWriter>>,
}

impl<'a> BinaryCopyInWriter<'a> {
    /// Creates a new writer which will write rows of the provided types.
    ///
    /// # Panics
    ///
    /// Panics if the sink cannot be taken out of `writer` in its unpinned form, i.e. if the writer has already
    /// been written to.
    pub fn new(writer: CopyInWriter<'a>, types: &[Type]) -> BinaryCopyInWriter<'a> {
        let stream = writer
            .sink
            .into_unpinned()
            .expect("writer has already been written to");

        BinaryCopyInWriter {
            runtime: writer.runtime,
            sink: Box::pin(binary_copy::BinaryCopyInWriter::new(stream, types)),
        }
    }

    /// Writes a single row.
    ///
    /// Blocks the calling thread until the underlying asynchronous write has completed.
    ///
    /// # Panics
    ///
    /// Panics if the number of values provided does not match the number expected.
    pub fn write(&mut self, values: &[&(dyn ToSql + Sync)]) -> Result<(), Error> {
        self.runtime.block_on(self.sink.as_mut().write(values))
    }

    /// A maximally-flexible version of `write`.
    ///
    /// Accepts any iterable of values whose iterator reports an exact length, rather than requiring a slice.
    ///
    /// # Panics
    ///
    /// Panics if the number of values provided does not match the number expected.
    pub fn write_raw<'b, I>(&mut self, values: I) -> Result<(), Error>
    where
        // NOTE(review): the `Item` binding was reconstructed from the `'b` lifetime parameter; confirm that it
        // matches the bound on `tokio_postgres::binary_copy::BinaryCopyInWriter::write_raw`.
        I: IntoIterator<Item = &'b dyn ToSql>,
        I::IntoIter: ExactSizeIterator,
    {
        self.runtime.block_on(self.sink.as_mut().write_raw(values))
    }

    /// Completes the copy, returning the number of rows added.
    ///
    /// This method *must* be used to complete the copy process. If it is not, the copy will be aborted.
    pub fn finish(mut self) -> Result<u64, Error> {
        self.runtime.block_on(self.sink.as_mut().finish())
    }
}

/// An iterator of rows deserialized from the PostgreSQL binary copy format.
pub struct BinaryCopyOutIter<'a> { runtime: Rt<'a>, stream: Pin>, } impl<'a> BinaryCopyOutIter<'a> { /// Creates a new iterator from a raw copy out reader and the types of the columns being returned. pub fn new(reader: CopyOutReader<'a>, types: &[Type]) -> BinaryCopyOutIter<'a> { let stream = reader .stream .into_unpinned() .expect("reader has already been read from"); BinaryCopyOutIter { runtime: reader.runtime, stream: Box::pin(BinaryCopyOutStream::new(stream, types)), } } } impl FallibleIterator for BinaryCopyOutIter<'_> { type Item = BinaryCopyOutRow; type Error = Error; fn next(&mut self) -> Result, Error> { self.runtime.block_on(self.stream.next()).transpose() } }