微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何计算Google Dataflow文件处理的输入文件中的行数?

如何解决如何计算Google Dataflow文件处理的输入文件中的行数?

我正在尝试计算输入文件中的行数,并且正在使用Cloud Dataflow Runner创建模板。在下面的代码中,我正在从GCS存储桶中读取文件,对其进行处理,然后将输出存储在Redis实例中。

但是我无法计算输入文件的行数。

主班

 public static void main(String[] args) {
    /**
     * Constructed StoragetoRedisOptions object using the method PipelineOptionsFactory.fromArgs to read options from command-line
     */
    StoragetoRedisOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StoragetoRedisOptions.class);

    Pipeline p = Pipeline.create(options);
    p.apply("Reading Lines...",TextIO.read().from(options.getInputFile()))
            .apply("Transforming data...",ParDo.of(new DoFn<String,String[]>() {
                        @ProcessElement
                        public void TransformData(@Element String line,OutputReceiver<String[]> out) {
                            String[] fields = line.split("\\|");
                            out.output(fields);
                        }
                    }))
            .apply("Processing data...",ParDo.of(new DoFn<String[],KV<String,String>>() {
                        @ProcessElement
                        public void ProcessData(@Element String[] fields,OutputReceiver<KV<String,String>> out) {
                            if (fields[RedisIndex.GUID.getValue()] != null) {

                                out.output(KV.of("firstname:"
                                        .concat(fields[RedisIndex.FirsTNAME.getValue()]),fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("lastname:"
                                        .concat(fields[RedisIndex.LASTNAME.getValue()]),fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("dob:"
                                        .concat(fields[RedisIndex.dob.getValue()]),fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("postalcode:"
                                        .concat(fields[RedisIndex.POSTAL_CODE.getValue()]),fields[RedisIndex.GUID.getValue()]));

                            }
                        }
                    }))
            .apply("Writing field indexes into redis",RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
                    .withEndpoint(options.getRedisHost(),options.getRedisPort()));
    p.run();

}

示例输入文件

xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666

执行管道的命令

mvn compile exec:java \
  -Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
  -Dexec.args="--project=my-project-id \
  --jobName=dataflow-job \
  --inputFile=gs://my-input-bucket/*.txt \
  --redisHost=127.0.0.1 \
  --stagingLocation=gs://pipeline-bucket/stage/ \
  --dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template \
  --runner=DataflowRunner"

我尝试使用StackOverflow solution中的以下代码,但对我不起作用。

PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);

我也阅读了Apache Beam文档,但没有发现任何帮助。在这方面的任何帮助将不胜感激。

解决方法

我通过在管道读取文件后添加Count.globally()并将其应用于PCollection<String>来解决此问题。

我添加了以下代码:

PCollection<String> lines = p.apply("Reading Lines...",TextIO.read().from(options.getInputFile()));

 lines.apply(Count.globally()).apply("Count the total records",ParDo.of(new RecordCount()));

我在其中创建了一个新类(RecordCount.java),该类扩展了只记录计数的DoFn

RecordCount.java

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordCount extends DoFn<Long,Void> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RecordCount.class);

    @ProcessElement
    public void processElement(@Element Long count) {
       LOGGER.info("The total number of records in the input file is: ",count);

        }
    }

}
,

正确的方法是使用Beam连接器(或使用Beam ParDo)将计数写入存储系统。流水线结果不能直接提供给主程序,因为BeamRunner可以并行化计算,并且执行可能不在同一台计算机上进行。

例如(伪代码):

    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally())
     .apply(ParDo(MyLongToStringParDo()))
     .apply(TextIO.Write.to("gs://..."));

如果需要直接在主程序中处理输出,则可以在Beam程序结束后使用客户端库从GCS中读取(在这种情况下,请确保指定p.run().waitUntilFinish())。另外,您可以将需要计数的计算移到Beam PTransform中,并将其作为管道的一部分。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。