本文介绍了任务在Spark中产生了不可序列化的结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用cassandra驱动程序读取cassandra表.这是代码.

I am trying to read cassandra table using the cassandra driver to the spark.Here is the code.

val x = 1 to 2
val rdd = sc.parallelize(x)

val query = "Select data from testkeyspace.testtable where id=%d"

val cc = CassandraConnector(sc.getConf)

val res1 =
    rdd.map{ it =>
            cc.withSessionDo{ session =>
            session.execute( query.format(it))
        }
     }

res1.take(1).foreach(println)

但是我遇到了异常Task的结果无法序列​​化.

but I am getting the exception Task had a not serializable result.

  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 24.0 (TID 77) had a not serializable result: com.datastax.driver.core.ArrayBackedResultSet$SinglePage
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

如何解决?

推荐答案

在我们的转换中,不可序列化的对象是从Cassandra返回的结果,可对查询结果进行迭代.通常,您希望将该集合具体化为RDD.

The non-serializable object in our transformation is the result coming back from Cassandra, which is an iterable on the query result.You typically want to materialize that collection into the RDD.

一种方法是询问该查询产生的所有记录:

One way would be to ask all records resulting from that query:

session.execute( query.format(it)).all()

这篇关于任务在Spark中产生了不可序列化的结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-03 21:26