@@ -6,11 +6,10 @@ use std::collections::{HashMap, VecDeque};
66use std:: io;
77use tokio_codec:: Framed ;
88
9- use disconnected;
10- use error:: { self , Error } ;
9+ use error:: { self , DbError , Error } ;
1110use proto:: codec:: PostgresCodec ;
1211use tls:: TlsStream ;
13- use { bad_response, CancelData } ;
12+ use { bad_response, disconnected , AsyncMessage , CancelData , Notification } ;
1413
1514pub struct Request {
1615 pub messages : Vec < u8 > ,
@@ -71,10 +70,10 @@ impl Connection {
7170 self . stream . poll ( )
7271 }
7372
74- fn poll_read ( & mut self ) -> Result < ( ) , Error > {
73+ fn poll_read ( & mut self ) -> Result < Option < AsyncMessage > , Error > {
7574 if self . state != State :: Active {
7675 trace ! ( "poll_read: done" ) ;
77- return Ok ( ( ) ) ;
76+ return Ok ( None ) ;
7877 }
7978
8079 loop {
@@ -85,14 +84,22 @@ impl Connection {
8584 }
8685 Async :: NotReady => {
8786 trace ! ( "poll_read: waiting on response" ) ;
88- return Ok ( ( ) ) ;
87+ return Ok ( None ) ;
8988 }
9089 } ;
9190
9291 let message = match message {
93- Message :: NoticeResponse ( _) | Message :: NotificationResponse ( _) => {
94- // FIXME handle these
95- continue ;
92+ Message :: NoticeResponse ( body) => {
93+ let error = DbError :: new ( & mut body. fields ( ) ) ?;
94+ return Ok ( Some ( AsyncMessage :: Notice ( error) ) ) ;
95+ }
96+ Message :: NotificationResponse ( body) => {
97+ let notification = Notification {
98+ process_id : body. process_id ( ) ,
99+ channel : body. channel ( ) ?. to_string ( ) ,
100+ payload : body. message ( ) ?. to_string ( ) ,
101+ } ;
102+ return Ok ( Some ( AsyncMessage :: Notification ( notification) ) ) ;
96103 }
97104 Message :: ParameterStatus ( body) => {
98105 self . parameters
@@ -127,7 +134,7 @@ impl Connection {
127134 self . responses . push_front ( sender) ;
128135 self . pending_response = Some ( message) ;
129136 trace ! ( "poll_read: waiting on socket" ) ;
130- return Ok ( ( ) ) ;
137+ return Ok ( None ) ;
131138 }
132139 }
133140 }
@@ -225,18 +232,26 @@ impl Connection {
225232 Err ( e) => Err ( Error :: from ( e) ) ,
226233 }
227234 }
235+
236+ pub fn poll_message ( & mut self ) -> Poll < Option < AsyncMessage > , Error > {
237+ let message = self . poll_read ( ) ?;
238+ let want_flush = self . poll_write ( ) ?;
239+ if want_flush {
240+ self . poll_flush ( ) ?;
241+ }
242+ match message {
243+ Some ( message) => Ok ( Async :: Ready ( Some ( message) ) ) ,
244+ None => self . poll_shutdown ( ) . map ( |r| r. map ( |( ) | None ) ) ,
245+ }
246+ }
228247}
229248
230249impl Future for Connection {
231250 type Item = ( ) ;
232251 type Error = Error ;
233252
234253 fn poll ( & mut self ) -> Poll < ( ) , Error > {
235- self . poll_read ( ) ?;
236- let want_flush = self . poll_write ( ) ?;
237- if want_flush {
238- self . poll_flush ( ) ?;
239- }
240- self . poll_shutdown ( )
254+ while let Some ( _) = try_ready ! ( self . poll_message( ) ) { }
255+ Ok ( Async :: Ready ( ( ) ) )
241256 }
242257}
0 commit comments