11extern crate fallible_iterator;
22extern crate futures;
3+ extern crate futures_state_stream;
34extern crate postgres_shared;
45extern crate postgres_protocol;
56extern crate tokio_core;
@@ -9,6 +10,7 @@ extern crate tokio_uds;
910use fallible_iterator:: FallibleIterator ;
1011use futures:: { Future , IntoFuture , BoxFuture , Stream , Sink , Poll , StartSend } ;
1112use futures:: future:: Either ;
13+ use futures_state_stream:: { StreamEvent , StateStream , BoxStateStream , FutureExt } ;
1214use postgres_protocol:: authentication;
1315use postgres_protocol:: message:: { backend, frontend} ;
1416use postgres_protocol:: message:: backend:: { ErrorResponseBody , ErrorFields } ;
@@ -562,6 +564,30 @@ impl Connection {
562564 . boxed ( )
563565 }
564566
567+ fn read_row ( self ) -> BoxFuture < ( Option < RowData > , Connection ) , Error > {
568+ self . 0 . read ( )
569+ . map_err ( Error :: Io )
570+ . and_then ( |( m, s) | {
571+ let c = Connection ( s) ;
572+ match m {
573+ backend:: Message :: DataRow ( body) => {
574+ Either :: A ( body. values ( )
575+ . collect ( )
576+ . map ( |r| ( Some ( r) , c) )
577+ . map_err ( Error :: Io )
578+ . into_future ( ) )
579+ }
580+ backend:: Message :: EmptyQueryResponse |
581+ backend:: Message :: CommandComplete ( _) => Either :: A ( Ok ( ( None , c) ) . into_future ( ) ) ,
582+ backend:: Message :: ErrorResponse ( body) => {
583+ Either :: B ( c. ready_err ( body) )
584+ }
585+ _ => Either :: A ( Err ( bad_message ( ) ) . into_future ( ) ) ,
586+ }
587+ } )
588+ . boxed ( )
589+ }
590+
565591 pub fn prepare ( mut self , query : & str ) -> BoxFuture < ( Statement , Connection ) , Error > {
566592 let id = self . 0 . next_stmt_id ;
567593 self . 0 . next_stmt_id += 1 ;
@@ -585,6 +611,34 @@ impl Connection {
585611 . boxed ( )
586612 }
587613
614+ pub fn query ( self ,
615+ statement : & Statement ,
616+ params : & [ & ToSql ] )
617+ -> BoxStateStream < Row , Connection , Error > {
618+ let columns = statement. columns . clone ( ) ;
619+ self . raw_execute ( & statement. name , "" , & statement. params , params)
620+ . map ( |c| {
621+ futures_state_stream:: unfold ( ( c, columns) , |( c, columns) | {
622+ c. read_row ( )
623+ . and_then ( |( r, c) | {
624+ match r {
625+ Some ( data) => {
626+ let row = Row {
627+ columns : columns. clone ( ) ,
628+ data : data,
629+ } ;
630+ let event = StreamEvent :: Next ( ( row, ( c, columns) ) ) ;
631+ Either :: A ( Ok ( event) . into_future ( ) )
632+ } ,
633+ None => Either :: B ( c. ready ( ( ) ) . map ( |( ( ) , c) | StreamEvent :: Done ( c) ) ) ,
634+ }
635+ } )
636+ } )
637+ } )
638+ . flatten_state_stream ( )
639+ . boxed ( )
640+ }
641+
588642 pub fn close ( self ) -> BoxFuture < ( ) , Error > {
589643 let mut terminate = vec ! [ ] ;
590644 frontend:: terminate ( & mut terminate) ;
0 commit comments