This article looks at how to handle calling Java/Scala functions from a (Py)Spark task. It may serve as a useful reference for anyone running into the same problem.


Background

My original question here was Why does using DecisionTreeModel.predict inside a map function raise an exception?, and it is related to How to generate tuples of (original label, predicted label) on Spark with MLlib?

When we use the Scala API, the recommended way of getting predictions for an RDD[LabeledPoint] using a DecisionTreeModel is to simply map over the RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Unfortunately, a similar approach in PySpark doesn't work so well:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features)))
labelsAndPredictions.first()

Instead, the official documentation recommends something like this:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
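
For completeness, a minimal sketch of how the zipped RDD is typically consumed afterwards (variable names as above; the error metric is only an illustration):

# Plain RDD actions on the zipped (label, prediction) pairs; nothing here
# touches the model from inside a task.
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print("Test Error = " + str(testErr))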

So what is going on here? There is no broadcast variable involved, and the Scala API defines predict as follows:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

So, at least at first glance, calling it from an action or transformation is not a problem, since prediction appears to be a local operation.

Explanation

After some digging I figured out that the source of the problem is the JavaModelWrapper.call method invoked from DecisionTreeModel.predict. It accesses the SparkContext, which is required to call the Java function:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)
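
For context, the surrounding wrapper looks roughly like this (a paraphrased sketch of pyspark.mllib.common from that era, not a verbatim copy); the point is that call needs a live SparkContext, which only exists on the driver:

class JavaModelWrapper(object):
    """Wrapper around a model living in the JVM (sketch)."""
    def __init__(self, java_model):
        self._sc = SparkContext._active_spark_context  # driver-only handle
        self._java_model = java_model

    def call(self, name, *a):
        # Fails inside a task: workers have no SparkContext / Py4J gateway.
        return callJavaFunc(self._sc, getattr(self._java_model, name), *a)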

Question

In the case of DecisionTreeModel.predict there is a recommended workaround, and all the required code is already part of the Scala API, but is there any elegant way to handle problems like this in general?

The only solutions I can think of right now are rather heavyweight:

  • pushing everything down to the JVM, either by extending Spark classes through implicit conversions or by adding some kind of wrapper
  • using the Py4J gateway directly

Solution

Communication using the default Py4J gateway is simply not possible. To understand why, we have to take a look at the diagram from the PySpark Internals document [1].

Since the Py4J gateway runs on the driver, it is not accessible to the Python interpreters that communicate with JVM workers through sockets (see for example PythonRDD / rdd.py).

Theoretically, it could be possible to create a separate Py4J gateway for each worker, but in practice it is unlikely to be useful. Reliability issues aside, Py4J is simply not designed to perform data-intensive tasks.

Are there any workarounds?

  1. Using the Spark SQL Data Sources API to wrap JVM code.

    Pros: Supported, high level, doesn't require access to the internal PySpark API

    Cons: Relatively verbose and not very well documented, limited mostly to the input data

  2. Operating on DataFrames using Scala UDFs.

    Pros: Easy to implement (see Spark: How to map Python with Scala or Java User Defined Functions?, and the first sketch after this list), no data conversion between Python and Scala if the data is already stored in a DataFrame, minimal access to Py4J

    Cons: Requires access to Py4J gateway and internal methods, limited to Spark SQL, hard to debug, not supported

  3. Creating a high-level Scala interface, in a similar way to how it is done in MLlib.

    Pros: Flexible, with the ability to execute arbitrarily complex code. It can be done either directly on RDDs (see for example MLlib model wrappers) or with DataFrames (see How to use a Scala class inside Pyspark, and the second sketch after this list). The latter approach seems to be much more friendly, since all ser-de details are already handled by the existing API.

    Cons: Low level, requires data conversion, and, like UDFs, requires access to Py4J and internal APIs; not supported

    Some basic examples can be found in Transforming PySpark RDD with Scala.

  4. Using an external workflow management tool to switch between Python and Scala/Java jobs and passing data through a DFS.

    Pros: Easy to implement, minimal changes to the code itself

    Cons: Cost of reading / writing data (Alluxio?)

  5. Using a shared SQLContext (see for example Apache Zeppelin or Livy) to pass data between guest languages using registered temporary tables.

    Pros: Well suited for interactive analysis

    Cons: Not so well suited for batch jobs (Zeppelin), or may require additional orchestration (Livy)
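
To make option 2 more concrete, here is a hedged sketch of the Python side. It assumes a Scala object com.example.Udfs with a method register(sqlContext: SQLContext) that calls sqlContext.udf.register(...) has been packaged into a jar shipped via --jars; the names are purely illustrative and every underscore-prefixed attribute is internal PySpark API (Spark 1.x era):

# Register UDFs defined in Scala through the Py4J gateway (internal API),
# then use them from SQL; com.example.Udfs and myUdf are hypothetical names.
sc._jvm.com.example.Udfs.register(sqlContext._ssql_ctx)

df.registerTempTable("df")
result = sqlContext.sql("SELECT myUdf(x) AS y FROM df")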

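And a similarly hedged sketch for the DataFrame flavour of option 3, assuming a Scala helper object com.example.Transformers with a method process(df: DataFrame): DataFrame is on the classpath (again, hypothetical names and internal underscore attributes):

from pyspark.sql import DataFrame

def scala_process(df):
    # Hand the underlying Java DataFrame to the Scala helper and wrap the
    # result back into a Python DataFrame; ser-de is handled by Spark SQL.
    jdf = df._jdf                              # internal: Java-side DataFrame
    jvm = df.sql_ctx._sc._jvm                  # internal: Py4J view of the JVM
    return DataFrame(jvm.com.example.Transformers.process(jdf), df.sql_ctx)

processed = scala_process(df)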

  1. Joshua Rosen. (2014, August 4). PySpark Internals. Retrieved from https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

This concludes the article on calling Java/Scala functions from a task. We hope the answer recommended above is helpful.
