Skip to content

Commit 290d572

Browse files
committed
Redesign COPY IN execution setup.
The old Iterator of Iterators setup basically required all of the rows to sit in memory at the same time, which defeats the entire purpose of that configuration. There's probably a library that defines `StreamIterator`, so this may jump to that.
1 parent 9e260fd commit 290d572

2 files changed

Lines changed: 75 additions & 32 deletions

File tree

src/lib.rs

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -906,25 +906,6 @@ impl Connection {
906906
///
907907
/// These statements provide a method to efficiently bulk-upload data to
908908
/// the database.
909-
///
910-
/// ## Example
911-
///
912-
/// ```rust,no_run
913-
/// # use postgres::{Connection, SslMode, Error};
914-
/// # use postgres::types::ToSql;
915-
/// # fn f() -> Result<(), Error> {
916-
/// # let conn = Connection::connect("", &SslMode::None).unwrap();
917-
/// try!(conn.execute("CREATE TABLE foo (
918-
/// bar INT PRIMARY KEY,
919-
/// baz VARCHAR
920-
/// )", &[]));
921-
///
922-
/// let stmt = try!(conn.prepare_copy_in("foo", &["bar", "baz"]));
923-
/// let data: &[&[&ToSql]] = &[&[&0i32, &"blah"],
924-
/// &[&1i32, &None::<String>]];
925-
/// try!(stmt.execute(data.iter().map(|r| r.iter().map(|&e| e))));
926-
/// # Ok(()) };
927-
/// ```
928909
pub fn prepare_copy_in<'a>(&'a self, table: &str, rows: &[&str])
929910
-> Result<CopyInStatement<'a>> {
930911
let mut conn = self.conn.borrow_mut();
@@ -1700,6 +1681,45 @@ impl<'a> Drop for CopyInStatement<'a> {
17001681
}
17011682
}
17021683

