微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Google Cloud Dataflow清除Cloud Memorystore中的缓存后如何插入数据?

如何解决使用Google Cloud Dataflow清除Cloud Memorystore中的缓存后无法插入数据的问题?

我正在实现这样一个任务:如果要由Dataflow处理的输入文件包含数据,就先清除Memorystore的缓存。也就是说,如果输入文件中没有记录,则不清除Memorystore;但只要输入文件中有哪怕一条记录,就应先清除Memorystore,然后再处理输入文件。

我的Dataflow应用程序包含多条管道,负责读取、处理数据,然后将其存储到Memorystore中。管道能够成功执行,Memorystore的刷新也正常,但刷新之后,数据并没有被插入。

我编写了一个函数,用于检查输入文件中是否有记录后刷新内存存储。

FlushingMemorystore.java

package com.click.example.functions;

import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.Autovalue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class FlushingMemorystore {


    private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);

    public static FlushingMemorystore.Read read() {
        return (new Autovalue_FlushingMemorystore_Read.Builder())
                .setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
    }

    @Autovalue
    public abstract static class Read extends PTransform<PCollection<Long>,PDone> {

        public Read() {
        }

        @Nullable
        abstract RedisConnectionConfiguration connectionConfiguration();

        @Nullable
        abstract Long expireTime();
        abstract FlushingMemorystore.Read.Builder toBuilder();

        public FlushingMemorystore.Read withEndpoint(String host,int port) {
            Preconditions.checkArgument(host != null,"host cannot be null");
            Preconditions.checkArgument(port > 0,"port cannot be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public FlushingMemorystore.Read withAuth(String auth) {
            Preconditions.checkArgument(auth != null,"auth cannot be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public FlushingMemorystore.Read withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0,"timeout cannot be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null,"connection cannot be null");
            return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null,"expireTimeMillis cannot be null");
            Preconditions.checkArgument(expireTimeMillis > 0L,"expireTimeMillis cannot be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PDone expand(PCollection<Long> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null,"withConnectionConfiguration() is required");
            input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
            return PDone.in(input.getPipeline());
        }

        private static class ReadFn extends DoFn<Long,String> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final FlushingMemorystore.Read spec;
            private transient Jedis jedis;
            private transient Pipeline pipeline;
            private int batchCount;

            public ReadFn(FlushingMemorystore.Read spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                this.jedis = this.spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                this.pipeline = this.jedis.pipelined();
                this.pipeline.multi();
                this.batchCount = 0;
            }

            @ProcessElement
            public void processElement(DoFn<Long,String>.ProcessContext c) {
                Long count = c.element();
                batchCount++;

                if(count==null && count < 0) {
                    LOGGER.info("No Records are there in the input file");
                } else {
                    if (pipeline.isInMulti()) {
                        pipeline.exec();
                        pipeline.sync();
                        jedis.flushDB();
                    }
                    LOGGER.info("*****The memorystore is flushed*****");
                }
            }

            @FinishBundle
            public void finishBundle() {
                if (this.pipeline.isInMulti()) {
                    this.pipeline.exec();
                    this.pipeline.sync();
                }
                this.batchCount=0;
            }

            @Teardown
            public void teardown() {
                this.jedis.close();
            }

        }

        @Autovalue.Builder
        abstract static class Builder {

            Builder() {
            }

            abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);

            abstract FlushingMemorystore.Read build();

            abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);

        }

    }

}

我正在使用Starter Pipeline代码中的功能

正在使用该功能的启动程序管道的代码段:

 // Parse the command-line arguments into the job's options interface and validate them.
 StoragetoRedisOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(StoragetoRedisOptions.class);

        Pipeline p = Pipeline.create(options);

        // Read the input file; TextIO emits one element per line.
        PCollection<String> lines = p.apply(
                "ReadLines",TextIO.read().from(options.getInputFile()));

        /**
         * Flushing the Memorystore if there are records in the input file.
         *
         * Count.globally() reduces the lines to a single Long count, which the flush
         * transform inspects before issuing FLUSHDB.
         *
         * NOTE(review): the flush transform's result is discarded here, so no later step
         * depends on it. Nothing guarantees that the RedisIO.write of the processed data runs
         * after this flush. Capture the flush output and gate the write with Wait.on(...).
         */
        lines.apply("Checking Data in input file",Count.globally())
                .apply("Flushing the data store",FlushingMemorystore.read()
                        .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(),options.getRedisPort())));

清除缓存后要插入的已处理数据的代码段:

 // Write the processed dataset to Redis using the SADD (set add) method.
 // `dataset` is defined elsewhere; presumably a PCollection<KV<String, String>> derived from the input.
 // NOTE(review): this write does not consume the flush step's output, so nothing orders it after
 // FLUSHDB. If the flush executes later, it deletes the freshly inserted data.
 dataset.apply(SOME_DATASET_TRANSFORMATION,RedisIO.write()
                .withMethod(RedisIO.Write.Method.SADD)
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(),options.getRedisPort())));

Dataflow作业执行正常,也刷新了Memorystore,但此后插入没有生效。您能指出我哪里出错了吗?任何解决此问题的方案都不胜感激。预先感谢!

编辑:

根据评论的要求提供其他信息

使用的运行时是Java 11,并且使用的是Apache Beam SDK for 2.24.0

如果输入文件具有记录,它将以某种逻辑处理数据。例如,如果输入文件具有以下数据:

abcabc|Bruce|Wayne|2000
abbabb|Tony|Stark|3423

在这种情况下,Dataflow会统计出2条记录,并根据业务逻辑处理id、名字等字段,然后将结果存储到Memorystore中。该输入文件每天都会到达,因此只要输入文件中有记录,就应先刷新(即清空)Memorystore。

