如何解决间隔任务导致R2DBC中的IDLE连接耗尽
我正在使用Reactor Java使用r2dbc对Postgres运行以下定期任务;
Flux.interval(Duration.ofMillis(1000)).doOnNext(i->{
System.out.print("TIME HAS TICKED\n");
Flux.range(0,10).flatMap(j->{
return service.getJob(this.consumerQueueName,this.filter).then();
}).subscribe();
}).subscribe();
大约5分钟后,它将停止处理作业,并且当我检查postgres连接都处于空闲状态时:
select datname as database_name,client_addr as client_address,application_name,backend_start,state,state_change
from pg_stat_activity;
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.786098 idle 2020-09-18 04:11:40.471893
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.785822 idle 2020-09-18 04:12:01.196558
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.785598 idle 2020-09-18 04:11:50.971738
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.785317 idle 2020-09-18 04:11:30.506207
integrity_service 10.0.73.1 r2dbc-postgresql 2020-09-18 04:11:07.665800 idle 2020-09-18 04:11:20.570714
如何适当地使用r2dbc和databaseClient定期从表中获取数据而不会导致此异常?
//ConnectionFactory Settings:
ConnectionFactories.get(
ConnectionFactoryOptions.builder()
.option(Option.valueOf("driver"),"pool")
.option(Option.valueOf("protocol"),"postgresql")
//.option(ConnectionFactoryOptions.DRIVER,"postgresql")
.option(ConnectionFactoryOptions.HOST,"localhost")
.option(ConnectionFactoryOptions.PORT,5432) // optional,defaults to 5432
.option(ConnectionFactoryOptions.USER,"db")
.option(ConnectionFactoryOptions.DATABASE,"integrity_service")
.option(MAX_SIZE,5)
.build());
private final String fetchJobFormat =
" WITH cte AS ( SELECT id FROM %s WHERE chain_id='%s' and is_complete=%b ORDER BY id ASC LIMIT 1\n" +
" )\n" +
" UPDATE queue q\n" +
" SET timestamp = extract(epoch from Now()),\n" +
" is_complete = TRUE\n" +
" FROM cte WHERE q.id = cte.id\n" +
" RETURNING q.id,q.chain_id,q.timestamp,q.is_complete,q.payload";
public Mono<Queue> getJob(String queue,String filter){
return databaseClient.execute(String.format(fetchJobFormat,queue,filter,false))
.fetch().all()
.flatMap((v) -> {
System.out.println("retrieved result" + v.get("id").toString());
Queue q = this.objectMapper.convertValue(v,Queue.class);
return Mono.just(q);
}).last();
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。