


We have a very standard Spark job that reads log files from S3 and then does some processing on them. Very basic Spark stuff...

val logs = sc.textFile(somePathTos3)
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t")))
val validRows = mappedRows.filter(log => log._1._1 != "ERROR")
...and continue processing

Here OurRowObject.parseLog takes the split log line and maps it to a (key, value) pair, e.g. ( (1,2,3,4), (5,6,7) ), that we can then process. If parseLog encounters a "problem" log (malformed, empty, etc.), it returns a sentinel value, e.g. ( ("ERROR", ...), (...) ), which the filter step then removes.


What I have been trying to find is a way to simply not emit the problem rows during the map: some way to tell Spark "Hey, this is an empty/malformed row, skip it and don't include a pair for it", instead of needing that additional filter step.


I have not yet found a way to do this, and I find it surprising that this functionality does not (as far as I can find) exist.



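You could make the parser return an Option[Value] instead of a Value. That way you can use flatMap instead of map to turn the lines into rows and drop the invalid ones in the same step: each Some(row) contributes one element to the result, and each None contributes nothing.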


def parseLog(line: String): Option[Array[String]] = {
    val fields = line.split("\t")
    // validate stands for whatever check decides that a row is well-formed
    if (validate(fields)) Some(fields) else None
}

val validRows = logs.flatMap(OurRowObject.parseLog(_))
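
To see the effect without a cluster, here is a minimal sketch of the same idea on a plain Scala Seq, which has the same flatMap-over-Option behaviour as an RDD. The Row type, the three-field layout, and the validity check are assumptions made up for illustration; your real OurRowObject will differ.

```scala
// Hypothetical stand-in for OurRowObject; the field layout is an assumption.
object OurRowObject {
  type Row = ((String, String), String)

  // Returns None for empty or malformed lines, Some(row) otherwise.
  def parseLog(line: String): Option[Row] = {
    // limit -1 keeps trailing empty fields so they can be rejected below
    val fields = line.split("\t", -1)
    if (fields.length == 3 && fields.forall(_.nonEmpty))
      Some(((fields(0), fields(1)), fields(2)))
    else
      None
  }
}

object FlatMapDemo {
  // Two well-formed lines, one empty line, one line without tabs.
  val logs: Seq[String] = Seq("a\tb\tc", "", "broken line", "d\te\tf")

  // flatMap unwraps every Some and drops every None in a single pass,
  // so no sentinel value and no separate filter step are needed.
  val validRows: Seq[OurRowObject.Row] =
    logs.flatMap(line => OurRowObject.parseLog(line))

  def main(args: Array[String]): Unit =
    validRows.foreach(println)
}
```

With an RDD[String] in place of the Seq, the flatMap call looks the same. The parser now takes the raw line rather than the already split array, so the split moves inside parseLog.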


10-26 21:17