This article covers the question "Is there a way to skip/drop/ignore records in Spark during a map?" and a recommended answer.

Problem Description

We have a very standard Spark job which reads log files from s3 and then does some processing over them. Very basic Spark stuff...

val logs = sc.textFile(somePathTos3)
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t")))
val validRows = mappedRows.filter(log => log._1._1 != "ERROR")
...and continue processing

Where OurRowObject.parseLog takes the raw log line (split on tabs) and maps it to some (key, value) pair (e.g. ( (1,2,3,4), (5,6,7) )) that we can then do processing on. Now, if parseLog encounters a "problem" log (malformed, empty, etc...) it will return some sentinel value (e.g. ( ("ERROR", ...), (...) )), which the filter step then filters out.
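
For concreteness, here is a minimal sketch of what such a sentinel-returning parser might look like. The field layout (four key fields, three value fields) and the arity check are assumptions for illustration; the real OurRowObject is not shown in the question:

object OurRowObject {
  // Maps tab-split fields to a ((k1,k2,k3,k4), (v1,v2,v3)) pair;
  // anything malformed yields a sentinel keyed by "ERROR" for the
  // downstream filter step to discard.
  def parseLog(fields: Array[String]): ((String, String, String, String), (String, String, String)) =
    if (fields.length == 7)
      ((fields(0), fields(1), fields(2), fields(3)), (fields(4), fields(5), fields(6)))
    else
      (("ERROR", "", "", ""), ("", "", ""))
}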

Now, what I have been trying to find a way to do is to simply not include the problem row(s) during the map...some way to tell Spark "Hey, this is an empty/malformed row, skip it and don't include a pair for it", instead of that additional filter step.

I have not yet been able to find a way to do this, and find it very interesting that this functionality does not (AFAICanFind) exist.

Thanks

Recommended Answer

You could make the parser return an Option[Value] instead of a Value. That way you could use flatMap to map the lines to rows and remove those that were invalid.

Roughly, something like this:

def parseLog(line: String): Option[Array[String]] = {
  val splitted = line.split("\t")
  // validate is the domain-specific check (field count, non-empty values, etc.)
  if (validate(splitted)) Some(splitted) else None
}

val validRows = logs.flatMap(OurRowObject.parseLog(_))
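
This works because Scala implicitly treats an Option as a collection of zero or one element, so flatMap emits the contents of each Some and silently drops every None. A quick usage sketch (the input lines and the behavior of validate here are assumed for illustration):

// Assuming validate rejects the degenerate split produced by the blank line.
val logs = sc.parallelize(Seq("a\tb", "", "c\td"))
val validRows = logs.flatMap(OurRowObject.parseLog(_))
// validRows now holds only Array("a", "b") and Array("c", "d");
// the blank line never makes it into the RDD.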
