Background
- Business scenario: Spark writes to Elasticsearch in bulk, implemented with the es-hadoop connector
- The batch job is scheduled to run periodically
- CDH 5.5.3 cluster, Spark 2.3, Elasticsearch 6.4.3
- The _id of each document in the target index is generated by the application and is guaranteed to be globally unique (see the sketch after this list)
- The problem occurs only in the test environment, and only occasionally
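For context, a minimal sketch of the kind of write the job performs; the ES endpoint, source table, column names, and the type name "doc" are illustrative rather than taken from the actual job, while es.mapping.id and es.write.operation are standard es-hadoop settings:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

val spark = SparkSession.builder().appName("es-batch-write").getOrCreate()
val df = spark.table("some_source_table")  // illustrative source

val esCfg = Map(
  "es.nodes" -> "es-host:9200",       // assumed ES endpoint
  "es.mapping.id" -> "id",            // application-generated, globally unique _id column
  "es.write.operation" -> "upsert"    // produces the doc_as_upsert bulk requests seen in the error below
)
df.saveToEs("offline_quota_library_s/doc", esCfg)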
Problem description
The full error message is as follows:
19/05/20 11:08:54 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 24.0 failed 4 times, most recent failure: Lost task 2.3 in stage 24.0 (TID 849, p016d052n01, executor 6): org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for bulk operation [24/1000]. Error sample (first [5] error messages): org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZVIK_2462056_2019-05-18]: version conflict, document already exists (current version [1]) {"update":{"_id":"OZVIK_2462056_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"OZVIK_2462056_2019-05-18","product_no":"OZVIK","cust_id":"2462056","p106":32,"p107":61,"p108":55,"p109":"YGM6E","p110":1,"p111":46,"p112":11126,"p113":189,"p114":70,"p115":6,"p116":60,"p117":"male","p118":"gg","p119":19,"p120":2,"p121":1544025600000,"p122":69,"p123":"FL0SS","dt":"2019-05-18","absum01":71,"testday01":76,"testday02":11202,"testday03":"7611202","testday04":"70male","testday04_2":22404,"testday05":"761120270male761120222404","amount01":"YGM6E2462056","amount02":22252,"amount03":"OZVIK","aa":11197,"testb21":93,"fix_const_999_0222":999,"0304tf":"999 2462056 YGM6E","0305test_long":11173,"hello":87,"datetest":"2019-05-18","binarytest":32,"nestedtest":"YGM6E","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZVIK","floattest02":1,"__namelist_54":"0"}}
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZWTC_148752_2019-05-18]: version conflict, document already exists (current version [1])
{"update":{"_id":"OZWTC_148752_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"OZWTC_148752_2019-05-18","product_no":"OZWTC","cust_id":"148752","p106":88,"p107":20,"p108":13,"p109":"3BIW6","p110":1,"p111":79,"p112":15107,"p113":183,"p114":62,"p115":85,"p116":68,"p117":"female","p118":"nn","p119":51,"p120":80,"p121":1534867200000,"p122":87,"p123":"VOG2J","dt":"2019-05-18","absum01":63,"testday01":147,"testday02":15254,"testday03":"14715254","testday04":"62female","testday04_2":30508,"testday05":"1471525462female1471525430508","amount01":"3BIW6148752","amount02":30214,"amount03":"OZWTC","aa":15170,"testb21":108,"fix_const_999_0222":999,"0304tf":"999 148752 3BIW6","0305test_long":15187,"hello":101,"datetest":"2019-05-18","binarytest":88,"nestedtest":"3BIW6","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZWTC","floattest02":1,"__namelist_54":"0"}}
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P08Y7_3310671_2019-05-18]: version conflict, document already exists (current version [1])
{"update":{"_id":"P08Y7_3310671_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P08Y7_3310671_2019-05-18","product_no":"P08Y7","cust_id":"3310671","p106":27,"p107":62,"p108":40,"p109":"5JPCP","p110":0,"p111":93,"p112":17036,"p113":185,"p114":68,"p115":54,"p116":24,"p117":"female","p118":"aa","p119":43,"p120":88,"p121":1536508800000,"p122":43,"p123":"HI31Q","dt":"2019-05-18","absum01":68,"testday01":122,"testday02":17158,"testday03":"12217158","testday04":"68female","testday04_2":34316,"testday05":"1221715868female1221715834316","amount01":"5JPCP3310671","amount02":34072,"amount03":"P08Y7","aa":17104,"testb21":89,"fix_const_999_0222":999,"0304tf":"999 3310671 5JPCP","0305test_long":17129,"hello":67,"datetest":"2019-05-18","binarytest":27,"nestedtest":"5JPCP","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P08Y7","floattest02":0,"__namelist_54":"0"}}
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P0TI9_8523_2019-05-18]: version conflict, document already exists (current version [1])
{"update":{"_id":"P0TI9_8523_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P0TI9_8523_2019-05-18","product_no":"P0TI9","cust_id":"8523","p106":20,"p107":68,"p108":36,"p109":"YIP72","p110":0,"p111":24,"p112":13632,"p113":197,"p114":73,"p115":70,"p116":90,"p117":"male","p118":"aa","p119":75,"p120":11,"p121":1532361600000,"p122":82,"p123":"8KUUS","dt":"2019-05-18","absum01":73,"testday01":143,"testday02":13775,"testday03":"14313775","testday04":"73male","testday04_2":27550,"testday05":"1431377573male1431377527550","amount01":"YIP728523","amount02":27264,"amount03":"P0TI9","aa":13705,"testb21":88,"fix_const_999_0222":999,"0304tf":"999 8523 YIP72","0305test_long":13656,"hello":56,"datetest":"2019-05-18","binarytest":20,"nestedtest":"YIP72","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P0TI9","floattest02":0,"__namelist_54":"0"}}
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P1J8O_2619118_2019-05-18]: version conflict, document already exists (current version [1])
{"update":{"_id":"P1J8O_2619118_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P1J8O_2619118_2019-05-18","product_no":"P1J8O","cust_id":"2619118","p106":99,"p107":57,"p108":53,"p109":"NR3QD","p110":1,"p111":83,"p112":17171,"p113":157,"p114":55,"p115":8,"p116":20,"p117":"male","p118":"oo","p119":42,"p120":4,"p121":1516636800000,"p122":62,"p123":"FO4IS","dt":"2019-05-18","absum01":56,"testday01":63,"testday02":17234,"testday03":"6317234","testday04":"55male","testday04_2":34468,"testday05":"631723455male631723434468","amount01":"NR3QD2619118","amount02":34342,"amount03":"P1J8O","aa":17227,"testb21":156,"fix_const_999_0222":999,"0304tf":"999 2619118 NR3QD","0305test_long":17255,"hello":152,"datetest":"2019-05-18","binarytest":99,"nestedtest":"NR3QD","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P1J8O","floattest02":1,"__namelist_54":"0"}}
Bailing out...
at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:127)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
When writing to Elasticsearch, the client found that the document's current version number did not match the version it held, i.e. another client had already modified the data.
Troubleshooting
1) Initial reasoning: the _id is guaranteed to be globally unique, so under normal circumstances a document with a given _id is written by exactly one Spark task, exactly once. Elasticsearch uses optimistic concurrency control, so a version conflict can only occur if some other client modifies the document between the current client's read and write. The question then becomes: what could cause at least two clients to write the same document?
Possibility 1: Spark dynamic resource allocation
On CDH, dynamic resource allocation does cause the number of executors to multiply and tasks to be scheduled onto the new executors, but it never causes the data (partition) belonging to one task to be processed by multiple tasks, so this can be ruled out.
Possibility 2: speculative task execution
To prevent a straggler task from dragging down the whole task set, speculative execution launches multiple tasks over the same data; whichever finishes first wins and the others are killed. This would indeed let multiple clients write the same document and trigger version conflicts, but Spark does not enable speculation by default and the job never turns it on, so this can be ruled out as well.
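As a quick sanity check (a sketch, assuming spark is the job's SparkSession), the effective value can be read at runtime; spark.speculation defaults to false:

println(spark.conf.get("spark.speculation", "false"))  // expect "false" unless it was enabled upstream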
2) Debugging the source code: because the problem was hard to reproduce, this did not yield much useful information.
3) At this point I noticed that, besides the flood of version-conflict errors, a corner of the Spark UI also showed an EsHadoopNoNodesLeftException: Connection error. Combined with Spark's task retry mechanism, the answer emerged: a connection error caused by the network aborts the bulk write, but the documents already written cannot be rolled back. Spark then reschedules the task, and the retried task writes as if the documents did not exist yet (version 0), while the already-written documents have had their version incremented to 1, hence the version conflict.
4) First, deal with the version conflict itself. Since the only real requirement is that no data is lost, a document that hits a version conflict can simply be ignored.
Following the official documentation, the following error handler was configured:
public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector)
            throws Exception {
        if (entry.getResponseCode() == 409) {
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}
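The handler also has to be registered with es-hadoop. A minimal sketch of that registration, reusing df and esCfg from the earlier sketch; the package com.example and the handler name "ignoreConflict" are illustrative, and the es.write.rest.error.handlers / es.write.rest.error.handler.* setting names follow the es-hadoop 6.x error-handler documentation:

val esCfgWithHandler = esCfg ++ Map(
  // register a bulk-write error handler under the (arbitrary) name "ignoreConflict"
  "es.write.rest.error.handlers" -> "ignoreConflict",
  // map that name to the handler class above (illustrative package)
  "es.write.rest.error.handler.ignoreConflict" -> "com.example.IgnoreConflictsHandler"
)
df.saveToEs("offline_quota_library_s/doc", esCfgWithHandler)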
After this change was verified, the version-conflict errors indeed stopped appearing; the UI now showed only EsHadoopNoNodesLeftException: Connection error.
5) Fixing EsHadoopNoNodesLeftException: Connection error
The cluster is built on Docker virtual machines, with Elasticsearch co-located with the CDH cluster, so overall performance is poor; on top of that, Spark dynamic resource allocation is enabled by default on the cluster, which multiplies the write parallelism. Together these caused the connection errors. Fix: disable dynamic allocation with --conf spark.dynamicAllocation.enabled=false and tune the parallelism down, i.e. limit the number of clients writing to ES at the same time (see the sketch below).
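A sketch of what those adjustments might look like, reusing df and esCfgWithHandler from the earlier sketches; the executor count, partition count, and the es.batch.size.entries value are illustrative, not the values actually used:

// Submit-side: spark-submit ... --conf spark.dynamicAllocation.enabled=false \
//                               --conf spark.executor.instances=4   (values illustrative)
import org.elasticsearch.spark.sql._

// Each partition maps to one bulk-writing ES client, so repartitioning bounds the write parallelism.
df.repartition(8)  // illustrative parallelism
  .saveToEs("offline_quota_library_s/doc",
    esCfgWithHandler + ("es.batch.size.entries" -> "500"))  // smaller bulks ease pressure on the weak ES nodes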
After these changes, the connection errors no longer appeared.
Source code verification
Starting from dataframe.saveToEs(to, map), the call chain is:
SparkDataFrameFunctions#saveToEs
EsSparkSQL#saveToEs
SparkContext#runJob
Ignoring the details of DAG construction and task scheduling, focus on the runJob call: sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
EsDataFrameWriter's write method is passed to runJob as a function and invoked later:
def write(taskContext: TaskContext, data: Iterator[T]) {
  // one REST writer, i.e. one ES client, per Spark partition/task
  val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)
  // close the writer when the task completes
  taskContext.addTaskCompletionListener((TaskContext) => writer.close())
  if (runtimeMetadata) {
    writer.repository.addRuntimeFieldExtractor(metaExtractor)
  }
  // each record is serialized and handed to the bulk processor
  while (data.hasNext) {
    writer.repository.writeToIndex(processData(data))
  }
}
From there the call chain continues: RestRepository#writeToIndex
RestRepository#doWriteToIndex
BulkProcessor#add
BulkProcessor#flush
BulkProcessor#tryFlush
RestClient#bulk
NetworkClient#execute
The core method:
public Response execute(Request request) {
    Response response = null;
    boolean newNode;
    do {
        SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body());
        newNode = false;
        try {
            response = currentTransport.execute(routedRequest);
            ByteSequence body = routedRequest.body();
            if (body != null) {
                stats.bytesSent += body.length();
            }
        } catch (Exception ex) {
            if (ex instanceof EsHadoopIllegalStateException) {
                throw (EsHadoopException) ex;
            }
            // issues with the SSL handshake, bail out instead of retry, for security reasons
            if (ex instanceof javax.net.ssl.SSLException) {
                throw new EsHadoopTransportException(ex);
            }
            // check for fatal, non-recoverable network exceptions
            if (ex instanceof BindException) {
                throw new EsHadoopTransportException(ex);
            }
            if (log.isTraceEnabled()) {
                log.trace(
                        String.format(
                                "Caught exception while performing request [%s][%s] - falling back to the next node in line...",
                                currentNode, request.path()), ex);
            }
            String failed = currentNode;
            failedNodes.put(failed, ex);
            newNode = selectNextNode();
            log.error(String.format("Node [%s] failed (%s); "
                    + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."),
                    failed, ex.getMessage()));
            if (!newNode) {
                throw new EsHadoopNoNodesLeftException(failedNodes);
            }
        }
    } while (newNode);
    return response;
}
This is where EsHadoopNoNodesLeftException is thrown: once selectNextNode() finds no remaining node, the client gives up and aborts.
Summary and advice
This episode confirms a simple lesson once more: read error messages patiently and in full, or you risk chasing the wrong lead.