Skip to content

Commit db771e8

Browse files
committed
Switch copy_in to use Buf
1 parent 43e6598 commit db771e8

3 files changed

Lines changed: 20 additions & 9 deletions

File tree

tokio-postgres/src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ extern crate log;
1616
#[macro_use]
1717
extern crate state_machine_future;
1818

19-
use bytes::Bytes;
19+
use bytes::{Bytes, IntoBuf};
2020
use futures::{Async, Future, Poll, Stream};
2121
use postgres_shared::rows::RowIndex;
2222
use std::error::Error as StdError;
@@ -90,7 +90,8 @@ impl Client {
9090
pub fn copy_in<S>(&mut self, statement: &Statement, params: &[&ToSql], stream: S) -> CopyIn<S>
9191
where
9292
S: Stream,
93-
S::Item: AsRef<[u8]>,
93+
S::Item: IntoBuf,
94+
<S::Item as IntoBuf>::Buf: Send,
9495
// FIXME error type?
9596
S::Error: Into<Box<StdError + Sync + Send>>,
9697
{
@@ -283,13 +284,15 @@ pub struct Portal(proto::Portal);
283284
pub struct CopyIn<S>(proto::CopyInFuture<S>)
284285
where
285286
S: Stream,
286-
S::Item: AsRef<[u8]>,
287+
S::Item: IntoBuf,
288+
<S::Item as IntoBuf>::Buf: Send,
287289
S::Error: Into<Box<StdError + Sync + Send>>;
288290

289291
impl<S> Future for CopyIn<S>
290292
where
291293
S: Stream,
292-
S::Item: AsRef<[u8]>,
294+
S::Item: IntoBuf,
295+
<S::Item as IntoBuf>::Buf: Send,
293296
S::Error: Into<Box<StdError + Sync + Send>>,
294297
{
295298
type Item = u64;

tokio-postgres/src/proto/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use antidote::Mutex;
2+
use bytes::IntoBuf;
23
use futures::sync::mpsc;
34
use futures::{AsyncSink, Sink, Stream};
45
use postgres_protocol;
@@ -163,7 +164,8 @@ impl Client {
163164
pub fn copy_in<S>(&self, statement: &Statement, params: &[&ToSql], stream: S) -> CopyInFuture<S>
164165
where
165166
S: Stream,
166-
S::Item: AsRef<[u8]>,
167+
S::Item: IntoBuf,
168+
<S::Item as IntoBuf>::Buf: Send,
167169
S::Error: Into<Box<StdError + Sync + Send>>,
168170
{
169171
let (mut sender, receiver) = mpsc::channel(0);

tokio-postgres/src/proto/copy_in.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bytes::{Buf, IntoBuf};
12
use futures::sink;
23
use futures::sync::mpsc;
34
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
@@ -63,7 +64,8 @@ impl Stream for CopyInReceiver {
6364
pub enum CopyIn<S>
6465
where
6566
S: Stream,
66-
S::Item: AsRef<[u8]>,
67+
S::Item: IntoBuf,
68+
<S::Item as IntoBuf>::Buf: Send,
6769
S::Error: Into<Box<StdError + Sync + Send>>,
6870
{
6971
#[state_machine_future(start, transitions(ReadCopyInResponse))]
@@ -103,7 +105,8 @@ where
103105
impl<S> PollCopyIn<S> for CopyIn<S>
104106
where
105107
S: Stream,
106-
S::Item: AsRef<[u8]>,
108+
S::Item: IntoBuf,
109+
<S::Item as IntoBuf>::Buf: Send,
107110
S::Error: Into<Box<StdError + Sync + Send>>,
108111
{
109112
fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start<S>>) -> Poll<AfterStart<S>, Error> {
@@ -151,7 +154,9 @@ where
151154
None => match try_ready!(state.stream.poll().map_err(Error::copy_in_stream)) {
152155
Some(data) => {
153156
let mut buf = vec![];
154-
frontend::copy_data(data.as_ref(), &mut buf).map_err(Error::encode)?;
157+
// FIXME avoid collect
158+
frontend::copy_data(&data.into_buf().collect::<Vec<_>>(), &mut buf)
159+
.map_err(Error::encode)?;
155160
CopyMessage::Data(buf)
156161
}
157162
None => {
@@ -213,7 +218,8 @@ where
213218
impl<S> CopyInFuture<S>
214219
where
215220
S: Stream,
216-
S::Item: AsRef<[u8]>,
221+
S::Item: IntoBuf,
222+
<S::Item as IntoBuf>::Buf: Send,
217223
S::Error: Into<Box<StdError + Sync + Send>>,
218224
{
219225
pub fn new(

0 commit comments

Comments
 (0)