Skip to content

Commit 60825d9

Browse files
committed
Allow custom executors
1 parent 2a80118 commit 60825d9

1 file changed

Lines changed: 73 additions & 21 deletions

File tree

postgres/src/config.rs

Lines changed: 73 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,81 @@
1+
use futures::future::Executor;
12
use futures::sync::oneshot;
23
use futures::Future;
34
use log::error;
5+
use std::fmt;
46
use std::path::Path;
57
use std::str::FromStr;
8+
use std::sync::Arc;
69
use std::time::Duration;
710
use tokio_postgres::config::{SslMode, TargetSessionAttrs};
811
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
912
use tokio_postgres::{Error, Socket};
1013

1114
use crate::{Client, RUNTIME};
1215

13-
#[derive(Debug, Clone, PartialEq)]
14-
pub struct Config(tokio_postgres::Config);
16+
#[derive(Clone)]
17+
pub struct Config {
18+
config: tokio_postgres::Config,
19+
executor: Option<Arc<Executor<Box<Future<Item = (), Error = ()> + Send>>>>,
20+
}
21+
22+
impl fmt::Debug for Config {
23+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
24+
fmt.debug_struct("Config")
25+
.field("config", &self.config)
26+
.finish()
27+
}
28+
}
1529

1630
impl Default for Config {
1731
fn default() -> Config {
18-
Config(tokio_postgres::Config::default())
32+
Config::new()
1933
}
2034
}
2135

2236
impl Config {
2337
pub fn new() -> Config {
24-
Config(tokio_postgres::Config::new())
38+
Config {
39+
config: tokio_postgres::Config::new(),
40+
executor: None,
41+
}
2542
}
2643

2744
pub fn user(&mut self, user: &str) -> &mut Config {
28-
self.0.user(user);
45+
self.config.user(user);
2946
self
3047
}
3148

3249
pub fn password<T>(&mut self, password: T) -> &mut Config
3350
where
3451
T: AsRef<[u8]>,
3552
{
36-
self.0.password(password);
53+
self.config.password(password);
3754
self
3855
}
3956

4057
pub fn dbname(&mut self, dbname: &str) -> &mut Config {
41-
self.0.dbname(dbname);
58+
self.config.dbname(dbname);
4259
self
4360
}
4461

4562
pub fn options(&mut self, options: &str) -> &mut Config {
46-
self.0.options(options);
63+
self.config.options(options);
4764
self
4865
}
4966

5067
pub fn application_name(&mut self, application_name: &str) -> &mut Config {
51-
self.0.application_name(application_name);
68+
self.config.application_name(application_name);
5269
self
5370
}
5471

5572
pub fn ssl_mode(&mut self, ssl_mode: SslMode) -> &mut Config {
56-
self.0.ssl_mode(ssl_mode);
73+
self.config.ssl_mode(ssl_mode);
5774
self
5875
}
5976

6077
pub fn host(&mut self, host: &str) -> &mut Config {
61-
self.0.host(host);
78+
self.config.host(host);
6279
self
6380
}
6481

@@ -67,35 +84,43 @@ impl Config {
6784
where
6885
T: AsRef<Path>,
6986
{
70-
self.0.host_path(host);
87+
self.config.host_path(host);
7188
self
7289
}
7390

7491
pub fn port(&mut self, port: u16) -> &mut Config {
75-
self.0.port(port);
92+
self.config.port(port);
7693
self
7794
}
7895

7996
pub fn connect_timeout(&mut self, connect_timeout: Duration) -> &mut Config {
80-
self.0.connect_timeout(connect_timeout);
97+
self.config.connect_timeout(connect_timeout);
8198
self
8299
}
83100

84101
pub fn keepalives(&mut self, keepalives: bool) -> &mut Config {
85-
self.0.keepalives(keepalives);
102+
self.config.keepalives(keepalives);
86103
self
87104
}
88105

89106
pub fn keepalives_idle(&mut self, keepalives_idle: Duration) -> &mut Config {
90-
self.0.keepalives_idle(keepalives_idle);
107+
self.config.keepalives_idle(keepalives_idle);
91108
self
92109
}
93110

94111
pub fn target_session_attrs(
95112
&mut self,
96113
target_session_attrs: TargetSessionAttrs,
97114
) -> &mut Config {
98-
self.0.target_session_attrs(target_session_attrs);
115+
self.config.target_session_attrs(target_session_attrs);
116+
self
117+
}
118+
119+
pub fn executor<E>(&mut self, executor: E) -> &mut Config
120+
where
121+
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + 'static + Sync + Send,
122+
{
123+
self.executor = Some(Arc::new(executor));
99124
self
100125
}
101126

@@ -106,19 +131,46 @@ impl Config {
106131
T::Stream: Send,
107132
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
108133
{
109-
let connect = self.0.connect(tls_mode);
110-
let (client, connection) = oneshot::spawn(connect, &RUNTIME.executor()).wait()?;
134+
let (tx, rx) = oneshot::channel();
135+
let connect = self
136+
.config
137+
.connect(tls_mode)
138+
.then(|r| tx.send(r).map_err(|_| ()));
139+
self.with_executor(|e| e.execute(Box::new(connect)))
140+
.unwrap();
141+
let (client, connection) = rx.wait().unwrap()?;
142+
111143
let connection = connection.map_err(|e| error!("postgres connection error: {}", e));
112-
RUNTIME.executor().spawn(connection);
144+
self.with_executor(|e| e.execute(Box::new(connection)))
145+
.unwrap();
113146

114147
Ok(Client::from(client))
115148
}
149+
150+
fn with_executor<F, T>(&self, f: F) -> T
151+
where
152+
F: FnOnce(&Executor<Box<Future<Item = (), Error = ()> + Send>>) -> T,
153+
{
154+
match &self.executor {
155+
Some(e) => f(&**e),
156+
None => f(&RUNTIME.executor()),
157+
}
158+
}
116159
}
117160

118161
impl FromStr for Config {
119162
type Err = Error;
120163

121164
fn from_str(s: &str) -> Result<Config, Error> {
122-
s.parse().map(Config)
165+
s.parse::<tokio_postgres::Config>().map(Config::from)
166+
}
167+
}
168+
169+
impl From<tokio_postgres::Config> for Config {
170+
fn from(config: tokio_postgres::Config) -> Config {
171+
Config {
172+
config,
173+
executor: None,
174+
}
123175
}
124176
}

0 commit comments

Comments
 (0)