forked from rust-postgres/rust-postgres
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery.rs
More file actions
124 lines (109 loc) · 3.63 KB
/
Copy pathquery.rs
File metadata and controls
124 lines (109 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::{IsNull, ToSql};
use crate::{Error, Row, Statement};
use futures::{ready, Stream, TryFutureExt};
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
pub fn query(
client: Arc<InnerClient>,
statement: Statement,
buf: Result<Vec<u8>, Error>,
) -> impl Stream<Item = Result<Row, Error>> {
start(client, buf)
.map_ok(|responses| Query {
statement,
responses,
})
.try_flatten_stream()
}
pub async fn execute(client: Arc<InnerClient>, buf: Result<Vec<u8>, Error>) -> Result<u64, Error> {
let mut responses = start(client, buf).await?;
loop {
match responses.next().await? {
Message::DataRow(_) => {}
Message::CommandComplete(body) => {
let rows = body
.tag()
.map_err(Error::parse)?
.rsplit(' ')
.next()
.unwrap()
.parse()
.unwrap_or(0);
return Ok(rows);
}
Message::EmptyQueryResponse => return Ok(0),
_ => return Err(Error::unexpected_message()),
}
}
}
async fn start(client: Arc<InnerClient>, buf: Result<Vec<u8>, Error>) -> Result<Responses, Error> {
let buf = buf?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
match responses.next().await? {
Message::BindComplete => {}
_ => return Err(Error::unexpected_message()),
}
Ok(responses)
}
pub fn encode<'a, I>(statement: &Statement, params: I) -> Result<Vec<u8>, Error>
where
I: IntoIterator<Item = &'a dyn ToSql>,
I::IntoIter: ExactSizeIterator,
{
let params = params.into_iter();
assert!(
statement.params().len() == params.len(),
"expected {} parameters but got {}",
statement.params().len(),
params.len()
);
let mut buf = vec![];
let mut error_idx = 0;
let r = frontend::bind(
"",
statement.name(),
Some(1),
params.zip(statement.params()).enumerate(),
|(idx, (param, ty)), buf| match param.to_sql_checked(ty, buf) {
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
Err(e) => {
error_idx = idx;
Err(e)
}
},
Some(1),
&mut buf,
);
match r {
Ok(()) => {}
Err(frontend::BindError::Conversion(e)) => return Err(Error::to_sql(e, error_idx)),
Err(frontend::BindError::Serialization(e)) => return Err(Error::encode(e)),
}
frontend::execute("", 0, &mut buf).map_err(Error::encode)?;
frontend::sync(&mut buf);
Ok(buf)
}
/// Row stream backing [`query`]: pairs the response channel of an in-flight
/// request with the statement used to build each [`Row`].
struct Query {
    /// Statement handed (by clone) to every `Row` created from a data row.
    statement: Statement,
    /// Source of backend messages for this request.
    responses: Responses,
}
impl Stream for Query {
type Item = Result<Row, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.responses.poll_next(cx)?) {
Message::DataRow(body) => {
Poll::Ready(Some(Ok(Row::new(self.statement.clone(), body)?)))
}
Message::EmptyQueryResponse | Message::CommandComplete(_) => Poll::Ready(None),
Message::ErrorResponse(body) => Poll::Ready(Some(Err(Error::db(body)))),
_ => Poll::Ready(Some(Err(Error::unexpected_message()))),
}
}
}