
Flink in Action 81: Flink SQL (8) / Flink with Hive (2): Packaging and Running the Job

Disclaimer: this series of posts is organized from SGG's videos and is well suited for readers just getting started.


Note 1: Flink 1.11.0, Hive 3.1.2, and Hadoop 3.1.3 are used throughout.

Note 2: Place the hive-site.xml file under the Maven project's resources directory (see the sketch after Note 3).

Note 3: Unless a launch script does it for you, run export HADOOP_CLASSPATH=`hadoop classpath` before submitting the job.
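On Note 2: HiveCatalog needs hive-site.xml to find the metastore, and there are two ways to hand it over. A minimal sketch of both, where the second option rests on my assumption that this version's HiveCatalog constructor accepts a null conf dir and then falls back to a hive-site.xml found on the classpath (which is exactly what putting the file under resources achieves; check the HiveCatalog Javadoc for your Flink version):

import org.apache.flink.table.catalog.hive.HiveCatalog

// Option 1: point the catalog at an explicit Hive conf directory on the
// machine running the job; this is what the job code below does.
val catalogFromDir = new HiveCatalog("myhive", "gmall", "/opt/module/hive/conf")

// Option 2 (assumption, see lead-in): pass null so that hive-site.xml is
// looked up on the classpath, i.e. in the jar built from src/main/resources.
val catalogFromClasspath = new HiveCatalog("myhive", "gmall", null)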

Step 1: POM dependencies


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <log4j.version>2.12.1</log4j.version>
        <hive.version>3.1.2</hive.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Hive dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>


Step 2: Write the code
First, create the two tables ahead of time with the Flink SQL client:


SET table.sql-dialect=hive;
CREATE TABLE hive_table_2 (
  log_info STRING
) PARTITIONED BY (
  dt STRING,
  hr STRING
) STORED AS PARQUET
LOCATION 'hdfs://localhost:9820/warehouse/gmall/test99'
TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.delay'='1 h',
  'sink.rolling-policy.check-interval'='30s',
  'sink.rolling-policy.rollover-interval'='1min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);



SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  log_info STRING,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_event_test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink_hive_test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

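If you prefer not to pre-create the tables in the SQL client, the same two DDL statements can be issued from the job itself by switching the table environment's SQL dialect. A minimal sketch, assuming the tEnv and Hive catalog from the job code below, with hiveTableDdl and kafkaTableDdl as hypothetical String values holding the two CREATE TABLE statements above:

import org.apache.flink.table.api.SqlDialect

// The Hive table must be created while the Hive dialect is active and the
// Hive catalog is the current catalog.
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tEnv.executeSql(hiveTableDdl)   // hypothetical: the CREATE TABLE hive_table_2 ... string

// Switch back to the default dialect before creating the Kafka source table.
tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tEnv.executeSql(kafkaTableDdl)  // hypothetical: the CREATE TABLE kafka_table ... string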

The actual job code is as follows:


package com.atguigu.flink.hive

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog

object InsertData {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Checkpointing is required: the streaming file sink only commits
    // partitions when a checkpoint completes.
    env.enableCheckpointing(10000)

    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tEnv = StreamTableEnvironment.create(env, settings)

    // Register the Hive catalog so the tables created above are visible.
    val name = "myhive"
    val defaultDatabase = "gmall"
    val hiveConfDir = "/opt/module/hive/conf" // a local path
    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
    tEnv.registerCatalog("myhive", hive)
    tEnv.useCatalog("myhive")

    // Continuously stream Kafka records into the partitioned Hive table.
    tEnv.executeSql("INSERT INTO hive_table_2 SELECT log_info, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table")
  }
}

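After the job has been running for a while and partitions have been committed, it is worth checking that the data really landed in Hive. A minimal verification sketch of my own (not part of the original post): a separate batch job against the same catalog, using TableResult.print() from Flink 1.11:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

// Hypothetical helper object, not in the original project.
object VerifyData {
  def main(args: Array[String]): Unit = {
    // Batch mode: read the Hive table as a bounded source.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    val hive = new HiveCatalog("myhive", "gmall", "/opt/module/hive/conf")
    tEnv.registerCatalog("myhive", hive)
    tEnv.useCatalog("myhive")

    // Row counts per committed partition.
    tEnv.executeSql("SELECT dt, hr, COUNT(*) AS cnt FROM hive_table_2 GROUP BY dt, hr").print()
  }
}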

Step 3: Package the jar and upload it to the server. The build section of the POM:


    <build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.atguigu.flink.hive.InsertData</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-nobootcp</arg>
                    </args>
                    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                </configuration>
            </plugin>

            <!-- Eclipse Scala Integration -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.8</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <projectnatures>
                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                    </projectnatures>
                    <buildcommands>
                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <classpathContainers>
                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                    </classpathContainers>
                    <excludes>
                        <exclude>org.scala-lang:scala-library</exclude>
                        <exclude>org.scala-lang:scala-compiler</exclude>
                    </excludes>
                    <sourceIncludes>
                        <sourceInclude>**/*.scala</sourceInclude>
                        <sourceInclude>**/*.java</sourceInclude>
                    </sourceIncludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


Step 4: Submit
Upload the jar to the /opt/module/flink/examples directory on the hadoop102 machine, then submit it from the shell:
atguigu@hadoop102:/opt/module/flink$ bin/flink run -c com.atguigu.flink.hive.InsertData examples/TableFlink1113-1.0-SNAPSHOT.jar
Step 5: The first error
Because every Hive-related dependency in the POM is marked provided, the fat jar does not contain HiveCatalog, and the Flink distribution's lib directory does not ship it either, so the job fails right after submission:

java.lang.NoClassDefFoundError: org/apache/flink/table/catalog/hive/HiveCatalog
         at com.tal.flink.hive.StreamMain.main(StreamMain.java:50)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:422)
         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
         at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.hive.HiveCatalog
         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
         at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
         at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
         at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
         ... 17 more

Step 6: Download the connector jar into Flink's lib directory to resolve the error

cd /export/servers/nc/flink/lib
Download the flink-sql-connector-hive bundle into Flink's lib folder (for this version combination it should be the flink-sql-connector-hive-3.1.2_2.11 jar built for Flink 1.11.0), then restart the cluster so the jar is picked up.

Step 7: Resubmit the job. This time the submission succeeds.


