@@ -99,21 +99,22 @@ where
9999 } ;
100100 let mut responses = start ( client, buf) . await ?;
101101
102+ let mut rows = 0 ;
102103 loop {
103104 match responses. next ( ) . await ? {
104105 Message :: DataRow ( _) => { }
105106 Message :: CommandComplete ( body) => {
106- let rows = body
107+ rows = body
107108 . tag ( )
108109 . map_err ( Error :: parse) ?
109110 . rsplit ( ' ' )
110111 . next ( )
111112 . unwrap ( )
112113 . parse ( )
113114 . unwrap_or ( 0 ) ;
114- return Ok ( rows) ;
115115 }
116- Message :: EmptyQueryResponse => return Ok ( 0 ) ,
116+ Message :: EmptyQueryResponse => rows = 0 ,
117+ Message :: ReadyForQuery ( _) => return Ok ( rows) ,
117118 _ => return Err ( Error :: unexpected_message ( ) ) ,
118119 }
119120 }
@@ -203,15 +204,17 @@ impl Stream for RowStream {
203204
204205 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
205206 let this = self . project ( ) ;
206- match ready ! ( this. responses. poll_next( cx) ?) {
207- Message :: DataRow ( body) => {
208- Poll :: Ready ( Some ( Ok ( Row :: new ( this. statement . clone ( ) , body) ?) ) )
207+ loop {
208+ match ready ! ( this. responses. poll_next( cx) ?) {
209+ Message :: DataRow ( body) => {
210+ return Poll :: Ready ( Some ( Ok ( Row :: new ( this. statement . clone ( ) , body) ?) ) )
211+ }
212+ Message :: EmptyQueryResponse
213+ | Message :: CommandComplete ( _)
214+ | Message :: PortalSuspended => { }
215+ Message :: ReadyForQuery ( _) => return Poll :: Ready ( None ) ,
216+ _ => return Poll :: Ready ( Some ( Err ( Error :: unexpected_message ( ) ) ) ) ,
209217 }
210- Message :: EmptyQueryResponse
211- | Message :: CommandComplete ( _)
212- | Message :: PortalSuspended => Poll :: Ready ( None ) ,
213- Message :: ErrorResponse ( body) => Poll :: Ready ( Some ( Err ( Error :: db ( body) ) ) ) ,
214- _ => Poll :: Ready ( Some ( Err ( Error :: unexpected_message ( ) ) ) ) ,
215218 }
216219 }
217220}
0 commit comments