This article describes how to handle being unable to apply a pandas_udf in PySpark. It should be a useful reference for anyone who runs into the same problem.

Problem Description

I'm trying out some pyspark-related experiments in a Jupyter notebook attached to an AWS EMR instance. I have a Spark dataframe that reads data from S3 and then filters out some of the records. Printing the schema with df1.printSchema() outputs the following:

root
 |-- idvalue: string (nullable = true)
 |-- locationaccuracyhorizontal: float (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- locationlatrad: float (nullable = true)
 |-- locationlonrad: float (nullable = true)
 |-- epochtimestamp: integer (nullable = true)
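
For context, here is a minimal sketch of how a dataframe like this might be built; the S3 path, file format, and filter condition are placeholders, since the original post does not show them:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pandas-udf-experiment").getOrCreate()

# Hypothetical source and filter; substitute the real S3 path and predicate.
df1 = (spark.read.parquet("s3://your-bucket/your-prefix/")
            .filter("locationaccuracyhorizontal < 100"))
df1.printSchema()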

I'm trying to apply a pandas_udf on this dataframe (examples here). My UDF is:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(df1.schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    # standardize the hour column within each group
    hour = pdf.hour
    return pdf.assign(hour=(hour - hour.mean()) / hour.std())

And it is called as follows:

df2 = df1.groupBy('idvalue') \
        .apply(normalize).show()   

Unfortunately, this throws an error:

    An error occurred while calling o522.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 31, x.x.x.x, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 256, in _make_accessor
    return maybe_to_datetimelike(data)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 82, in maybe_to_datetimelike
    "datetimelike index".format(type(data)))
TypeError: cannot convert an object of type <class 'pandas.core.series.Series'> to a datetimelike index

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 283, in dump_stream
    for series in iterator:
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 301, in load_stream
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 301, in <listcomp>
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 271, in arrow_to_pandas
    s = _check_series_convert_date(s, from_arrow_type(arrow_column.type))
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/sql/types.py", line 1692, in _check_series_convert_date
    return series.dt.date
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/generic.py", line 3610, in __getattr__
    return object.__getattribute__(self, name)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/accessor.py", line 54, in __get__
    return self.construct_accessor(instance)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 258, in _make_accessor
    raise AttributeError("Can only use .dt accessor with "
AttributeError: Can only use .dt accessor with datetimelike values

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 256, in _make_accessor
    return maybe_to_datetimelike(data)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 82, in maybe_to_datetimelike
    "datetimelike index".format(type(data)))
TypeError: cannot convert an object of type <class 'pandas.core.series.Series'> to a datetimelike index

I don't understand why it's throwing a datetime-related error; none of the operations I'm performing involve datetimes. Any help is appreciated.

Recommended Answer

I think pandas_udf doesn't support all Spark types yet, and it seems to be having trouble with your date column.
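
A possible workaround, offered here as my own sketch rather than part of the original answer, is to keep that column out of the Arrow-to-pandas date conversion entirely, for example by casting it to a string before the grouped-map UDF runs:

from pyspark.sql import functions as F

# Serialize the troublesome date column as a string so the Arrow/pandas
# round trip never calls series.dt.date on it (assumption: the UDF does
# not need it as an actual date).
df1_safe = df1.withColumn('date', F.col('date').cast('string'))

# The grouped-map UDF would then be declared with df1_safe.schema rather
# than df1.schema, since the declared schema must match what the UDF returns.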

One issue with any UDF is that all of the data has to be materialized for the UDF, even if the UDF ignores the values; this can lead to problems like the one above, or at a minimum to degraded performance. All else being equal, you should try to reduce the number of columns you pass into the UDF, for example by adding a select before the groupBy:

df2 = df1.select('idvalue', 'hour').groupBy('idvalue').apply(normalize)
df2.show()
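
One caveat, which is my addition rather than part of the original answer: a grouped-map pandas_udf must return exactly the schema it declares, so after narrowing the columns the UDF should be redefined against the reduced schema, along these lines:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df1_small = df1.select('idvalue', 'hour')

@pandas_udf(df1_small.schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    # same standardization as before, now declared with the two-column schema
    hour = pdf.hour
    return pdf.assign(hour=(hour - hour.mean()) / hour.std())

df1_small.groupBy('idvalue').apply(normalize).show()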

That concludes this article on being unable to apply pandas_udf in PySpark. Hopefully the recommended answer above is helpful.
