微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

MS Azure数据湖扫描目录并使用异步反应性API读取文件

如何解决MS Azure数据湖扫描目录并使用异步反应性API读取文件

我想递归读取Azure Data Lake中的所有文件并处理文件内容。 下面是我想出的代码,如果这是一个好主意或可以做得更好,希望获得您的反馈。

不胜感激!

目前,我有一些我无法解释的副作用。可能与此有关。

  1. 我担心背压。目标是暂停扫描,请参阅filequeue.put()。
  2. 不确定是否处理了所有级别的异常。
  3. 输出找到了多少个文件的总计数器。 autoConnect()似乎有点过分。
  4. 如何控制并行度?它正在使用多个线程,不是吗?

这是代码。 filequeue是Java BlockingQueue,也是轮询过程从中获取一个数据的目标。必须是轮询。队列通常包含

    BlockingQueue<Tuple2<byte[],PathItem>> filequeue = new ArrayBlockingQueue<>(100);

    filenamescanner = asyncfsclient
        .listPaths(options) // read the directory recursive
        .filter(item -> {
            return !item.isDirectory(); // only interested in files,not dirs
        })
        .subscribe(item -> {
            DataLakeFileAsyncclient c = asyncfsclient.getFileAsyncclient(item.getName());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            c.read().doOnNext(piece -> {
                 try {
                     out.write(piece.array());
                 } catch (IOException ex) {
                     System.out.print(ex.getMessage());
                 }
            }).doOnComplete(() -> {
                filequeue.put(Tuples.of(out.toByteArray(),item));
            })
            .subscribe(null,e -> System.out.println("ERROR reading file: " + e));
        },e -> System.out.println("ERROR listing dir: " + e));

只要队列具有条目或filenamescanner.isdisposed()== false,轮询过程就会读取数据。

如您所见,我目前缺乏经验,这是我想到的最好的代码

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。