5555#![ warn( missing_docs) ]
5656
5757extern crate fallible_iterator;
58- extern crate futures;
5958extern crate futures_state_stream;
6059extern crate postgres_shared;
6160extern crate postgres_protocol;
6261extern crate tokio_core;
6362extern crate tokio_dns;
6463
64+ #[ macro_use]
65+ extern crate futures;
66+
6567#[ cfg( unix) ]
6668extern crate tokio_uds;
6769
@@ -71,23 +73,24 @@ extern crate tokio_openssl;
7173extern crate openssl;
7274
7375use fallible_iterator:: FallibleIterator ;
74- use futures:: { Future , IntoFuture , BoxFuture , Stream , Sink , Poll , StartSend } ;
76+ use futures:: { Future , IntoFuture , BoxFuture , Stream , Sink , Poll , StartSend , Async } ;
7577use futures:: future:: Either ;
7678use futures_state_stream:: { StreamEvent , StateStream , BoxStateStream , FutureExt } ;
7779use postgres_protocol:: authentication;
7880use postgres_protocol:: message:: { backend, frontend} ;
7981use postgres_protocol:: message:: backend:: { ErrorResponseBody , ErrorFields } ;
8082use postgres_shared:: rows:: RowData ;
81- use std:: collections:: HashMap ;
83+ use std:: collections:: { HashMap , VecDeque } ;
8284use std:: fmt;
8385use std:: io;
8486use std:: sync:: Arc ;
8587use std:: sync:: atomic:: { AtomicUsize , ATOMIC_USIZE_INIT , Ordering } ;
8688use std:: sync:: mpsc:: { self , Sender , Receiver } ;
89+ use tokio_core:: io:: IoFuture ;
8790use tokio_core:: reactor:: Handle ;
8891
8992#[ doc( inline) ]
90- pub use postgres_shared:: { params, CancelData } ;
93+ pub use postgres_shared:: { params, CancelData , Notification } ;
9194
9295use error:: { ConnectError , Error , DbError , SqlState } ;
9396use params:: { ConnectParams , IntoConnectParams } ;
@@ -166,32 +169,35 @@ struct InnerConnection {
166169 close_sender : Sender < ( u8 , String ) > ,
167170 parameters : HashMap < String , String > ,
168171 types : HashMap < Oid , Other > ,
172+ notifications : VecDeque < Notification > ,
169173 cancel_data : CancelData ,
170174 has_typeinfo_query : bool ,
171175 has_typeinfo_enum_query : bool ,
172176 has_typeinfo_composite_query : bool ,
173177}
174178
175179impl InnerConnection {
176- fn read ( self ) -> BoxFuture < ( backend:: Message < Vec < u8 > > , InnerConnection ) , io :: Error > {
180+ fn read ( self ) -> IoFuture < ( backend:: Message < Vec < u8 > > , InnerConnection ) > {
177181 self . into_future ( )
178182 . map_err ( |e| e. 0 )
179183 . and_then ( |( m, mut s) | {
180184 match m {
181- Some ( backend:: Message :: ParameterStatus ( body) ) => {
182- let name = match body. name ( ) {
183- Ok ( name) => name. to_owned ( ) ,
185+ Some ( backend:: Message :: NotificationResponse ( body) ) => {
186+ let process_id = body. process_id ( ) ;
187+ let channel = match body. channel ( ) {
188+ Ok ( channel) => channel. to_owned ( ) ,
184189 Err ( e) => return Either :: A ( Err ( e) . into_future ( ) ) ,
185190 } ;
186- let value = match body. value ( ) {
187- Ok ( value ) => value . to_owned ( ) ,
191+ let message = match body. message ( ) {
192+ Ok ( channel ) => channel . to_owned ( ) ,
188193 Err ( e) => return Either :: A ( Err ( e) . into_future ( ) ) ,
189194 } ;
190- s. parameters . insert ( name, value) ;
191- Either :: B ( s. read ( ) )
192- }
193- Some ( backend:: Message :: NoticeResponse ( _) ) => {
194- // TODO forward the error
195+ let notification = Notification {
196+ process_id : process_id,
197+ channel : channel,
198+ payload : message,
199+ } ;
200+ s. notifications . push_back ( notification) ;
195201 Either :: B ( s. read ( ) )
196202 }
197203 Some ( m) => Either :: A ( Ok ( ( m, s) ) . into_future ( ) ) ,
@@ -210,7 +216,18 @@ impl Stream for InnerConnection {
210216 type Error = io:: Error ;
211217
212218 fn poll ( & mut self ) -> Poll < Option < backend:: Message < Vec < u8 > > > , io:: Error > {
213- self . stream . poll ( )
219+ loop {
220+ match try_ready ! ( self . stream. poll( ) ) {
221+ Some ( backend:: Message :: ParameterStatus ( body) ) => {
222+ let name = body. name ( ) ?. to_owned ( ) ;
223+ let value = body. value ( ) ?. to_owned ( ) ;
224+ self . parameters . insert ( name, value) ;
225+ }
226+ // TODO forward to a handler
227+ Some ( backend:: Message :: NoticeResponse ( _) ) => { }
228+ msg => return Ok ( Async :: Ready ( msg) ) ,
229+ }
230+ }
214231 }
215232}
216233
@@ -279,6 +296,7 @@ impl Connection {
279296 close_receiver : receiver,
280297 parameters : HashMap :: new ( ) ,
281298 types : HashMap :: new ( ) ,
299+ notifications : VecDeque :: new ( ) ,
282300 cancel_data : CancelData {
283301 process_id : 0 ,
284302 secret_key : 0 ,
@@ -1000,6 +1018,11 @@ impl Connection {
10001018 . boxed ( )
10011019 }
10021020
1021+ /// Returns a stream of asynchronus notifications receieved from the server.
1022+ pub fn notifications ( self ) -> Notifications {
1023+ Notifications ( self )
1024+ }
1025+
10031026 /// Returns information used to cancel pending queries.
10041027 ///
10051028 /// Used with the `cancel_query` function. The object returned can be used
@@ -1015,6 +1038,41 @@ impl Connection {
10151038 }
10161039}
10171040
1041+ /// A stream of asynchronous Postgres notifications.
1042+ pub struct Notifications ( Connection ) ;
1043+
1044+ impl Notifications {
1045+ /// Consumes the `Notifications`, returning the inner `Connection`.
1046+ pub fn into_inner ( self ) -> Connection {
1047+ self . 0
1048+ }
1049+ }
1050+
1051+ impl Stream for Notifications {
1052+ type Item = Notification ;
1053+
1054+ type Error = Error ;
1055+
1056+ fn poll ( & mut self ) -> Poll < Option < Notification > , Error > {
1057+ if let Some ( notification) = ( self . 0 ) . 0 . notifications . pop_front ( ) {
1058+ return Ok ( Async :: Ready ( Some ( notification) ) ) ;
1059+ }
1060+
1061+ match try_ready ! ( ( self . 0 ) . 0 . poll( ) ) {
1062+ Some ( backend:: Message :: NotificationResponse ( body) ) => {
1063+ let notification = Notification {
1064+ process_id : body. process_id ( ) ,
1065+ channel : body. channel ( ) ?. to_owned ( ) ,
1066+ payload : body. message ( ) ?. to_owned ( ) ,
1067+ } ;
1068+ Ok ( Async :: Ready ( Some ( notification) ) )
1069+ }
1070+ Some ( _) => Err ( bad_message ( ) ) ,
1071+ None => Ok ( Async :: Ready ( None ) ) ,
1072+ }
1073+ }
1074+ }
1075+
10181076fn connect_err ( fields : & mut ErrorFields ) -> ConnectError {
10191077 match DbError :: new ( fields) {
10201078 Ok ( err) => ConnectError :: Db ( Box :: new ( err) ) ,
0 commit comments