微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

简历—面试题2

1、Kafka

kafka是一个分布式消息系统。具有高性能、持久化、多副本备份、横向扩展能力。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。

 

Kafka 集群包含一个或多个服务器,服务器节点称为broker,broker存储topic的数据。broker可分为Controller与follower。Controller管理集群broker的上下线,所有topic的分区副本分配和leader Partition选举等工作

每条发布到Kafka集群的消息都有一个类别Topic,Topic像一个消息队列,每个 topic 包含一个或多个partition,Kafka分配的单位是partition。每个partition有多个副本,其中有且仅有一个作为leaderleader是当前负责数据的读写的partition,其他partition为flower作为备用选主。当Follower与leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

  • offset:消费者在对应分区上已经消费的消息数(位置),

    • kafka0.8 版本 之前offset保存在zookeeper上。之后offset保存在kafka集群上。

 

调整Producer,consumer,broker的各项参数,保证Kafka内部数据不丢失


producer:acks参数、retry参数

1.高可用型
  配置:acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)
  优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。
  缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长
2.折中型
  配置:acks = 1 retries > 0 retries 时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)
  优点:保证了消息的可靠性和吞吐量,是个折中的方案
  缺点:性能处于2者中间
3.高吞吐型
  配置:acks = 0
  优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求
  缺点:不知道发送的消息是 否成功
Consumer: group.id 、auto.offset.reset 、enable.auto.commit

1设置consumer group分组的id,group.id:如果为空,则会报异常

2设置从何处开始进行消费 auto.offset.reset = earliest(最早) /latest(最晚)

3设置是否开启自动提交消费位移的功能,认开启 enable.auto.commit= true/false(认true)

broker:replication-factor、min.insync.replicas、unclean.leander.election.enable

1.replication-factor >=2
在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。
2.min.insync.replicas = 2
分区ISR队列集合中最少有多少个副本,认值是1
3.unclean.leander.election.enable = false     
是否允许从ISR队列中选举leader副本,认值是false,如果设置成true,则可能会造成数据丢失。

 

 kafka调优,提升生产者的吞吐量


1)设置发送消息的缓冲区,

buffer.memory:认值是33554432,就是32MB
如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住


2)设置压缩

compression.type,认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,
压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。


3)设置batch的大小,

batch.size,认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下。
如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里


4)设置消息的发送延迟

linger.ms,这个值认是0,意思就是消息必须立即被发送,但是这是不对的,
一般设置一个100毫秒之内的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。

kafka的文件存储机制


