如何解决Spark Streaming Receiver启动缓慢的问题?
我需要在我的Spark Streaming应用程序中启动400个接收器,但是接收器启动花费的时间很长,这导致一个问题:大量待处理的数据排队积压,在应用程序运行时间到期后仍然无法处理完。代码如下:
// Build the per-receiver view of every data file, then create and register
// the receiver. `i` is declared outside this excerpt (presumably the index of
// the receiver/vuser being created -- confirm against the enclosing loop).
Map<DataFile,List<String>> blockDataFileMap = new HashMap<>();
if (dataFileMap != null) {
    // Divide each data file into blocks of `vusers` lines and keep one line per block.
    for (Entry<DataFile,List<String>> entry : dataFileMap.entrySet()) {
        DataFile dataFile = entry.getKey();
        List<String> lines = entry.getValue();
        if (lines.size() < vusers) {
            // Fewer lines than vusers: nothing to split, hand over the whole file.
            LOG.warn(dataFile + " is too small to split. Whole file will be used for each vuser.");
            blockDataFileMap.put(dataFile,lines);
        } else {
            int block = lines.size() / vusers;
            List<String> selectedLines = new ArrayList<>(block + 1);
            // From block j take the line at offset indexList[j][i].
            // The index is computed inside the body so that indexList is only
            // read for rows 0..block-1; computing it in the loop's update
            // clause would touch row `block` after the final iteration.
            for (int j = 0; j < block; j++) {
                int k = j * vusers + indexList.get(j).get(i);
                selectedLines.add(lines.get(k));
            }
            // Lines left over after the last full block: take the i-th one if it exists.
            int tailIndex = block * vusers + i;
            if (tailIndex < lines.size()) {
                selectedLines.add(lines.get(tailIndex));
            }
            blockDataFileMap.put(dataFile,selectedLines);
        }
    }
}
StreamingReceiver receiver = new StreamingReceiver(StorageLevel.MEMORY_AND_DISK(),batchTime,blockDataFileMap,oneOffdataFiles,reportId,serverUrl,new SerializableConfiguration(conf),serviceWrapper);
list.add(jssc.receiverStream(receiver));
// Closes a block that was opened before this excerpt begins.
}
// jssc.union is called with an array, so copy the collected receiver streams
// out of the list into a JavaDStream<Result>[] of exactly the same length.
int len = list.size();
// Generic arrays cannot be created directly; the wildcard array only ever
// receives JavaDStream<Result> elements from `list`, so the cast is safe.
@SuppressWarnings("unchecked")
JavaDStream<Result>[] javaDStreams = (JavaDStream<Result>[]) new JavaDStream<?>[len];
list.toArray(javaDStreams);
// Merge every receiver stream into one DStream.
JavaDStream<Result> lines = jssc.union(javaDStreams);
// Key every Result by the value of getChain(): stream of (chainId,Result).
PairFunction<Result,String,Result> keyByChain = new PairFunction<Result,String,Result>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String,Result> call(Result result) throws Exception {
        String chainId = result.getChain();
        return new Tuple2<>(chainId,result);
    }
};
JavaPairDStream<String,Result> pointRdd = lines.mapToPair(keyByChain);
// Pipeline: group results by chain (groupByKey causes a shuffle), bucket each
// chain's results by getTime(), hand the buckets to CalculateFunction, then
// collect every batch's output and merge it into histogramFinalResultMap.
pointRdd.groupByKey().map(
        new Function<Tuple2<String,Iterable<Result>>,Tuple2<String,Iterator<Tuple2<Long,Iterator<Result>>>>>() {
            private static final long serialVersionUID = 5928985493323358322L;

            // Arg: (chainId,List<Result>)
            // Return: (chainId,List<(timeInSecond,List<Result>)>)
            @Override
            public Tuple2<String,Iterator<Tuple2<Long,Iterator<Result>>>> call(
                    Tuple2<String,Iterable<Result>> v1) throws Exception {
                String chain = v1._1;
                Iterator<Result> results = v1._2.iterator();
                // Bucket the chain's results by their time value.
                Map<Long,List<Result>> map = new HashMap<>();
                while (results.hasNext()) {
                    Result tmp = results.next();
                    Long time = tmp.getTime();
                    if (!map.containsKey(time)) {
                        map.put(time,new ArrayList<Result>());
                    }
                    map.get(time).add(tmp);
                }
                // Flatten the buckets into (time,results) tuples.
                List<Tuple2<Long,Iterator<Result>>> list = new ArrayList<>();
                for (Entry<Long,List<Result>> tmp : map.entrySet()) {
                    list.add(new Tuple2<>(tmp.getKey(),tmp.getValue().iterator()));
                }
                return new Tuple2<>(chain,list.iterator());
            }
        }).mapPartitions(new CalculateFunction(broadcast))
        .foreachRDD(new VoidFunction<JavaRDD<Tuple2<String,Map<String,Map<Long,Integer>>>>>() {
            // Arg: RDD of (chainId,Map<tag,Map<rt,count>>) produced for one batch.
            @Override
            public void call(JavaRDD<Tuple2<String,Map<String,Map<Long,Integer>>>> tuple2JavaRDD)
                    throws Exception {
                // collect() materializes the whole batch result as a local list.
                List<Tuple2<String,Map<String,Map<Long,Integer>>>> resList = tuple2JavaRDD.collect();
                for (Tuple2<String,Map<String,Map<Long,Integer>>> resTuple : resList) {
                    String chain = resTuple._1();
                    Map<String,Map<Long,Integer>> histogramResultWithTag = resTuple._2();
                    // Merge histogram data from this batch into the final result map.
                    if (histogramFinalResultMap.containsKey(chain)) {
                        Map<String,Map<Long,Integer>> histogramFinalResultWithTag =
                                histogramFinalResultMap.get(chain);
                        for (Entry<String,Map<Long,Integer>> tmpEntryWithTag
                                : histogramResultWithTag.entrySet()) {
                            String tag = tmpEntryWithTag.getKey();
                            Map<Long,Integer> histogramResult = tmpEntryWithTag.getValue();
                            if (histogramFinalResultWithTag.containsKey(tag)) {
                                Map<Long,Integer> histogramFinalResult =
                                        histogramFinalResultWithTag.get(tag);
                                // Known chain and tag: add the counts key by key.
                                for (Entry<Long,Integer> tmpEntry : histogramResult.entrySet()) {
                                    Long rt = tmpEntry.getKey();
                                    Integer count = tmpEntry.getValue();
                                    if (histogramFinalResult.containsKey(rt)) {
                                        histogramFinalResult.put(rt,count + histogramFinalResult.get(rt));
                                    } else {
                                        histogramFinalResult.put(rt,count);
                                    }
                                }
                            } else {
                                // First time this tag is seen for the chain: adopt its histogram.
                                histogramFinalResultWithTag.put(tag,histogramResult);
                            }
                        }
                    } else {
                        // First time this chain is seen: adopt its per-tag histograms.
                        histogramFinalResultMap.put(chain,histogramResultWithTag);
                    }
                }
            }
        });
// Start the streaming computation.
jssc.start();
// Wait until the context stops or the timeout (in milliseconds) elapses.
// The 1000L literal forces long arithmetic so a large timeLimit cannot
// overflow int before the value is passed on.
jssc.awaitTerminationOrTimeout(timeLimit * 1000L);
jssc.close();
在我的应用程序中,我会初始化400个接收器。
我不知道为什么接收器启动这么慢(请给我一些可能的原因)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。