use bytes::{Buf, Bytes}; use futures::stream; use futures::{Async, Future, Poll, Stream}; use std::io::{self, BufRead, Cursor, Read}; use std::marker::PhantomData; use tokio_postgres::types::{ToSql, Type}; use tokio_postgres::Error; #[cfg(feature = "runtime")] use tokio_postgres::{MakeTlsMode, Socket, TlsMode}; #[cfg(feature = "runtime")] use crate::Builder; use crate::{Query, Statement, ToStatement, Transaction}; pub struct Client(tokio_postgres::Client); impl Client { #[cfg(feature = "runtime")] pub fn connect(params: &str, tls_mode: T) -> Result where T: MakeTlsMode + 'static + Send, T::TlsMode: Send, T::Stream: Send, T::Future: Send, >::Future: Send, { params.parse::()?.connect(tls_mode) } #[cfg(feature = "runtime")] pub fn builder() -> Builder { Builder::new() } pub fn prepare(&mut self, query: &str) -> Result { self.0.prepare(query).wait().map(Statement) } pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result { self.0.prepare_typed(query, types).wait().map(Statement) } pub fn execute(&mut self, query: &T, params: &[&dyn ToSql]) -> Result where T: ?Sized + ToStatement, { let statement = query.__statement(self)?; self.0.execute(&statement.0, params).wait() } pub fn query(&mut self, query: &T, params: &[&dyn ToSql]) -> Result, Error> where T: ?Sized + ToStatement, { let statement = query.__statement(self)?; Ok(Query::new(self.0.query(&statement.0, params))) } pub fn copy_in( &mut self, query: &T, params: &[&dyn ToSql], reader: R, ) -> Result where T: ?Sized + ToStatement, R: Read, { let statement = query.__statement(self)?; self.0 .copy_in(&statement.0, params, CopyInStream(reader)) .wait() } pub fn copy_out( &mut self, query: &T, params: &[&dyn ToSql], ) -> Result, Error> where T: ?Sized + ToStatement, { let statement = query.__statement(self)?; let mut stream = self.0.copy_out(&statement.0, params).wait(); let cur = match stream.next() { Some(Ok(cur)) => cur, Some(Err(e)) => return Err(e), None => Bytes::new(), }; Ok(CopyOutReader { stream, cur: Cursor::new(cur), _p: PhantomData, }) } pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> { self.0.batch_execute(query).wait() } pub fn transaction(&mut self) -> Result, Error> { self.batch_execute("BEGIN")?; Ok(Transaction::new(self)) } pub fn is_closed(&self) -> bool { self.0.is_closed() } pub fn get_ref(&self) -> &tokio_postgres::Client { &self.0 } pub fn get_mut(&mut self) -> &mut tokio_postgres::Client { &mut self.0 } pub fn into_inner(self) -> tokio_postgres::Client { self.0 } } impl From for Client { fn from(c: tokio_postgres::Client) -> Client { Client(c) } } struct CopyInStream(R); impl Stream for CopyInStream where R: Read, { type Item = Vec; type Error = io::Error; fn poll(&mut self) -> Poll>, io::Error> { let mut buf = vec![]; match self.0.by_ref().take(4096).read_to_end(&mut buf)? { 0 => Ok(Async::Ready(None)), _ => Ok(Async::Ready(Some(buf))), } } } pub struct CopyOutReader<'a> { stream: stream::Wait, cur: Cursor, _p: PhantomData<&'a mut ()>, } // no-op impl to extend borrow until drop impl<'a> Drop for CopyOutReader<'a> { fn drop(&mut self) {} } impl<'a> Read for CopyOutReader<'a> { fn read(&mut self, buf: &mut [u8]) -> io::Result { let b = self.fill_buf()?; let len = usize::min(buf.len(), b.len()); buf[..len].copy_from_slice(&b[..len]); self.consume(len); Ok(len) } } impl<'a> BufRead for CopyOutReader<'a> { fn fill_buf(&mut self) -> io::Result<&[u8]> { if self.cur.remaining() == 0 { match self.stream.next() { Some(Ok(cur)) => self.cur = Cursor::new(cur), Some(Err(e)) => return Err(io::Error::new(io::ErrorKind::Other, e)), None => {} }; } Ok(Buf::bytes(&self.cur)) } fn consume(&mut self, amt: usize) { self.cur.advance(amt); } }