@@ -12,13 +12,15 @@ use std::os::unix::io::{AsRawFd, RawFd};
1212#[ cfg( windows) ]
1313use std:: os:: windows:: io:: { AsRawSocket , RawSocket } ;
1414use postgres_protocol:: message:: frontend;
15+ use postgres_protocol:: message:: backend:: { self , ParseResult } ;
1516
1617use TlsMode ;
1718use params:: { ConnectParams , ConnectTarget } ;
1819use error:: ConnectError ;
1920use io:: TlsStream ;
2021
2122const DEFAULT_PORT : u16 = 5432 ;
23+ const MESSAGE_HEADER_SIZE : usize = 5 ;
2224
2325pub struct MessageStream {
2426 stream : BufStream < Box < TlsStream > > ,
@@ -43,6 +45,27 @@ impl MessageStream {
4345 self . stream . write_all ( & self . buf )
4446 }
4547
48+ pub fn read_message < ' a > ( & ' a mut self ) -> io:: Result < backend:: Message < ' a > > {
49+ self . buf . resize ( MESSAGE_HEADER_SIZE , 0 ) ;
50+ try!( self . stream . read_exact ( & mut self . buf ) ) ;
51+
52+ let len = match try!( backend:: Message :: parse ( & self . buf ) ) {
53+ // FIXME this is dumb but an explicit return runs into borrowck issues :(
54+ ParseResult :: Complete { .. } => None ,
55+ ParseResult :: Incomplete { required_size } => Some ( required_size. unwrap ( ) ) ,
56+ } ;
57+
58+ if let Some ( len) = len {
59+ self . buf . resize ( len, 0 ) ;
60+ try!( self . stream . read_exact ( & mut self . buf [ MESSAGE_HEADER_SIZE ..] ) ) ;
61+ } ;
62+
63+ match try!( backend:: Message :: parse ( & self . buf ) ) {
64+ ParseResult :: Complete { message, .. } => Ok ( message) ,
65+ ParseResult :: Incomplete { .. } => unreachable ! ( ) ,
66+ }
67+ }
68+
4669 pub fn flush ( & mut self ) -> io:: Result < ( ) > {
4770 self . stream . flush ( )
4871 }
0 commit comments