微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在反应式编程中使用增量ID在db中创建条目?

如何解决如何在反应式编程中使用增量ID在db中创建条目?

我是反应式编程的新手。需要帮助以了解行为。

我想实现什么?

我想创建50个ID应为递增顺序的条目。如果db中存在ID为1的条目,则应创建ID为2的条目。

我当前的实现如下:

// create entry 50 times 
 void createEntries() {
        LOGGER.info("going to create 50 entries);
        Flux.range(1,50)
            .flatMap(i -> createEntry(5))
            .subscribe();
    }

//method to create an entry in db with incremental id  
private Mono<Integer> createEntry(long retryInterval) {
    return (customrepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys> 
            .map(entry -> entry.getEntryId())
            .sort()
            //get last existing entry id
            .last(0)          
            //try to create the entry with new incremented id
            .flatMap(id -> createEntry(id + 1,retryInterval));
}

private Mono<? extends Integer> createEntry(int newEntryId,long retryInterval) {
    return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
            .doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ",newEntryId,applied)) //--> Why this is called multiple times??
            .flatMap(applied -> !applied
                    //applied false shows id already exists,so try again recursively with new incremented id
                    ? createEntry(newEntryId + 1,retryInterval)
                    : Mono.just(newEntryId))
            .doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ",e));
            .retrywhen(Retry.anyOf(RuntimeException.class)
                            .exponentialBackoff(Duration.ofSeconds(retryInterval),Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception 
}

上述实现为我带来了意外的行为,信息记录器“成功创建ID为:” 的对象被多次调用相同的ID。但是,我希望仅一次调用它。 注意:即使我删除retrywhen,该行为也保持不变。

解决方法

最后,我在代码中找出了问题。问题出在下面的代码片段中

// create entry 50 times 
 void createEntries() {
        LOGGER.info("going to create 50 entries);
        Flux.range(1,50)
            .flatMap(i -> createEntry(5))
            .subscribe();
    }

这是在saveEntry(newEntryId)完成之前调用方法50次。我通过使用repeat API来解决此问题,如下所示:

// create entry 50 times 
 void createEntries() {
        LOGGER.info("going to create 50 entries);
        createEntry(5).subscribe();
    }

//method to create an entry in db with incremental id  
private Flux<? extends Integer> createEntry(long retryInterval) {
    return (customRepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys> 
            .map(entry -> entry.getEntryId())
            .sort()
            //get last existing entry id
            .last(0)          
            //try to create the entry with new incremented id
            .flatMap(id -> createEntry(id + 1,retryInterval))
            .repeat(49); //-->This fixes my issue will only be invoked  49 times again onComplete(). And hence will create 50 entries
}

private Mono<? extends Integer> createEntry(int newEntryId,long retryInterval) {
    return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
            .doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ",newEntryId,applied)) 
            .flatMap(applied -> !applied
                    //applied false shows id already exists,so try again recursively with new incremented id
                    ? createEntry(newEntryId + 1,retryInterval)
                    : Mono.just(newEntryId))
            .doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ",e));
            .retryWhen(Retry.anyOf(RuntimeException.class)
                            .exponentialBackoff(Duration.ofSeconds(retryInterval),Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception 
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。