Cannot apply pandas_udf in pyspark

Problem description

I'm running some pyspark experiments in a Jupyter notebook attached to an AWS EMR instance. I have a Spark dataframe that reads data from S3 and then filters some of it out. Printing the schema with df1.printSchema() gives:

root
 |-- idvalue: string (nullable = true)
 |-- locationaccuracyhorizontal: float (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- locationlatrad: float (nullable = true)
 |-- locationlonrad: float (nullable = true)
 |-- epochtimestamp: integer (nullable = true)

I'm trying to apply a pandas_udf to this dataframe (examples here). My udf is:

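from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(df1.schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    # Standardize the hour column within each group
    hour = pdf.hour
    return pdf.assign(hour=(hour - hour.mean()) / hour.std())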

The call looks like this:

df2 = df1.groupBy('idvalue') \
        .apply(normalize).show()   

Unfortunately, this throws the following error:

    An error occurred while calling o522.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 31, x.x.x.x, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 256, in _make_accessor
    return maybe_to_datetimelike(data)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 82, in maybe_to_datetimelike
    "datetimelike index".format(type(data)))
TypeError: cannot convert an object of type <class 'pandas.core.series.Series'> to a datetimelike index

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 283, in dump_stream
    for series in iterator:
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 301, in load_stream
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 301, in <listcomp>
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/serializers.py", line 271, in arrow_to_pandas
    s = _check_series_convert_date(s, from_arrow_type(arrow_column.type))
  File "/mnt1/yarn/usercache/livy/appcache/application_1555045880196_0210/container_1555045880196_0210_01_000013/pyspark.zip/pyspark/sql/types.py", line 1692, in _check_series_convert_date
    return series.dt.date
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/generic.py", line 3610, in __getattr__
    return object.__getattribute__(self, name)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/accessor.py", line 54, in __get__
    return self.construct_accessor(instance)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 258, in _make_accessor
    raise AttributeError("Can only use .dt accessor with "
AttributeError: Can only use .dt accessor with datetimelike values

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 256, in _make_accessor
    return maybe_to_datetimelike(data)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/indexes/accessors.py", line 82, in maybe_to_datetimelike
    "datetimelike index".format(type(data)))
TypeError: cannot convert an object of type <class 'pandas.core.series.Series'> to a datetimelike index

I don't understand why it throws a datetime-related error. None of the operations I'm doing involves dates. Any help is appreciated.

Recommended answer

I think pandas_udf doesn't support all Spark types yet, and it seems to be having trouble with your date column (the traceback fails in _check_series_convert_date, on the line series.dt.date).
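The failing line from the traceback can be reproduced in plain pandas. The sketch below assumes the date column reached pandas as ordinary Python date objects (object dtype) rather than datetime64 values; that is a guess about what happened on the cluster, not something the traceback confirms. The sample dates are made up for illustration.

```python
import datetime

import pandas as pd

# Assumption: the column holds plain Python date objects, so its dtype is object.
dates = pd.Series([datetime.date(2019, 4, 12), datetime.date(2019, 4, 13)])

try:
    # Same attribute access as in pyspark's _check_series_convert_date.
    dates.dt.date
    message = None
except AttributeError as exc:
    message = str(exc)

print(message)

# Converting to datetime64 first makes the .dt accessor available.
converted = pd.to_datetime(dates).dt.date
print(converted.tolist())
```

If this matches what happens on the executors, the error comes from converting the date column on its way into the udf, not from anything the udf itself does with hour.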

One issue with any udf is that all of the input data has to be materialized for the udf, even the columns it ignores. That can cause problems like this one, or at minimum degrade performance. All else being equal, you should reduce the number of columns you pass into your udf, for example by adding a select before the groupBy:

df2 = df1.select('idvalue', 'hour').groupBy('idvalue').apply(normalize).show()
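When narrowing the columns this way, note that a grouped map udf must return data matching its declared schema, and normalize above was declared with df1.schema (all nine columns, with hour as an integer). The following is a minimal sketch of one way to keep the two in line. The names normalize_hour and normalize_by_id are hypothetical, and declaring hour as double is an assumption made because the normalized values are no longer integers.

```python
import pandas as pd


def normalize_hour(pdf: pd.DataFrame) -> pd.DataFrame:
    """Standardize the hour column of one group (plain pandas, no Spark needed)."""
    hour = pdf["hour"].astype("float64")
    return pdf.assign(hour=(hour - hour.mean()) / hour.std())


def normalize_by_id(df1):
    """Apply normalize_hour per idvalue, passing only the columns the udf needs.

    Hypothetical helper: df1 is assumed to be the Spark dataframe from the question.
    """
    # Imported here so normalize_hour can be tried out without a Spark installation.
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    slim = df1.select("idvalue", "hour")  # drops the date column before the udf
    # The declared schema matches what normalize_hour returns for `slim`.
    udf = pandas_udf(normalize_hour, "idvalue string, hour double",
                     PandasUDFType.GROUPED_MAP)
    return slim.groupBy("idvalue").apply(udf)


# Quick local check of the pandas logic on made-up data.
sample = pd.DataFrame({"idvalue": ["a", "a", "a"], "hour": [1, 2, 3]})
print(normalize_hour(sample))
```

On the cluster this would be called as normalize_by_id(df1).show(). Keeping the result and the show() call separate also avoids assigning None to df2, since show() returns nothing.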
