Skip to content

Commit 6071706

Browse files
committed
More work towards row limits
cc rust-postgres#13
1 parent e82c887 commit 6071706

2 files changed

Lines changed: 84 additions & 52 deletions

File tree

src/lib.rs

Lines changed: 82 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -387,11 +387,11 @@ impl<'self> Drop for PostgresStatement<'self> {
387387
fn drop(&self) {
388388
do io_error::cond.trap(|_| {}).inside {
389389
self.conn.write_messages([
390-
&Close {
391-
variant: 'S' as u8,
392-
name: self.name.as_slice()
393-
},
394-
&Sync]);
390+
&Close {
391+
variant: 'S' as u8,
392+
name: self.name.as_slice()
393+
},
394+
&Sync]);
395395
loop {
396396
match_read_message!(self.conn, {
397397
ReadyForQuery {_} => break,
@@ -403,8 +403,8 @@ impl<'self> Drop for PostgresStatement<'self> {
403403
}
404404
405405
impl<'self> PostgresStatement<'self> {
406-
fn execute(&self, portal_name: &str, params: &[&ToSql])
407-
-> Option<PostgresDbError> {
406+
fn bind(&self, portal_name: &str, params: &[&ToSql])
407+
-> Option<PostgresDbError> {
408408
let mut formats = ~[];
409409
let mut values = ~[];
410410
for (&param, &ty) in params.iter().zip(self.param_types.iter()) {
@@ -416,24 +416,24 @@ impl<'self> PostgresStatement<'self> {
416416
let result_formats = [];
417417
418418
self.conn.write_messages([
419-
&Bind {
420-
portal: portal_name,
421-
statement: self.name.as_slice(),
422-
formats: formats,
423-
values: values,
424-
result_formats: result_formats
425-
},
426-
&Execute {
427-
portal: portal_name.as_slice(),
428-
max_rows: 0
429-
},
430-
&Sync]);
419+
&Bind {
420+
portal: portal_name,
421+
statement: self.name.as_slice(),
422+
formats: formats,
423+
values: values,
424+
result_formats: result_formats
425+
},
426+
&Sync]);
431427
432-
match_read_message!(self.conn, {
428+
let ret = match_read_message!(self.conn, {
433429
BindComplete => None,
434430
ErrorResponse { fields } => Some(PostgresDbError::new(fields)),
435431
resp => fail!("Bad response: %?", resp.to_str())
436-
})
432+
});
433+
434+
self.conn.wait_for_ready();
435+
436+
ret
437437
}
438438
439439
pub fn update(&self, params: &[&ToSql]) -> uint {
@@ -446,14 +446,20 @@ impl<'self> PostgresStatement<'self> {
446446
pub fn try_update(&self, params: &[&ToSql])
447447
-> Result<uint, PostgresDbError> {
448448
// The unnamed portal is automatically cleaned up at sync time
449-
match self.execute("", params) {
449+
match self.bind("", params) {
450450
Some(err) => {
451-
self.conn.wait_for_ready();
452451
return Err(err);
453452
}
454453
None => ()
455454
}
456455
456+
self.conn.write_messages([
457+
&Execute {
458+
portal: &"",
459+
max_rows: 0
460+
},
461+
&Sync]);
462+
457463
let num;
458464
loop {
459465
match_read_message!(self.conn, {
@@ -496,56 +502,43 @@ impl<'self> PostgresStatement<'self> {
496502
let portal_name = format!("{:s}_portal_{}", self.name.as_slice(), id);
497503
self.next_portal_id.put_back(id + 1);
498504
499-
match self.execute(portal_name, params) {
505+
match self.bind(portal_name, params) {
500506
Some(err) => {
501507
self.conn.wait_for_ready();
502508
return Err(err);
503509
}
504510
None => ()
505511
}
506512
507-
let mut data = ~[];
508-
loop {
509-
match_read_message!(self.conn, {
510-
EmptyQueryResponse => break,
511-
DataRow { row } => data.push(row),
512-
CommandComplete {_} => break,
513-
NoticeResponse {_} => (),
514-
ErrorResponse { fields } => {
515-
self.conn.wait_for_ready();
516-
return Err(PostgresDbError::new(fields));
517-
},
518-
resp => fail!("Bad response: %?", resp.to_str())
519-
})
520-
}
521-
self.conn.wait_for_ready();
522-
523-
// we're going to be popping rows off
524-
data.reverse();
525-
Ok(PostgresResult {
513+
let mut result = PostgresResult {
526514
stmt: self,
527515
name: portal_name,
528-
data: data
529-
})
516+
data: ~[],
517+
more_rows: true
518+
};
519+
result.execute();
520+
521+
Ok(result)
530522
}
531523
}
532524
533525
pub struct PostgresResult<'self> {
534526
priv stmt: &'self PostgresStatement<'self>,
535527
priv name: ~str,
536-
priv data: ~[~[Option<~[u8]>]]
528+
priv data: ~[~[Option<~[u8]>]],
529+
priv more_rows: bool
537530
}
538531
539532
#[unsafe_destructor]
540533
impl<'self> Drop for PostgresResult<'self> {
541534
fn drop(&self) {
542535
do io_error::cond.trap(|_| {}).inside {
543536
self.stmt.conn.write_messages([
544-
&Close {
545-
variant: 'P' as u8,
546-
name: self.name.as_slice()
547-
},
548-
&Sync]);
537+
&Close {
538+
variant: 'P' as u8,
539+
name: self.name.as_slice()
540+
},
541+
&Sync]);
549542
loop {
550543
match_read_message!(self.stmt.conn, {
551544
ReadyForQuery {_} => break,
@@ -559,13 +552,50 @@ impl<'self> Drop for PostgresResult<'self> {
559552
impl<'self> Iterator<PostgresRow> for PostgresResult<'self> {
560553
fn next(&mut self) -> Option<PostgresRow> {
561554
if self.data.is_empty() {
562-
return None;
555+
if self.more_rows {
556+
self.execute();
557+
} else {
558+
return None;
559+
}
563560
}
564561
565562
Some(PostgresRow { data: self.data.pop() })
566563
}
567564
}
568565
566+
impl<'self> PostgresResult<'self> {
567+
fn execute(&mut self) {
568+
assert!(self.data.is_empty());
569+
570+
self.stmt.conn.write_messages([
571+
&Execute {
572+
portal: self.name,
573+
max_rows: 0
574+
},
575+
&Sync]);
576+
577+
loop {
578+
match_read_message!(self.stmt.conn, {
579+
EmptyQueryResponse |
580+
CommandComplete {_} => {
581+
self.more_rows = false;
582+
break;
583+
},
584+
DataRow { row } => self.data.push(row),
585+
PortalSuspended => {
586+
self.more_rows = true;
587+
break;
588+
},
589+
resp => fail!("Bad response: %?", resp.to_str())
590+
})
591+
}
592+
self.stmt.conn.wait_for_ready();
593+
594+
// we're going to be popping rows off
595+
self.data.reverse();
596+
}
597+
}
598+
569599
pub struct PostgresRow {
570600
priv data: ~[Option<~[u8]>]
571601
}

src/message.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub enum BackendMessage {
4545
value: ~str
4646
},
4747
ParseComplete,
48+
PortalSuspended,
4849
ReadyForQuery {
4950
state: u8
5051
},
@@ -261,6 +262,7 @@ impl<R: Reader> ReadMessage for R {
261262
'n' => NoData,
262263
'N' => NoticeResponse { fields: read_fields(&mut buf) },
263264
'R' => read_auth_message(&mut buf),
265+
's' => PortalSuspended,
264266
'S' => ParameterStatus {
265267
parameter: buf.read_string(),
266268
value: buf.read_string()

0 commit comments

Comments
 (0)