我尝试从HDFS中存储的静态文本文件中读取数据,将其内容存储到ArrayBuffer中,然后应通过sparkContext.broadcast将其作为BroadcastVariable进行广播。我正在使用cloudera的spark,spark版本1.6.0-cdh5.7.0和spark-streaming_2.10。

我使用spark-submit在 yarn 上启动应用程序:

spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst = true current.jar

当我这样做时,我得到一个
java.lang.ClassCastException:无法将scala.Some实例分配给org.apache.spark.Accumulator实例中类型为scala.Option的org.apache.spark.Accumulable.name字段
与硬编码ArrayBuffer一起使用的相同代码可以完美地工作,因此我认为它与静态文件资源有关。
有人知道我可能做错了什么吗?任何帮助表示赞赏。

这不起作用:

对象BroadcastStreamTest1 {

def main(args:Array [String]){
val sparkConf =新的SparkConf()
val StreamingContext = new StreamingContext(sparkConf,batchDuration = Seconds(10))

val content = streamingContext.sparkContext
.textFile(“hdfs:///data/someTextFile.txt”)
。搜集()
.toBuffer [String]

val broadCastVar = StreamingContext.sparkContext.broadcast(content)
broadCastVar.value.foreach(line => println(line))

streamingContext.start()
StreamingContext.awaitTermination()
}
}

这有效:

对象BroadcastStreamTest2 {

def main(args:Array [String]){
val sparkConf =新的SparkConf()
val StreamingContext = new StreamingContext(sparkConf,batchDuration = Seconds(10))

val内容=新的mutable.ArrayBuffer [String]
(1到50).foreach(i => content + =“line” + i)

val broadCastVar = StreamingContext.sparkContext.broadcast(content)
broadCastVar.value.foreach(line => println(line))

streamingContext.start()
StreamingContext.awaitTermination()
}
}

堆栈跟踪:

25/04/25 10:09:59错误scheduler.TaskSetManager:阶段0中的任务0失败4次;放弃工作
线程“主”中的异常org.apache.spark.SparkException:由于阶段失败而导致作业中止:阶段0中的任务0失败4次,最近一次失败:阶段0.0中的任务0.3丢失(TID 6,n525.hadoop.mxint。 net):java.io.IOException:java.lang.ClassCastException:无法将scala.Some分配给org.apache.spark.Accumulator实例中类型为scala.Option的org.apache.spark.Accumulable.name字段。
在org.apache.spark.util.Utils $ .tryOrIOException(Utils.scala:1208)
在org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
在sun.reflect.NativeMethodAccessorImpl.invoke0( native 方法)处
在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在java.lang.reflect.Method.invoke(Method.java:606)
在java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
在java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
在java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
在java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
在java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
在scala.collection.immutable。$ colon $ colon.readObject(List.scala:362)
在sun.reflect.NativeMethodAccessorImpl.invoke0( native 方法)处
在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在java.lang.reflect.Method.invoke(Method.java:606)
在java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
在java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
在java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
在java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
在java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
在java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
在java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
在java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
在java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
在org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
在org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
在org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:194)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
在java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)
在java.lang.Thread.run(Thread.java:745)
由以下原因引起:java.lang.ClassCastException:无法将scala.Some实例分配给org.apache.spark.Accumulator实例中类型为scala.Option的org.apache.spark.Accumulable.name字段。
在java.io.ObjectStreamClass $ FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
在java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
在java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
在java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
在org.apache.spark.Accumulable $$ anonfun $ readObject $ 1.apply $ mcV $ sp(Accumulators.scala:152)
在org.apache.spark.util.Utils $ .tryOrIOException(Utils.scala:1205)
...另外30个

驱动程序堆栈跟踪:
在org.apache.spark.scheduler.DAGScheduler.org上$ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1431)
在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala:1419)
在org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply(DAGScheduler.scala:1418)
在scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray.scala:59)
在scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
在org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
位于org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply(DAGScheduler.scala:799)
位于org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply(DAGScheduler.scala:799)
在scala.Option.foreach(Option.scala:236)
在org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
在org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
在org.apache.spark.util.EventLoop $$ anon $ 1.run(EventLoop.scala:48)
在org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
在org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
在org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
在org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
在org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
在org.apache.spark.rdd.RDD $$ anonfun $ collect $ 1.apply(RDD.scala:927)
在org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150)
在org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:111)
在org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
在org.apache.spark.rdd.RDD.collect(RDD.scala:926)
在net.meetrics.dada.streaming.application.BroadcastStreamTest1 $ .main(BroadcastStreamTest1.scala:14)
在net.meetrics.dada.streaming.application.BroadcastStreamTest1.main(BroadcastStreamTest1.scala)
在sun.reflect.NativeMethodAccessorImpl.invoke0( native 方法)处
在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在java.lang.reflect.Method.invoke(Method.java:606)
在org.apache.spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:731)中
在org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:181)
在org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206)
在org.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121)
在org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
由以下原因引起:java.io.IOException:java.lang.ClassCastException:无法将scala.Some分配给org.apache.spark.Accumulator实例中类型为scala.Option的org.apache.spark.Accumulable.name字段。
在org.apache.spark.util.Utils $ .tryOrIOException(Utils.scala:1208)
在org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
在sun.reflect.NativeMethodAccessorImpl.invoke0( native 方法)处
在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在java.lang.reflect.Method.invoke(Method.java:606)
在java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
在java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
在java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
在java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
在java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
在scala.collection.immutable。$ colon $ colon.readObject(List.scala:362)
在sun.reflect.NativeMethodAccessorImpl.invoke0( native 方法)处
在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在java.lang.reflect.Method.invoke(Method.java:606)
在java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
在java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
在java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
在java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
在java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
在java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
在java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
在java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
在java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
在org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
在org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
在org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:194)
在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
在java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)
在java.lang.Thread.run(Thread.java:745)
由以下原因引起:java.lang.ClassCastException:无法将scala.Some实例分配给org.apache.spark.Accumulator实例中类型为scala.Option的org.apache.spark.Accumulable.name字段。
在java.io.ObjectStreamClass $ FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
在java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
在java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
在java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
在org.apache.spark.Accumulable $$ anonfun $ readObject $ 1.apply $ mcV $ sp(Accumulators.scala:152)
在org.apache.spark.util.Utils $ .tryOrIOException(Utils.scala:1205)
...另外30个

最佳答案

原因是与我提供的jar文件存在某种冲突。

没有设定

spark.executor.userClassPathFirst=true

它有效,很遗憾,我无法找到问题的确切原因。

关于scala - Spark流: Broadcast variables, java.lang.ClassCastException,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36836834/

10-12 17:05