
Flink consuming Kafka data and writing it to Alibaba Hologres (Java version)

1. Background

Our business needed Kafka data written into Hologres, but the simpler Flink SQL approach kept throwing errors (quite possibly my dependencies were wrong, though Alibaba support couldn't resolve it either), so I had no choice but to write the job with flink-core (the DataStream API) directly. The code is far from perfect and the approach may not be optimal; if you have a better method or practical experience, feel free to share.
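For reference, the Flink SQL route that failed for me would look roughly like the sketch below. This is only a minimal sketch under assumptions: the Hologres WITH options follow the Alibaba Hologres connector documentation, the Kafka connector identifier depends on which connector jar you ship (the POM below pulls in the 0.10 connector, whose identifier would be 'kafka-0.10'), and all endpoints and credentials are placeholders.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SensorLog2HoloSql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Kafka source table; use 'kafka-0.10' instead of 'kafka' if you ship
        // the flink-connector-kafka-0.10 jar from the POM below.
        tEnv.executeSql(
                "CREATE TABLE kafka_source (\n"
                        + "  event STRING,\n"
                        + "  distinct_id STRING,\n"
                        + "  appname STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = '******',\n"
                        + "  'properties.bootstrap.servers' = '******',\n"
                        + "  'properties.group.id' = '******',\n"
                        + "  'format' = 'json'\n"
                        + ")");

        // Hologres sink table; option names taken from the Alibaba Hologres
        // connector docs -- verify them against the connector version you use.
        tEnv.executeSql(
                "CREATE TABLE holo_sink (\n"
                        + "  event STRING,\n"
                        + "  distinct_id STRING,\n"
                        + "  appname STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hologres',\n"
                        + "  'endpoint' = '******',\n"
                        + "  'dbname' = '******',\n"
                        + "  'tablename' = 'schema_name.table_name',\n"
                        + "  'username' = '******',\n"
                        + "  'password' = '******'\n"
                        + ")");

        // executeSql on an INSERT submits the job, so no env.execute() is needed
        tEnv.executeSql("INSERT INTO holo_sink SELECT event, distinct_id, appname FROM kafka_source");
    }
}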

2. POM dependencies

We're on Flink 1.11 and the project contains other code, so the POM carries some extra dependencies; take what you need for your own setup. At a minimum, the code below compiled and ran against these dependencies with no conflicts or errors (running mvn dependency:tree is a quick way to check for clashes in your own project).

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>******</groupId>
    <artifactId>******</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.binary.version>2.11</scala.binary.version>
        <jdk.compile.version>1.8</jdk.compile.version>
        <flink.version>1.11.3</flink.version>
        <maven.version>3.3.3</maven.version>
        <hadoop.version>2.8.4</hadoop.version>
        <hbase.version>2.2.0</hbase.version>
        <hologres.client.version>2.1.1</hologres.client.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <!-- flink start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba.hologres</groupId>
            <artifactId>hologres-connector-flink-1.11</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <!-- flink end -->

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>3.4.5</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                                <include>**/*.java</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

3. Code implementation

For convenience I've put all the code in one place, which also makes it easier to search. Sensitive values are masked; replace them with your own configuration. (Note that the job actually has to convert the Kafka records before writing them to Holo.)

Imports

When reading other blog posts I often found the import statements missing, which is a real nuisance when you're in a hurry to see results, so to save everyone the trouble I've included the imports as well.

import com.alibaba.fastjson.JSON;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.ververica.connectors.hologres.api.HologresRecordConverter;
import com.alibaba.ververica.connectors.hologres.config.HologresConfigs;
import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCWriter;
import com.alibaba.ververica.connectors.hologres.sink.HologresOutputFormat;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.Properties;

Full code

public class SensorLog2Holo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing: file-system state backend plus exactly-once checkpoints every 5s
        env.setStateBackend(new FsStateBackend("checkpoint path"));
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "******");
        properties.setProperty("group.id", "******");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "******");

        DataStreamSource<SensorLog> kafkaSource =
                env.addSource(new FlinkKafkaConsumer010<SensorLog>("topic name", new SensorLogDeserializer(), properties));


        // Hologres configuration
        Configuration configuration = new Configuration();
        configuration.set(HologresConfigs.ENDPOINT, "******");
        configuration.set(HologresConfigs.DATABASE, "******");
        configuration.set(HologresConfigs.USERNAME, "******");
        configuration.set(HologresConfigs.PASSWORD, "******");
        configuration.set(HologresConfigs.TABLE, "schema_name.table_name");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(configuration);

        /*
         * Table schema of the target table; the fields below are an example.
         */
        TableSchema schema =
                TableSchema.builder()
                        .field("event", DataTypes.STRING())
                        .field("distinct_id", DataTypes.STRING())
                        .field("appname", DataTypes.STRING())
                        .build();

        kafkaSource.addSink(
                new OutputFormatSinkFunction<SensorLog>(
                        new HologresOutputFormat<>(
                                hologresConnectionParam,
                                new HologresJDBCWriter<>(
                                        hologresConnectionParam,
                                        schema,
                                        new RecordConverter(hologresConnectionParam)))));

        env.execute("SensorLog2Holo");

    }


    /**
     * Converts the user POJO into a Hologres Record.
     */
    public static class RecordConverter implements HologresRecordConverter<SensorLog, Record> {
        private HologresConnectionParam hologresConnectionParam;
        private HologresTableSchema tableSchema;

        public RecordConverter(HologresConnectionParam hologresConnectionParam) {
            this.hologresConnectionParam = hologresConnectionParam;
        }

        @Override
        public Record convertFrom(SensorLog record) {
            if (tableSchema == null) {
                this.tableSchema =
                        HologresTableSchema.get(hologresConnectionParam.getJdbcOptions());
            }
            Record result = new Record(tableSchema.get());
            // indices are column positions in the target Hologres table
            result.setObject(0, record.getEvent());
            result.setObject(1, record.getDistinct_id());
            result.setObject(2, record.getProperties().get("appname"));
            return result;
        }

        @Override
        public SensorLog convertTo(Record record) {
            throw new UnsupportedOperationException("No need to implement");
        }

        @Override
        public Record convertToPrimaryKey(SensorLog record) {
            throw new UnsupportedOperationException("No need to implement");
        }
    }

    /**
     * Custom deserialization schema that turns Kafka byte[] messages into SensorLog POJOs.
     */
    static class SensorLogDeserializer implements DeserializationSchema<SensorLog> {
        private static final long serialVersionUID = 1L;

        @Override
        public SensorLog deserialize(byte[] message) throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);

            String mess = byteBufferToString(buffer);
            // deserialize the JSON payload into the POJO
            SensorLog sensorLog = JSON.parseObject(mess, SensorLog.class);
            return sensorLog;
        }

        @Override
        public boolean isEndOfStream(SensorLog nextElement) {
            return false;
        }

        @Override
        public TypeInformation<SensorLog> getProducedType() {
            return TypeInformation.of(SensorLog.class);
        }

        /**
         * Converts a ByteBuffer to a String.
         *
         * @param buffer the buffer to decode
         * @return the decoded string, or an empty string if decoding fails
         */
        public static String byteBufferToString(ByteBuffer buffer) {
            Charset charset = null;
            CharsetDecoder decoder = null;
            CharBuffer charBuffer = null;
            try {
                charset = Charset.forName("UTF-8");
                decoder = charset.newDecoder();
                charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
                return charBuffer.toString();
            } catch (Exception ex) {
                ex.printStackTrace();
                return "";
            }
        }
    }


    /*
     * POJO matching the Kafka messages; it needs getters & setters.
     * A hedged sketch of its shape follows after this class.
     */
    static class SensorLog {
        *****
    }
}
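The body of SensorLog is masked above. Judging only from the getters that RecordConverter calls (getEvent(), getDistinct_id(), getProperties()), a minimal sketch might look like the following; the field types, especially the shape of properties, are my assumptions and not the author's actual class:

import java.util.Map;

// Hypothetical sketch of the masked SensorLog POJO, inferred from the
// getEvent()/getDistinct_id()/getProperties() calls in RecordConverter.
// fastjson populates it via the public setters and the no-arg constructor.
public class SensorLog {
    private String event;                    // e.g. "page_view"
    private String distinct_id;              // user/device identifier
    private Map<String, String> properties;  // e.g. {"appname": "..."} (assumed type)

    public String getEvent() { return event; }
    public void setEvent(String event) { this.event = event; }

    public String getDistinct_id() { return distinct_id; }
    public void setDistinct_id(String distinct_id) { this.distinct_id = distinct_id; }

    public Map<String, String> getProperties() { return properties; }
    public void setProperties(Map<String, String> properties) { this.properties = properties; }
}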
