如何解决使用Java Stream消耗数据库游标 驱动程序分页手动分页
我想使用Java Stream消耗数据库游标。我希望Java流根据需要提取和处理行,避免先将所有500万行加载到内存中,然后再进行处理。
是否可以在不将整个表加载到RAM的情况下使用它?
到目前为止,我的代码如下:
Cursor<Product> products = DAO.selectCursor(...);
// 1. Initialize variables
long count = 0;
...
for (Iterator<Product> it = products.iterator(); it.hasNext();) {
Product p = it.next();
// 2. Processing each row
...
}
// 3. Concluding (processing totals,stats,etc.)
double avg = total / count;
...
它的确运行良好,但是有点麻烦,我想利用Stream API。
解决方法
首先,我们必须讨论如何从数据库中获取数据。如果您打算查看大量记录,而又不想一次全部加载到内存中,则有两种选择:
- 对结果分页。
- 让驱动程序对结果进行分页。
如果您已经有一个基于Cursor
的迭代器,可以根据需要检索分页数据,则可以使用JDK API中的Spliterators
和StreamSupport
实用工具类将其转换为{ {1}}。
Stream
否则,您将必须构建自己的东西。
驱动程序分页
如果JDBC驱动程序支持fetch size属性,则可以执行以下操作:
Stream<Product> products = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(cursor.iterator(),Spliterator.NONNULL |
Spliterator.ORDERED |
Spliterator.IMMUTABLE),false)
这时,Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'",ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
包含1000条记录的第一次获取,在您阅读上一页之前,它不会从数据库中检索更多信息。
所有这方面的棘手部分是,在完成所有记录的读取之前,您将无法关闭任何资源(即连接,准备好的语句和结果集),并且由于默认情况下我们要构建的流是惰性的,因此意味着我们必须打开所有这些资源,直到完成流为止。
也许最简单的方法是围绕此逻辑构建一个Iterator,并且当Iterator实际上到达所有数据的末尾时,可以关闭所有资源(即rs
),或者另一种替代方法是关闭流(!rs.next()
)时的所有工作。
一旦有了迭代器,使用来自JDK API的Stream.onClose()
和Spliterators
实用工具类就可以很容易地构建出一个流。
我的基本实现看起来像这样。这仅出于说明目的。您可能希望对您的特殊情况给予更多的爱。
StreamSupport
这里的关键是要注意,我们返回的流应该运行某些public Stream<String> getUsers() {
DataSource ds = jdbcTemplate.getDataSource();
try {
Connection conn = ds.getConnection();
conn.setAutoCommit(false);
PreparedStatement stm = conn.prepareStatement("SELECT id FROM users",ResultSet.TYPE_FORWARD_ONLY);
//fetch size is what guarantees only 1000 records at the time
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
Iterator<String> sqlIter = new Iterator<>() {
@Override
public boolean hasNext() {
try {
return rs.next();
} catch (SQLException e) {
closeResources(conn,stm,rs);
throw new RuntimeException("Failed to read record from ResultSet",e);
}
}
@Override
public String next() {
try {
return rs.getString("id");
} catch (SQLException e) {
closeResources(conn,e);
}
}
};
//turn iterator into a stream
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(sqlIter,false
).onClose(() -> {
//make sure to close resources when done with the stream
closeResources(conn,rs);
});
} catch (SQLException e) {
logger.error("Failed to process data",e);
throw new RuntimeException(e);
}
}
private void closeResources(Connection conn,PreparedStatement ps,ResultSet rs) {
try (conn; ps; rs) {
logger.info("Resources successfully closed");
} catch (SQLException e) {
logger.warn("Failed to properly close database sources",e);
}
}
逻辑,因此当我们使用流时,必须确保在完成处理后执行onClose
确保我们关闭所有活着的资源(例如stream.close()
,conn
和stm
)。
最好的方法也许是使用try-with-resources,以便尝试将关闭流。
rs
手动分页
另一种方法是您自己对结果进行分页,这取决于数据库,但是使用诸如limit和offset这样的select子句,您可以请求特定的记录页,对其进行处理,然后再检索更多内容。
try(Stream<String> users = userRepo.getUsers()){
//print users to the main output retrieving 1K at the time
users.forEach(System.out::println);
}
在这种情况下,迭代器将占用所有页面,完成后,请求下一页,直到在最后一页中找不到更多记录为止。
这种另一种方法的优点是可以在迭代器本身中立即控制资源。
我不会举一个例子,留给你尝试。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。