如何解决MS Azure数据湖扫描目录并使用异步反应性API读取文件
我想递归读取Azure Data Lake中的所有文件并处理文件内容。 下面是我想出的代码,如果这是一个好主意或可以做得更好,希望获得您的反馈。
不胜感激!
目前,我有一些我无法解释的副作用。可能与此有关。
- 我担心背压。目标是暂停扫描,请参阅filequeue.put()。
- 不确定是否处理了所有级别的异常。
- 要输出找到了多少个文件的总计数器。 autoConnect()似乎有点过分。
- 如何控制并行度?它正在使用多个线程,不是吗?
这是代码。 filequeue是Java BlockingQueue,也是轮询过程从中获取下一个数据的目标。必须是轮询。队列通常包含
BlockingQueue<Tuple2<byte[],PathItem>> filequeue = new ArrayBlockingQueue<>(100);
filenamescanner = asyncfsclient
.listPaths(options) // read the directory recursive
.filter(item -> {
return !item.isDirectory(); // only interested in files,not dirs
})
.subscribe(item -> {
DataLakeFileAsyncclient c = asyncfsclient.getFileAsyncclient(item.getName());
ByteArrayOutputStream out = new ByteArrayOutputStream();
c.read().doOnNext(piece -> {
try {
out.write(piece.array());
} catch (IOException ex) {
System.out.print(ex.getMessage());
}
}).doOnComplete(() -> {
filequeue.put(Tuples.of(out.toByteArray(),item));
})
.subscribe(null,e -> System.out.println("ERROR reading file: " + e));
},e -> System.out.println("ERROR listing dir: " + e));
只要队列具有条目或filenamescanner.isdisposed()== false,轮询过程就会读取数据。
如您所见,我目前缺乏经验,这是我想到的最好的代码。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。