1684+
/// An `Iterator` variant which returns borrowed values.
1685+
pub trait StreamIterator {
1686+
/// Returns the next value, or `None` if there is none.
1687+
fn next<'a>(&'a mut self) -> Option<&'a (ToSql + 'a)>;
1688+
}
1689+
1690+
/// An adapter type implementing `StreamIterator` for a `Vec<Box<ToSql>>`.
1691+
pub struct VecStreamIterator<'a> {
1692+
v: Vec<Box<ToSql + 'a>>,
1693+
idx: usize,
1694+
}
1695+
1696+
impl<'a> VecStreamIterator<'a> {
1697+
/// Creates a new `VecStreamIterator`.
1698+
pub fn new(v: Vec<Box<ToSql + 'a>>) -> VecStreamIterator<'a> {
1699+
VecStreamIterator {
1700+
v: v,
1701+
idx: 0,
1702+
}
1703+
}
1704+
1705+
/// Returns the underlying `Vec`.
1706+
pub fn into_inner(self) -> Vec<Box<ToSql + 'a>> {
1707+
self.v
1708+
}
1709+
}
1710+
1711+
impl<'a> StreamIterator for VecStreamIterator<'a> {
1712+
fn next<'b>(&'b mut self) -> Option<&'b (ToSql + 'b)> {
1713+
match self.v.get_mut(self.idx) {
1714+
Some(mut e) => {
1715+
self.idx += 1;
1716+
Some(&mut **e)
1717+
},
1718+
None => None,
1719+
}
1720+
}
1721+
}
1722+
17031723
impl<'a> CopyInStatement<'a> {
17041724
fn finish_inner(&mut self) -> Result<()> {
17051725
let mut conn = self.conn.conn.borrow_mut();
@@ -1714,12 +1734,14 @@ impl<'a> CopyInStatement<'a> {
17141734

17151735
/// Executes the prepared statement.
17161736
///
1717-
/// Each iterator returned by the `rows` iterator will be interpreted as
1718-
/// providing a single result row.
1737+
/// The `rows` argument is an `Iterator` returning `StreamIterator` values,
1738+
/// each one of which provides values for a row of input. This setup is
1739+
/// designed to allow callers to avoid having to maintain the entire row
1740+
/// set in memory.
17191741
///
17201742
/// Returns the number of rows copied.
17211743
pub fn execute<'b, I, J>(&self, mut rows: I) -> Result<usize>
1722-
where I: Iterator<Item=J>, J: Iterator<Item=&'b (ToSql + 'b)> {
1744+
where I: Iterator<Item=J>, J: StreamIterator {
17231745
let mut conn = self.conn.conn.borrow_mut();
17241746

17251747
try!(conn.write_messages(&[

tests/test.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use postgres::{NoticeHandler,
2323
ToSql,
2424
Error,
2525
ConnectError,
26-
DbError};
26+
DbError,
27+
VecStreamIterator};
2728
use postgres::SqlState::{SyntaxError,
2829
QueryCanceled,
2930
UndefinedTable,
@@ -786,12 +787,16 @@ fn test_copy_in() {
786787
or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[]));
787788

788789
let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"]));
789-
let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32, &None::<String>]];
790790

791-
assert_eq!(Ok(2), stmt.execute(data.iter().map(|r| r.iter().map(|&e| e))));
791+
let data = (0i32..2).map(|i| {
792+
VecStreamIterator::new(vec![Box::new(i) as Box<ToSql>,
793+
Box::new(format!("{}", i)) as Box<ToSql>])
794+
});
795+
796+
assert_eq!(Ok(2), stmt.execute(data));
792797

793798
let stmt = or_panic!(conn.prepare("SELECT id, name FROM foo ORDER BY id"));
794-
assert_eq!(vec![(0i32, Some("Steven".to_string())), (1, None)],
799+
assert_eq!(vec![(0i32, Some("0".to_string())), (1, Some("1".to_string()))],
795800
or_panic!(stmt.query(&[])).map(|r| (r.get(0), r.get(1))).collect::<Vec<_>>());
796801
}
797802

@@ -801,18 +806,28 @@ fn test_copy_in_bad_column_count() {
801806
or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[]));
802807

803808
let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"]));
804-
let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32]];
809+
let data = vec![
810+
VecStreamIterator::new(vec![Box::new(1i32) as Box<ToSql>,
811+
Box::new("Steven".to_string()) as Box<ToSql>]),
812+
VecStreamIterator::new(vec![Box::new(2i32) as Box<ToSql>]),
813+
].into_iter();
805814

806-
let res = stmt.execute(data.iter().map(|r| r.iter().map(|&e| e)));
815+
let res = stmt.execute(data);
807816
match res {
808817
Err(Error::DbError(ref err)) if err.message[].contains("Invalid column count") => {}
809818
Err(err) => panic!("unexpected error {:?}", err),
810819
_ => panic!("Expected error"),
811820
}
812821

813-
let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32, &"Steven".to_string(), &1i32]];
822+
let data = vec![
823+
VecStreamIterator::new(vec![Box::new(1i32) as Box<ToSql>,
824+
Box::new("Steven".to_string()) as Box<ToSql>]),
825+
VecStreamIterator::new(vec![Box::new(2i32) as Box<ToSql>,
826+
Box::new("Steven".to_string()) as Box<ToSql>,
827+
Box::new(3i64) as Box<ToSql>]),
828+
].into_iter();
814829

815-
let res = stmt.execute(data.iter().map(|r| r.iter().map(|&e| e)));
830+
let res = stmt.execute(data);
816831
match res {
817832
Err(Error::DbError(ref err)) if err.message[].contains("Invalid column count") => {}
818833
Err(err) => panic!("unexpected error {:?}", err),
@@ -828,9 +843,15 @@ fn test_copy_in_bad_type() {
828843
or_panic!(conn.execute("CREATE TEMPORARY TABLE foo (id INT, name VARCHAR)", &[]));
829844

830845
let stmt = or_panic!(conn.prepare_copy_in("foo", &["id", "name"]));
831-
let data: &[&[&ToSql]] = &[&[&0i32, &"Steven".to_string()], &[&1i32, &2i32]];
832846

833-
let res = stmt.execute(data.iter().map(|r| r.iter().map(|&e| e)));
847+
let data = vec![
848+
VecStreamIterator::new(vec![Box::new(1i32) as Box<ToSql>,
849+
Box::new("Steven".to_string()) as Box<ToSql>]),
850+
VecStreamIterator::new(vec![Box::new(2i32) as Box<ToSql>,
851+
Box::new(1i32) as Box<ToSql>]),
852+
].into_iter();
853+
854+
let res = stmt.execute(data);
834855
match res {
835856
Err(Error::DbError(ref err)) if err.message[].contains("Unexpected type Varchar") => {}
836857
Err(err) => panic!("unexpected error {:?}", err),

0 commit comments

Comments
 (0)