This article covers how to handle the GenericRowWithSchema exception that occurs when casting an ArrayBuffer to a HashSet while converting a DataFrame backed by a Hive table into an RDD; it should be a useful reference for anyone hitting the same problem.

Problem description

I have a Hive table in parquet format that was generated using

create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;

I am able to verify that it was filled -- here is a sample value

[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]

I wish to put this into a Spark RDD of the form

((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))

Now, using spark-shell (I get the same problem in spark-submit), I made a test RDD with these values

scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85

using an iterator, I can cast the ArrayBuffer as a HashSet in the following new RDD:

scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))
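(As an aside, the mutable var-and-foreach loop above isn't strictly necessary; Scala's collection constructors can do the same conversion in one step. A minimal sketch with the sample values:

```scala
import scala.collection.immutable.HashSet
import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer((1, "hello"))
// HashSet(xs: _*) builds an immutable HashSet directly from any sequence
val asSet: HashSet[(Int, String)] = HashSet(buf: _*)  // Set((1,hello))
```

Either form produces the same immutable HashSet; this is purely a style difference and does not affect the error discussed below.)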

But when I attempt to do the EXACT SAME THING with a DataFrame with a HiveContext / SQLContext, I get the following error:

scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._

scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")

scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))

scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
       at org.apache.spark.scheduler.Task.run(Task.scala:64)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:724)

Driver stacktrace:
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at scala.Option.foreach(Option.scala:236)
       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Note that I get this same error "GenericRowWithSchema cannot be cast to scala.Tuple2" when I run this in a compiled program using spark-submit. The program crashes at RUN TIME when it encounters the conversion step, and I had no compiler errors.
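(The run-time-only failure is expected from how asInstanceOf behaves on the JVM: because of type erasure, casting a collection is unchecked and always "succeeds"; the ClassCastException only fires later, when an element is actually used at the asserted type. A minimal pure-Scala illustration, no Spark required and with made-up values:

```scala
// A Seq[Any] holding something that is not a tuple
val xs: Seq[Any] = Seq("not a tuple")

// This cast compiles and succeeds -- erasure means nothing is checked here
val pretend = xs.asInstanceOf[Seq[(Int, String)]]

// Only when an element is used as a tuple does the JVM reject the cast:
// pretend.head._1  // throws java.lang.ClassCastException at run time
```

This is why the compiler stays silent and the crash surfaces only when the map over the Hive-backed rows is actually executed.)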

It seems very strange to me that my artificially generated RDD "tempRDD" would work with the conversion, whereas the Hive query DataFrame->RDD did not. I checked, and both of the RDDs have the same form:

scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776

scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70

the only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3. All got the same error message.

I also read through related Stack Overflow questions and Apache Spark Jira issues, and from those I attempted casting the ArrayBuffer as an Iterator instead, but that also failed on the second step with the same error.

Does anyone know how to properly convert ArrayBuffers to HashSets for DataFrames originating from Hive tables? Since the error seems to be only for the Hive table version, I'm tempted to think that this is an issue with Spark/Hive integration in SparkSQL.

Any ideas?

My Spark version is 1.3.0 CDH.

Here are the printSchema results:

scala> tempRDDfromHive.printSchema()
root
 |-- var1: integer (nullable = true)
 |-- var2: string (nullable = true)
 |-- var3: integer (nullable = true)
 |-- var4: string (nullable = true)
 |-- var5: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = true)
 |    |    |-- b: string (nullable = true)

Recommended answer

What you actually get during the map phase is not an ArrayBuffer[(Int, String)] but an ArrayBuffer[Row], hence the error. Ignoring the other columns, what you need is something like this:

import org.apache.spark.sql.Row

tempHiveQL.map((a: Row) =>
    a.getAs[Seq[Row]](4).map{case Row(k: Int, v: String) => (k, v)}.toSet)
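Applied to the full row rather than just column 4, the conversion might look like the following sketch. This is a hedged example, not tested against the asker's data: it assumes the select order and types shown by printSchema above, that no column is null (getInt and getString throw on null), and it uses the standard Row accessors getInt, getString, and getAs:

```scala
import org.apache.spark.sql.Row

// Rebuild the desired ((Int,String), ((Int,String), Set[(Int,String)])) shape,
// pattern-matching the nested Rows of var5 into tuples before calling toSet
val tempRDDfixed = tempHiveQL.map { (a: Row) =>
  ((a.getInt(0), a.getString(1)),
   ((a.getInt(2), a.getString(3)),
    a.getAs[Seq[Row]](4).map { case Row(k: Int, v: String) => (k, v) }.toSet))
}
```

The key point is the same as in the answer's snippet: the elements of the array column arrive as Row objects and must be destructured into tuples explicitly, rather than cast with asInstanceOf.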

It looks like this issue has been fixed in Spark 1.5.0.
