Skip to content

Commit dec566e

Browse files
committed
Track stream desynchronization
If we hit an IO error talking to the server, message framing has probably been lost and we shouldn't try to keep talking.
1 parent d84acfd commit dec566e

3 files changed

Lines changed: 111 additions & 28 deletions

File tree

src/error.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use phf::PhfMap;
99
macro_rules! make_errors(
1010
($($code:expr => $error:ident),+) => (
1111
/// SQLSTATE error codes
12-
#[deriving(ToStr, Eq, Clone)]
12+
#[deriving(ToStr, Eq, Clone, Show)]
1313
#[allow(missing_doc)]
1414
pub enum PostgresSqlState {
1515
$($error,)+
@@ -378,7 +378,7 @@ pub enum PostgresConnectError {
378378
}
379379

380380
/// Represents the position of an error in a query
381-
#[deriving(ToStr)]
381+
#[deriving(ToStr, Show)]
382382
pub enum PostgresErrorPosition {
383383
/// A position in the original query
384384
Position(uint),
@@ -392,7 +392,7 @@ pub enum PostgresErrorPosition {
392392
}
393393

394394
/// Encapsulates a Postgres error or notice.
395-
#[deriving(ToStr)]
395+
#[deriving(ToStr, Show)]
396396
pub struct PostgresDbError {
397397
/// The field contents are ERROR, FATAL, or PANIC (in an error message),
398398
/// or WARNING, NOTICE, DEBUG, INFO, or LOG (in a notice message), or a
@@ -496,12 +496,15 @@ impl PostgresDbError {
496496
}
497497

498498
/// An error encountered when communicating with the Postgres server
499-
#[deriving(ToStr)]
499+
#[deriving(ToStr, Show)]
500500
pub enum PostgresError {
501501
/// An error reported by the Postgres server
502502
PgDbError(PostgresDbError),
503503
/// An error communicating with the Postgres server
504504
PgStreamError(IoError),
505+
/// The communication channel with the Postgres server has desynchronized
506+
/// due to an earlier communications error.
507+
PgStreamDesynchronized,
505508
}
506509

507510
impl PostgresError {
@@ -510,6 +513,9 @@ impl PostgresError {
510513
match *self {
511514
PgDbError(ref err) => err.pretty_error(query),
512515
PgStreamError(ref err) => format!("{}", *err),
516+
PgStreamDesynchronized =>
517+
~"The communication stream with the Postgres server has \
518+
become desynchronized due to an earlier communications error"
513519
}
514520
}
515521
}

src/lib.rs

Lines changed: 90 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ use error::{PostgresDbError,
103103
MissingPassword,
104104
PostgresError,
105105
PgStreamError,
106-
PgDbError};
106+
PgDbError,
107+
PgStreamDesynchronized};
107108
use message::{BackendMessage,
108109
AuthenticationOk,
109110
AuthenticationKerberosV5,
@@ -168,6 +169,26 @@ macro_rules! if_ok_pg(
168169
)
169170
)
170171

172+
macro_rules! if_ok_desync(
173+
($e:expr) => (
174+
match $e {
175+
Ok(ok) => ok,
176+
Err(err) => {
177+
self.desynchronized = true;
178+
return Err(err);
179+
}
180+
}
181+
)
182+
)
183+
184+
macro_rules! check_desync(
185+
($e:expr) => (
186+
if $e.is_desynchronized() {
187+
return Err(PgStreamDesynchronized);
188+
}
189+
)
190+
)
191+
171192
static DEFAULT_PORT: Port = 5432;
172193

173194
/// Trait for types that can handle Postgres notice messages
@@ -341,6 +362,7 @@ struct InnerPostgresConnection {
341362
notifications: RingBuf<PostgresNotification>,
342363
cancel_data: PostgresCancelData,
343364
unknown_types: HashMap<Oid, ~str>,
365+
desynchronized: bool,
344366
}
345367

346368
impl Drop for InnerPostgresConnection {
@@ -386,6 +408,7 @@ impl InnerPostgresConnection {
386408
notifications: RingBuf::new(),
387409
cancel_data: PostgresCancelData { process_id: 0, secret_key: 0 },
388410
unknown_types: HashMap::new(),
411+
desynchronized: false,
389412
};
390413

391414
args.push((~"client_encoding", ~"UTF8"));
@@ -426,26 +449,28 @@ impl InnerPostgresConnection {
426449
}
427450

428451
fn write_messages(&mut self, messages: &[FrontendMessage]) -> IoResult<()> {
452+
assert!(!self.desynchronized);
429453
for message in messages.iter() {
430-
if_ok!(self.stream.write_message(message));
454+
if_ok_desync!(self.stream.write_message(message));
431455
}
432-
self.stream.flush()
456+
Ok(if_ok_desync!(self.stream.flush()))
433457
}
434458

435459
fn read_message(&mut self) -> IoResult<BackendMessage> {
460+
assert!(!self.desynchronized);
436461
loop {
437-
match self.stream.read_message() {
438-
Ok(NoticeResponse { fields }) =>
462+
match if_ok_desync!(self.stream.read_message()) {
463+
NoticeResponse { fields } =>
439464
self.notice_handler.handle(PostgresDbError::new(fields)),
440-
Ok(NotificationResponse { pid, channel, payload }) =>
465+
NotificationResponse { pid, channel, payload } =>
441466
self.notifications.push_back(PostgresNotification {
442467
pid: pid,
443468
channel: channel,
444469
payload: payload
445470
}),
446-
Ok(ParameterStatus { parameter, value }) =>
471+
ParameterStatus { parameter, value } =>
447472
info!("Parameter {} = {}", parameter, value),
448-
val => return val
473+
val => return Ok(val)
449474
}
450475
}
451476
}
@@ -697,19 +722,33 @@ impl PostgresConnection {
697722
}
698723
}
699724

700-
/// Begins a new transaction.
725+
/// Attempts to begin a new transaction.
701726
///
702727
/// Returns a `PostgresTransaction` object which should be used instead of
703728
/// the connection for the duration of the transaction. The transaction
704729
/// is active until the `PostgresTransaction` object falls out of scope.
705730
/// A transaction will commit by default unless the task fails or the
706731
/// transaction is set to roll back.
707-
pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> {
708-
self.quick_query("BEGIN");
709-
PostgresTransaction {
732+
pub fn try_transaction<'a>(&'a self)
733+
-> Result<PostgresTransaction<'a>, PostgresError> {
734+
check_desync!(self);
735+
if_ok!(self.quick_query("BEGIN"));
736+
Ok(PostgresTransaction {
710737
conn: self,
711-
commit: RefCell::new(true),
738+
commit: Cell::new(true),
712739
nested: false
740+
})
741+
}
742+
743+
/// A convenience wrapper around `try_transaction`.
744+
///
745+
/// # Failure
746+
///
747+
/// Fails if there was an error beginning the transaction.
748+
pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> {
749+
match self.try_transaction() {
750+
Ok(trans) => trans,
751+
Err(err) => fail!("Error preparing transaction: {}", err)
713752
}
714753
}
715754

@@ -745,6 +784,15 @@ impl PostgresConnection {
745784
self.conn.with(|conn| conn.cancel_data)
746785
}
747786

787+
/// Returns whether or not the stream has been desynchronized due to an
788+
/// error in the communication channel with the server.
789+
///
790+
/// If this has occurred, all further queries will immediately return an
791+
/// error.
792+
pub fn is_desynchronized(&self) -> bool {
793+
self.conn.with(|conn| conn.desynchronized)
794+
}
795+
748796
fn quick_query(&self, query: &str) -> Result<~[~[Option<~str>]], PostgresError> {
749797
self.conn.with_mut(|conn| conn.quick_query(query))
750798
}
@@ -775,14 +823,14 @@ pub enum SslMode {
775823
/// Represents a transaction on a database connection
776824
pub struct PostgresTransaction<'conn> {
777825
priv conn: &'conn PostgresConnection,
778-
priv commit: RefCell<bool>,
826+
priv commit: Cell<bool>,
779827
priv nested: bool
780828
}
781829

782830
#[unsafe_destructor]
783831
impl<'conn> Drop for PostgresTransaction<'conn> {
784832
fn drop(&mut self) {
785-
if task::failing() || !self.commit.with(|x| *x) {
833+
if task::failing() || !self.commit.get() {
786834
if self.nested {
787835
self.conn.quick_query("ROLLBACK TO sp");
788836
} else {
@@ -828,13 +876,23 @@ impl<'conn> PostgresTransaction<'conn> {
828876
self.conn.execute(query, params)
829877
}
830878

831-
/// Like `PostgresConnection::transaction`.
832-
pub fn transaction<'a>(&self) -> PostgresTransaction<'conn> {
833-
self.conn.quick_query("SAVEPOINT sp");
834-
PostgresTransaction {
879+
/// Like `PostgresConnection::try_transaction`.
880+
pub fn try_transaction<'a>(&'a self)
881+
-> Result<PostgresTransaction<'a>, PostgresError> {
882+
check_desync!(self.conn);
883+
if_ok!(self.conn.quick_query("SAVEPOINT sp"));
884+
Ok(PostgresTransaction {
835885
conn: self.conn,
836-
commit: RefCell::new(true),
886+
commit: Cell::new(true),
837887
nested: true
888+
})
889+
}
890+
891+
/// Like `PostgresTransaction::transaction`.
892+
pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> {
893+
match self.try_transaction() {
894+
Ok(trans) => trans,
895+
Err(err) => fail!("Error preparing transaction: {}", err)
838896
}
839897
}
840898

@@ -843,19 +901,24 @@ impl<'conn> PostgresTransaction<'conn> {
843901
self.conn.notifications()
844902
}
845903

904+
/// Like `PostgresConnection::is_desynchronized`.
905+
pub fn is_desynchronized(&self) -> bool {
906+
self.conn.is_desynchronized()
907+
}
908+
846909
/// Determines if the transaction is currently set to commit or roll back.
847910
pub fn will_commit(&self) -> bool {
848-
self.commit.with(|x| *x)
911+
self.commit.get()
849912
}
850913

851914
/// Sets the transaction to commit at its completion.
852915
pub fn set_commit(&self) {
853-
self.commit.with_mut(|x| *x = true);
916+
self.commit.set(true);
854917
}
855918

856919
/// Sets the transaction to roll back at its completion.
857920
pub fn set_rollback(&self) {
858-
self.commit.with_mut(|x| *x = false);
921+
self.commit.set(false);
859922
}
860923
}
861924

@@ -1015,6 +1078,7 @@ impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> {
10151078

10161079
fn try_execute(&self, params: &[&ToSql])
10171080
-> Result<uint, PostgresError> {
1081+
check_desync!(self.conn);
10181082
if_ok!(self.execute("", 0, params));
10191083

10201084
let num;
@@ -1047,6 +1111,7 @@ impl<'conn> PostgresStatement for NormalPostgresStatement<'conn> {
10471111

10481112
fn try_query<'a>(&'a self, params: &[&ToSql])
10491113
-> Result<PostgresResult<'a>, PostgresError> {
1114+
check_desync!(self.conn);
10501115
self.try_lazy_query(0, params)
10511116
}
10521117
}
@@ -1112,6 +1177,7 @@ impl<'conn> TransactionalPostgresStatement<'conn> {
11121177
/// the parameters of the statement.
11131178
pub fn try_lazy_query<'a>(&'a self, row_limit: uint, params: &[&ToSql])
11141179
-> Result<PostgresResult<'a>, PostgresError> {
1180+
check_desync!(self.stmt.conn);
11151181
self.stmt.try_lazy_query(row_limit, params)
11161182
}
11171183

@@ -1124,7 +1190,7 @@ impl<'conn> TransactionalPostgresStatement<'conn> {
11241190
-> PostgresResult<'a> {
11251191
match self.try_lazy_query(row_limit, params) {
11261192
Ok(result) => result,
1127-
Err(err) => fail!("Error executing query:\n{}", err.to_str())
1193+
Err(err) => fail!("Error executing query:\n{}", err)
11281194
}
11291195
}
11301196
}

src/pool.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ impl PooledPostgresConnection {
139139
self.conn.get_ref().execute(query, params)
140140
}
141141

142+
/// Like `PostgresConnection::try_transaction`.
143+
pub fn try_transaction<'a>(&'a self)
144+
-> Result<PostgresTransaction<'a>, PostgresError> {
145+
self.conn.get_ref().try_transaction()
146+
}
147+
142148
/// Like `PostgresConnection::transaction`.
143149
pub fn transaction<'a>(&'a self) -> PostgresTransaction<'a> {
144150
self.conn.get_ref().transaction()
@@ -153,4 +159,9 @@ impl PooledPostgresConnection {
153159
pub fn cancel_data(&self) -> PostgresCancelData {
154160
self.conn.get_ref().cancel_data()
155161
}
162+
163+
/// Like `PostgresConnection::is_desynchronized`.
164+
pub fn is_desynchronized(&self) -> bool {
165+
self.conn.get_ref().is_desynchronized()
166+
}
156167
}

0 commit comments

Comments
 (0)