尽管管道没有中断,但我认为我缺少了一些东西。

解决方法

我怀疑这里的问题在于:您需要确保“Flush”步骤在RedisIO.write步骤开始之前运行(并完成)。Beam提供了Wait.on变换,可以用来实现这一点。

为此,我们可以把刷新PTransform的输出用作“数据库已刷新”的信号——只有在刷新完成后才向数据库写入。刷新DoFn的process方法大致如下:

/**
 * Flushes the store when the upstream count is positive, then emits a "READY" marker. The marker
 * gives downstream writes something to wait on via {@code Wait.on(...)}.
 */
@ProcessElement
public void processElement(DoFn<Long,String>.ProcessContext c) {
    Long count = c.element();

    // Null or non-positive means the input file was empty: skip the flush.
    // Test null first so a null count is never unboxed.
    if (count == null || count <= 0) {
        LOGGER.info("No Records are there in the input file");
    } else {
        if (pipeline.isInMulti()) {
            pipeline.exec();
            pipeline.sync();
            jedis.flushDB();
            // Log only when FLUSHDB was actually issued.
            LOGGER.info("*****The memorystore is flushed*****");
        }
    }
    // Emitted whether or not a flush happened; the element is only a marker for Wait.on(...).
    c.output("READY");
}

一旦我们有一个信号指出数据库已被刷新,我们可以使用它来等待,然后再向其中写入新数据:

// Create the pipeline from the job options.
Pipeline p = Pipeline.create(options);

// Read the input file; TextIO emits one element per line.
PCollection<String> lines = p.apply(
        "ReadLines",TextIO.read().from(options.getInputFile()));

/**
 * Flushing the Memorystore if there are records in the input file.
 *
 * The flush transform's output is kept as a signal collection. Assigning it to a
 * PCollection<String> requires FlushingMemorystore.Read to output PCollection<String>
 * rather than PDone.
 */
PCollection<String> flushedSignal = lines
     .apply("Checking Data in input file",Count.globally())
     .apply("Flushing the data store",FlushingMemorystore.read()
                     .withConnectionConfiguration(RedisConnectionConfiguration
                     .create(options.getRedisHost(),options.getRedisPort())));

// Then we use the flushing signal to start writing to Redis:
// Wait.on(flushedSignal) passes `dataset` through unchanged, but holds each window back until
// the matching window of flushedSignal is complete, i.e. until the flush step has finished.
// As a result, the SADD writes land after FLUSHDB.

dataset
    .apply(Wait.on(flushedSignal))
    .apply(SOME_DATASET_TRANSFORMATION,RedisIO.write()
                .withMethod(RedisIO.Write.Method.SADD)
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(),options.getRedisPort())));
,

按照Pablo的回答,应用Wait.on变换后该问题已解决。不过,我不得不重写FlushingMemorystore.java,使其输出一个PCollection,用作flushedSignal信号。

功能如下:

package com.click.example.functions;

import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class FlushingMemorystore extends DoFn<Long,String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);

    public static FlushingMemorystore.Read read() {
        return (new AutoValue_FlushingMemorystore_Read.Builder())
                .setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
    }

    @AutoValue
    public abstract static class Read extends PTransform<PCollection<Long>,PCollection<String>> {

        public Read() {
        }

        @Nullable
        abstract RedisConnectionConfiguration connectionConfiguration();

        @Nullable
        abstract Long expireTime();
        abstract FlushingMemorystore.Read.Builder toBuilder();

        public FlushingMemorystore.Read withEndpoint(String host,int port) {
            Preconditions.checkArgument(host != null,"host cannot be null");
            Preconditions.checkArgument(port > 0,"port cannot be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public FlushingMemorystore.Read withAuth(String auth) {
            Preconditions.checkArgument(auth != null,"auth cannot be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public FlushingMemorystore.Read withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0,"timeout cannot be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null,"connection cannot be null");
            return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null,"expireTimeMillis cannot be null");
            Preconditions.checkArgument(expireTimeMillis > 0L,"expireTimeMillis cannot be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PCollection<String> expand(PCollection<Long> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null,"withConnectionConfiguration() is required");
           return input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
        }

        @Setup
        public Jedis setup() {
            return this.connectionConfiguration().connect();
        }

        private static class ReadFn extends DoFn<Long,String> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final FlushingMemorystore.Read spec;
            private transient Jedis jedis;
            private transient Pipeline pipeline;
            private int batchCount;

            public ReadFn(FlushingMemorystore.Read spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                this.jedis = this.spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                this.pipeline = this.jedis.pipelined();
                this.pipeline.multi();
                this.batchCount = 0;
            }

            @ProcessElement
            public void processElement(@Element Long count,OutputReceiver<String> out) {
                batchCount++;

                if(count!=null && count > 0) {
                    if (pipeline.isInMulti()) {
                        pipeline.exec();
                        pipeline.sync();
                        jedis.flushDB();
                        LOGGER.info("*****The memorystore is flushed*****");
                    }
                    out.output("SUCCESS");
                } else {
                    LOGGER.info("No Records are there in the input file");
                    out.output("FAILURE");
                }

            }

            @FinishBundle
            public void finishBundle() {
                if (this.pipeline.isInMulti()) {
                    this.pipeline.exec();
                    this.pipeline.sync();
                }
                this.batchCount=0;
            }

            @Teardown
            public void teardown() {
                this.jedis.close();
            }

        }

        @AutoValue.Builder
        abstract static class Builder {

            Builder() {
            }

            abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);

            abstract FlushingMemorystore.Read build();

          abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);

        }

    }

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。