Skip to content

Commit a1fc412

Browse files
committed
Simplify sync copy_in
1 parent ec680b1 commit a1fc412

1 file changed

Lines changed: 13 additions & 86 deletions

File tree

postgres/src/client.rs

Lines changed: 13 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use bytes::{Buf, Bytes};
22
use futures::stream;
3-
use futures::sync::mpsc;
4-
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
3+
use futures::{Async, Future, Poll, Stream};
54
use std::io::{self, BufRead, Cursor, Read};
65
use std::marker::PhantomData;
76
use tokio_postgres::types::{ToSql, Type};
@@ -68,17 +67,9 @@ impl Client {
6867
R: Read,
6968
{
7069
let statement = query.__statement(self)?;
71-
let (sender, receiver) = mpsc::channel(1);
72-
let future = self.0.copy_in(&statement.0, params, CopyInStream(receiver));
73-
74-
CopyInFuture {
75-
future,
76-
sender,
77-
reader,
78-
pending: None,
79-
done: false,
80-
}
81-
.wait()
70+
self.0
71+
.copy_in(&statement.0, params, CopyInStream(reader))
72+
.wait()
8273
}
8374

8475
pub fn copy_out<T>(
@@ -125,84 +116,20 @@ impl From<tokio_postgres::Client> for Client {
125116
}
126117
}
127118

128-
enum CopyData {
129-
Data(Vec<u8>),
130-
Error(io::Error),
131-
Done,
132-
}
133-
134-
struct CopyInStream(mpsc::Receiver<CopyData>);
119+
struct CopyInStream<R>(R);
135120

136-
impl Stream for CopyInStream {
137-
type Item = Vec<u8>;
138-
type Error = io::Error;
139-
140-
fn poll(&mut self) -> Poll<Option<Vec<u8>>, io::Error> {
141-
match self.0.poll().expect("mpsc::Receiver can't error") {
142-
Async::Ready(Some(CopyData::Data(buf))) => Ok(Async::Ready(Some(buf))),
143-
Async::Ready(Some(CopyData::Error(e))) => Err(e),
144-
Async::Ready(Some(CopyData::Done)) => Ok(Async::Ready(None)),
145-
Async::Ready(None) => Err(io::Error::new(io::ErrorKind::Other, "writer disconnected")),
146-
Async::NotReady => Ok(Async::NotReady),
147-
}
148-
}
149-
}
150-
151-
struct CopyInFuture<R> {
152-
future: tokio_postgres::CopyIn<CopyInStream>,
153-
sender: mpsc::Sender<CopyData>,
154-
reader: R,
155-
pending: Option<CopyData>,
156-
done: bool,
157-
}
158-
159-
impl<R> Future for CopyInFuture<R>
121+
impl<R> Stream for CopyInStream<R>
160122
where
161123
R: Read,
162124
{
163-
type Item = u64;
164-
type Error = Error;
165-
166-
fn poll(&mut self) -> Poll<u64, Error> {
167-
loop {
168-
if let Async::Ready(n) = self.future.poll()? {
169-
return Ok(Async::Ready(n));
170-
}
171-
172-
let data = match self.pending.take() {
173-
Some(pending) => pending,
174-
None => {
175-
if self.done {
176-
continue;
177-
}
178-
179-
let mut buf = vec![];
180-
match self.reader.by_ref().take(4096).read_to_end(&mut buf) {
181-
Ok(0) => {
182-
self.done = true;
183-
CopyData::Done
184-
}
185-
Ok(_) => CopyData::Data(buf),
186-
Err(e) => {
187-
self.done = true;
188-
CopyData::Error(e)
189-
}
190-
}
191-
}
192-
};
125+
type Item = Vec<u8>;
126+
type Error = io::Error;
193127

194-
match self.sender.start_send(data) {
195-
Ok(AsyncSink::Ready) => {}
196-
Ok(AsyncSink::NotReady(pending)) => {
197-
self.pending = Some(pending);
198-
return Ok(Async::NotReady);
199-
}
200-
// the future's hung up on its end of the channel, so we'll wait for it to error
201-
Err(_) => {
202-
self.done = true;
203-
return Ok(Async::NotReady);
204-
}
205-
}
128+
fn poll(&mut self) -> Poll<Option<Vec<u8>>, io::Error> {
129+
let mut buf = vec![];
130+
match self.0.by_ref().take(4096).read_to_end(&mut buf)? {
131+
0 => Ok(Async::Ready(None)),
132+
_ => Ok(Async::Ready(Some(buf))),
206133
}
207134
}
208135
}

0 commit comments

Comments
 (0)