Skip to content

Commit c0d70e9

Browse files
committed
Implement transactional lazy queries
Closes rust-postgres#13
1 parent 47d1458 commit c0d70e9

2 files changed

Lines changed: 83 additions & 24 deletions

File tree

src/lib.rs

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
extern mod extra;
66

7+
use extra::container::Deque;
78
use extra::digest::Digest;
9+
use extra::ringbuf::RingBuf;
810
use extra::md5::Md5;
911
use extra::url::{UserInfo, Url};
1012
use std::cell::Cell;
@@ -308,6 +310,7 @@ impl PostgresConnection {
308310
name: stmt_name,
309311
param_types: param_types,
310312
result_desc: result_desc,
313+
next_portal_id: Cell::new(0)
311314
})
312315
}
313316
@@ -423,6 +426,7 @@ pub struct NormalPostgresStatement<'self> {
423426
priv name: ~str,
424427
priv param_types: ~[Oid],
425428
priv result_desc: ~[RowDescriptionEntry],
429+
priv next_portal_id: Cell<uint>
426430
}
427431
428432
#[unsafe_destructor]
@@ -446,7 +450,7 @@ impl<'self> Drop for NormalPostgresStatement<'self> {
446450
}
447451
448452
impl<'self> NormalPostgresStatement<'self> {
449-
fn execute(&self, portal_name: &str, params: &[&ToSql])
453+
fn execute(&self, portal_name: &str, row_limit: uint, params: &[&ToSql])
450454
-> Option<PostgresDbError> {
451455
let mut formats = ~[];
452456
let mut values = ~[];
@@ -470,7 +474,7 @@ impl<'self> NormalPostgresStatement<'self> {
470474
},
471475
&Execute {
472476
portal: portal_name,
473-
max_rows: 0
477+
max_rows: row_limit as i32
474478
},
475479
&Sync]);
476480
@@ -491,33 +495,29 @@ impl<'self> NormalPostgresStatement<'self> {
491495
}
492496
}
493497
494-
fn try_lazy_query<'a>(&'a self, _row_limit: uint, params: &[&ToSql])
498+
fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
495499
-> Result<PostgresResult<'a>, PostgresDbError> {
496-
match self.execute("", params) {
500+
let id = self.next_portal_id.take();
501+
let portal_name = format!("{}_portal_{}", self.name, id);
502+
self.next_portal_id.put_back(id + 1);
503+
504+
match self.execute(portal_name, row_limit, params) {
497505
Some(err) => {
498506
return Err(err);
499507
}
500508
None => ()
501509
}
502510
503-
let mut data = ~[];
504-
loop {
505-
match_read_message_or_fail!(self.conn, {
506-
EmptyQueryResponse |
507-
CommandComplete {_} => {
508-
break;
509-
},
510-
DataRow { row } => data.push(row)
511-
})
512-
}
513-
self.conn.wait_for_ready();
514-
515-
// we're going to be popping off
516-
data.reverse();
517-
Ok(PostgresResult {
511+
let mut result = PostgresResult {
518512
stmt: self,
519-
data: data,
520-
})
513+
name: portal_name,
514+
data: RingBuf::new(),
515+
row_limit: row_limit,
516+
more_rows: true
517+
};
518+
result.load_rows();
519+
520+
Ok(result)
521521
}
522522
}
523523
@@ -535,7 +535,7 @@ impl<'self> PostgresStatement for NormalPostgresStatement<'self> {
535535
536536
fn try_update(&self, params: &[&ToSql])
537537
-> Result<uint, PostgresDbError> {
538-
match self.execute("", params) {
538+
match self.execute("", 0, params) {
539539
Some(err) => {
540540
return Err(err);
541541
}
@@ -632,12 +632,69 @@ impl<'self> TransactionalPostgresStatement<'self> {
632632
633633
pub struct PostgresResult<'self> {
634634
priv stmt: &'self NormalPostgresStatement<'self>,
635-
priv data: ~[~[Option<~[u8]>]]
635+
priv name: ~str,
636+
priv data: RingBuf<~[Option<~[u8]>]>,
637+
priv row_limit: uint,
638+
priv more_rows: bool
639+
}
640+
641+
#[unsafe_destructor]
642+
impl<'self> Drop for PostgresResult<'self> {
643+
fn drop(&self) {
644+
do io_error::cond.trap(|_| {}).inside {
645+
self.stmt.conn.write_messages([
646+
&Close {
647+
variant: 'P' as u8,
648+
name: self.name.as_slice()
649+
},
650+
&Sync]);
651+
loop {
652+
match_read_message!(self.stmt.conn, {
653+
ReadyForQuery {_} => break,
654+
_ => ()
655+
})
656+
}
657+
}
658+
}
659+
}
660+
661+
impl<'self> PostgresResult<'self> {
662+
fn load_rows(&mut self) {
663+
loop {
664+
match_read_message_or_fail!(self.stmt.conn, {
665+
EmptyQueryResponse |
666+
CommandComplete {_} => {
667+
self.more_rows = false;
668+
break;
669+
},
670+
PortalSuspended => {
671+
self.more_rows = true;
672+
break;
673+
},
674+
DataRow { row } => self.data.push_back(row)
675+
})
676+
}
677+
self.stmt.conn.wait_for_ready();
678+
}
679+
680+
fn pull_rows(&mut self) {
681+
self.stmt.conn.write_messages([
682+
&Execute {
683+
portal: self.name,
684+
max_rows: self.row_limit as i32
685+
},
686+
&Sync]);
687+
self.load_rows();
688+
}
636689
}
637690
638691
impl<'self> Iterator<PostgresRow<'self>> for PostgresResult<'self> {
639692
fn next(&mut self) -> Option<PostgresRow<'self>> {
640-
do self.data.pop_opt().map_move |row| {
693+
if self.data.is_empty() && self.more_rows {
694+
self.pull_rows();
695+
}
696+
697+
do self.data.pop_front().map_move |row| {
641698
PostgresRow {
642699
stmt: self.stmt,
643700
data: row

src/message.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub enum BackendMessage {
4545
value: ~str
4646
},
4747
ParseComplete,
48+
PortalSuspended,
4849
ReadyForQuery {
4950
state: u8
5051
},
@@ -261,6 +262,7 @@ impl<R: Reader> ReadMessage for R {
261262
'n' => NoData,
262263
'N' => NoticeResponse { fields: read_fields(&mut buf) },
263264
'R' => read_auth_message(&mut buf),
265+
's' => PortalSuspended,
264266
'S' => ParameterStatus {
265267
parameter: buf.read_string(),
266268
value: buf.read_string()

0 commit comments

Comments
 (0)