一个topic下,有多个不同的partition,每个partition为一个目录。partition命名的规则是,topic的名称加上一个序号,序号从0开始。
一个partition目录下的文件,被平均切割成大小相等的数据文件(,每一个数据文件都被称为一个段(segment file)
一个segment 段消息,数量不一定相等,使得老的segment可以被快速清除。认保留7天的数据,每次满1G后,在写入到一个新的文件中。
另外每个partition只需要支持顺序读写就可以,也就是说它只会往文件的末尾追加数据,这就是顺序写的过程,生产者只会对每一个partition做数据的追加(写操作)。

 

在partition目录下,有两类文件,一类是以log为后缀的文件,一类是以index为后缀的文件,每一个log文件一个index文件相对应,这一对文件就是一个segment file,也就是一个段。

log文件:就是数据文件,里面存放的就是消息, index文件:是索引文件,记录元数据信息。


元数据指向,对应的数据文件(log文件)中消息的物理偏移地址。log文件达到1个G后滚动重新生成新的log文件

sparkStreaming整合kafka

sparkStreaming对接kafka两种方式:

1. Receiver模式,由kafka将数据发送数据,Spark Streaming被动接收数据;

2. Direct模式,由Spark Streaming主动去kafka中拉取数据。

    receiver:在spark的executor当中启动了一些receiver的线程,专门去kafka拉取数据,拉取回来的数据这些receiver不会处理,然后另外一些线程专门来处理数据,基于kafka的high level API进行消费,offset自动保存到了zk当中去了,不用我们主动去维护offset的值
        问题:拉取数据的线程以及处理数据的线程互相不会通信,造成问题:处理数据线程挂掉了,拉取数据的线程还在继续拉取数据,数据全部都堆积在execotr里面了      
        
    direct:不再单独启动线程去拉取数据,获取到的数据也不用保存在executor内存里面了,获取到的数据直接就进行处理
    问题:使用kafka的low  level API进行消费,需要我们自己手动的维护offset值

sparkStreaming整合kafka官网提供两个jar包
一个是基于0.8版本整合:提供两种方式整合,receiverdirect方式

一个是基于0.10版本整合:只提供了direct方式整合

 

 

8、spark

熟悉Spark的整体架构、工作原理和RDD概念,具有Spark的调优经验,阅读过Spark Core的源码内容,对Spark的Shuffle机制有深刻理解,熟练使用Scala编写Spark程序

spark是针对于大规模数据处理的统一分析引擎,它是基于内存计算框架,计算速度非常之快,但是它仅仅只是涉及到计算,并没有涉及到数据的存储,后期需要使用spark对接外部的数据源,比如hdfs。

 

1、速度快 :job的输出结果可以保存在内存 、spark任务以线程的方式运行在进程中
2、易用性:可以快速去编写spark程序通过 java/scala/python/R/sql等不同语言
3、通用性:一个生态系统,包含了很多模块,
sparksql:通过sql去开发spark程序做一些离线分析
sparkStreaming:主要是用来解决公司有实时计算的这种场景
Mlib:它封装了一些机器学习的算法库
Graphx:图计算
4、兼容性
spark程序就是一个计算逻辑程序,这个任务要运行就需要计算资源(内存、cpu、磁盘),
哪里可以给当前这个任务提供计算资源,就可以把spark程序提交到哪里去运行
standAlone(后期使用较多)
它是spark自带的独立运行模式,整个任务的资源分配由spark集群的老大Master负责
yarn(后期使用较多)
可以把spark程序提交到yarn中运行,整个任务的资源分配由yarn中的老大ResourceManager负责
mesos
它也是apache开源的一个类似于yarn的资源调度平台
 

spark集群的架构

  • Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器
  • Worker节点:从节点,负责控制计算节点,启动Executor或者Driver。
  • Driver: 运行Application 的main()函数
  • Executor:执行器,是为某个Application运行在worker node上的一个进程

spark架构 = spark应用程序(左边的DriverProgram) + spark 集群(右边的ClusterManager和另外两个Worker Node)

程序代码 -> Driver -> 调用main() -> 创建sparkContext -> 与spark集群交互

sparkContext -> 连接ClusterManager -> 申请资源 -> 解析成多个task -> 分发给workNode -> 执行task -> 执行完毕释放资源

 

        当前Driver启动以后,会去执行应用程序的main方法,并构建sparkConext对象。sparkContext与ClusterManager连接交互,并且sparkContext将程序代码解析成多个task,将task发送给workNode,workNode又会把task丢给任务执行器executor去执行,executor会启动线程池开始执行task。当所有的task执行完毕,spark向ClusterManager注销,并释放资源。

 

 

在这里插入图片描述

  1. 构建Spark 应用程序的运行环境,启动SparkContext,SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源,并启动StandaloneExecutorbackend,
  2. Executor向SparkContext申请Task,SparkContext将应用程序分发给Executor
  3. SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行
  4. Task在Executor上运行,运行完释放所有资源

1、注册并申请资源:Driver端向资源管理器Master发送注册和申请计算资源的请求, 

2、分配资源:Master通知对应的worker节点启动executor进程(计算资源), executor进程向Driver端发送注册并且申请task请求。

3、注册并申请task:

Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler

4、executer运行task:

按照客户端代码rdd的一系列操作顺序,生成DAG有向无环图。DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler。TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行

4、注销:所有task运行完成,Driver端向Master发送注销请求,Master通知Worker关闭executor进程,Worker上的计算资源得到释放,最后整个任务也就结束了。
 

 

RDD(Resilient distributed Dataset)叫做弹性 分布式 数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
Dataset: 就是一个集合,存储很多数据.
distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算.
Resilient: 表示弹性,rdd的数据是可以保存在内存或者是磁盘中.
 

RDD五大特性:

1、有一个分区列表

一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据, spark中任务是以task线程的方式运行, 一个分区就对应一个task线程,分区数决定了并行计算的力度。可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用认值,认值就是程序所分配到的cpu Core的数目。
 

2、每个分区都会有计算函数

 Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现 compute函数,对具体的分片进行计算,RDD中的分片是并行的,所以是分布式并行计算。

 

3、一个rdd依赖其他rdd

窄依赖RDD会形成类似流水线一样的前后依赖关系,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有数据分片

 

4、key-value型的RDD,可以设置分区函数

类似于mapreduce当中的paritioner接口,控制Key分到哪个reduce。

当前Spark中实现了两种类型的分片函数一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量

5、每个分区都有一个优先位置列表

移动数据不如移动计算,数据在哪台机器上,任务就启在哪个机器上,数据在本地上,不用走网络。不过数据进行最后汇总的时候就要走网络。(进行任务调度时会尽可能地将任务分配到处理数据的数据块所在的具体位置。

 

 

在这里插入图片描述

 

在这里插入图片描述

 

Shuffle就是对数据进行重组,

在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是是否存在宽依赖的时候。spark 划分 stage 的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个 stage;遇到窄依赖就将这个 RDD 加入该 stage 中。

需要进行shuffle,这时候会将作业job划分成多个Stage,每一个stage内部有很多可以并行运行的task,stage与stage之间的过程就是shuffle阶段

在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager(spark1.2之前使用)SortShuffleManager(spark1.2之后使用)

https://blog.csdn.net/qichangjian/article/details/88039576

spark1.2版本以前:hashShuffleManager 未经优化的hashShuffleManager 经过优化的hashShuffleManager

spark1.2版本以后:SortShuffleManager 普通机制 ByPass机制

  • HashShuffleManager的运行机制主要分成两种,Hash shuffle是不具有排序的Shuffle。
    • 一种是普通运行机制
    • 另一种是合并的运行机制。
      • 合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件数量

SortShuffleManager的运行机制主要分成两种,

  • 一种是普通运行机制
  • 另一种是bypass运行机制
//todo: 利用scala语言开发spark程序实现单词统计
object WordCount {
  def main(args: Array[String]): Unit = {
    //1、构建sparkConf对象 设置application名称和master地址
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

    //2、构建sparkContext对象,该对象非常重要,它是所有spark程序的执行入口
    // 它内部会构建  DAGScheduler和 TaskScheduler 对象
    val sc = new SparkContext(sparkConf)

    //设置日志输出级别
    sc.setLogLevel("warn")

    //3、读取数据文件
    val data: RDD[String] = sc.textFile("E:\\words.txt")

    //4、 切分每一行,获取所有单词
    val words: RDD[String] = data.flatMap(x=>x.split(" "))

    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map(x => (x,1))

    //6、相同单词出现的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x,y)=>x+y)

    //按照单词出现的次数降序排列  第二个参数认是true表示升序,设置为false表示降序
    val sortedRDD: RDD[(String, Int)] = result.sortBy( x=> x._2,false)

    //7、收集数据打印
    val finalResult: Array[(String, Int)] = sortedRDD.collect()
    finalResult.foreach(println)


    //8、关闭sc
    sc.stop()

  }
}

熟悉Flink的整体架构和编程模型,熟练掌握Window的使用、Exactly-Once的原理和WaterMark解决乱序以及延迟数据的方法,知道Flink和Kafka的整合

3、flink

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

●熟悉Flume日志采集、Sqoop数据迁移、Azkaban工作流任务调度器等工具的使用;


●熟悉相关模型搭建与训练过程,理解主流机器学习算法(SVM、Adaboost、GBDT、KNN、K-Means、CNN、RNN等)的原理及应用,能够利用训练模型,优化模型。
 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