Skip to content

Commit 89501f6

Browse files
committed
Start on std::futures rewrite
connect_raw works!
1 parent d91f9d8 commit 89501f6

12 files changed

Lines changed: 1633 additions & 103 deletions

File tree

tokio-postgres/Cargo.toml

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ circle-ci = { repository = "sfackler/rust-postgres" }
2121

2222
[features]
2323
default = ["runtime"]
24-
runtime = ["tokio-tcp", "tokio-timer", "tokio-uds", "tokio-threadpool", "lazy_static"]
24+
runtime = ["tokio/rt-full", "tokio/tcp", "tokio/uds", "tokio-threadpool", "lazy_static"]
2525

2626
"with-bit-vec-0_5" = ["bit-vec-05"]
2727
"with-chrono-0_4" = ["chrono-04"]
@@ -34,17 +34,15 @@ with-serde_json-1 = ["serde-1", "serde_json-1"]
3434
antidote = "1.0"
3535
bytes = "0.4"
3636
fallible-iterator = "0.2"
37+
futures-preview = "0.3.0-alpha.17"
3738
log = "0.4"
3839
percent-encoding = "1.0"
3940
phf = "0.7.23"
4041
postgres-protocol = { version = "0.4.1", path = "../postgres-protocol" }
41-
tokio-codec = { git = "https://github.com/tokio-rs/tokio" }
42-
tokio-io = { git = "https://github.com/tokio-rs/tokio" }
42+
tokio = { git = "https://github.com/tokio-rs/tokio", default-features = false, features = ["io", "codec"] }
4343

44-
tokio-tcp = { git = "https://github.com/tokio-rs/tokio", optional = true }
4544
tokio-threadpool = { git = "https://github.com/tokio-rs/tokio", optional = true }
4645
lazy_static = { version = "1.0", optional = true }
47-
tokio-timer = { git = "https://github.com/tokio-rs/tokio", optional = true }
4846

4947
bit-vec-05 = { version = "0.5", package = "bit-vec", optional = true }
5048
chrono-04 = { version = "0.4", package = "chrono", optional = true }
@@ -54,9 +52,6 @@ serde-1 = { version = "1.0", package = "serde", optional = true }
5452
serde_json-1 = { version = "1.0", package = "serde_json", optional = true }
5553
uuid-07 = { version = "0.7", package = "uuid", optional = true }
5654

57-
[target.'cfg(unix)'.dependencies]
58-
tokio-uds = { git = "https://github.com/tokio-rs/tokio", optional = true }
59-
6055
[dev-dependencies]
6156
tokio = { git = "https://github.com/tokio-rs/tokio" }
6257
env_logger = "0.5"

tokio-postgres/src/client.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::connection::Request;
2+
use futures::channel::mpsc;
3+
4+
pub struct Client {
5+
sender: mpsc::UnboundedSender<Request>,
6+
process_id: i32,
7+
secret_key: i32,
8+
}
9+
10+
impl Client {
11+
pub(crate) fn new(
12+
sender: mpsc::UnboundedSender<Request>,
13+
process_id: i32,
14+
secret_key: i32,
15+
) -> Client {
16+
Client {
17+
sender,
18+
process_id,
19+
secret_key,
20+
}
21+
}
22+
}

tokio-postgres/src/codec.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use bytes::{Buf, BytesMut};
2+
use fallible_iterator::FallibleIterator;
3+
use postgres_protocol::message::backend;
4+
use postgres_protocol::message::frontend::CopyData;
5+
use std::io;
6+
use tokio::codec::{Decoder, Encoder};
7+
8+
pub enum FrontendMessage {
9+
Raw(Vec<u8>),
10+
CopyData(CopyData<Box<dyn Buf + Send>>),
11+
}
12+
13+
pub enum BackendMessage {
14+
Normal {
15+
messages: BackendMessages,
16+
request_complete: bool,
17+
},
18+
Async(backend::Message),
19+
}
20+
21+
pub struct BackendMessages(BytesMut);
22+
23+
impl BackendMessages {
24+
pub fn empty() -> BackendMessages {
25+
BackendMessages(BytesMut::new())
26+
}
27+
}
28+
29+
impl FallibleIterator for BackendMessages {
30+
type Item = backend::Message;
31+
type Error = io::Error;
32+
33+
fn next(&mut self) -> io::Result<Option<backend::Message>> {
34+
backend::Message::parse(&mut self.0)
35+
}
36+
}
37+
38+
pub struct PostgresCodec;
39+
40+
impl Encoder for PostgresCodec {
41+
type Item = FrontendMessage;
42+
type Error = io::Error;
43+
44+
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
45+
match item {
46+
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
47+
FrontendMessage::CopyData(data) => data.write(dst),
48+
}
49+
50+
Ok(())
51+
}
52+
}
53+
54+
impl Decoder for PostgresCodec {
55+
type Item = BackendMessage;
56+
type Error = io::Error;
57+
58+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
59+
let mut idx = 0;
60+
let mut request_complete = false;
61+
62+
while let Some(header) = backend::Header::parse(&src[idx..])? {
63+
let len = header.len() as usize + 1;
64+
if src[idx..].len() < len {
65+
break;
66+
}
67+
68+
match header.tag() {
69+
backend::NOTICE_RESPONSE_TAG
70+
| backend::NOTIFICATION_RESPONSE_TAG
71+
| backend::PARAMETER_STATUS_TAG => {
72+
if idx == 0 {
73+
let message = backend::Message::parse(src)?.unwrap();
74+
return Ok(Some(BackendMessage::Async(message)));
75+
} else {
76+
break;
77+
}
78+
}
79+
_ => {}
80+
}
81+
82+
idx += len;
83+
84+
if header.tag() == backend::READY_FOR_QUERY_TAG {
85+
request_complete = true;
86+
break;
87+
}
88+
}
89+
90+
if idx == 0 {
91+
Ok(None)
92+
} else {
93+
Ok(Some(BackendMessage::Normal {
94+
messages: BackendMessages(src.split_to(idx)),
95+
request_complete,
96+
}))
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)