我正在尝试使用 Spark 编写迭代算法。该算法包含一个主循环,其中使用不同的 Spark 命令进行并行处理。如果在每次迭代中只使用一个 Spark 命令,则一切正常。当使用多个命令时,Spark 的行为变得非常奇怪。主要的问题是,一个包含 2 个项目的 RDD 上的 map 命令不会导致 2 个,而是导致许多函数调用。

从迭代 1 到迭代 x-1 的每个命令似乎都在迭代 x 中执行。但不仅在循环的最后一次迭代中,而且在循环的每一次迭代中!

我构建了一个小示例来重现该行为(使用 Java 1.8 和 Spark 1.6.1)

首先是RDD中使用的数据结构:

public class Data implements Serializable {
    private static final long serialVersionUID = -6367920689454127925L;
    private String id;
    private Integer value;

    public Data(final String id, final Integer value) {
        super();
        this.id = id;
        this.value = value;
    }

    public String getId() {
        return this.id;
    }

    public Integer getValue() {
        return this.value;
    }

    public void setValue(final Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Data [id=" + this.id + ", value=" + this.value + "]";
    }
}

对于 max 命令,我们使用比较器:
public class MyComparator implements java.util.Comparator<Data>, Serializable {

    private static final long serialVersionUID = 1383816444011380318L;

    private static final double EPSILON = 0.001;

    public MyComparator() {
    }

    @Override
    public int compare(final Data x, final Data y) {
        if (Math.abs(x.getValue() - y.getValue()) < EPSILON) {
            return 0;
        } else if (x.getValue() < y.getValue()) {
            return -1;
        } else {
            return 1;
        }
    }

}

现在使用算法的主程序:
public class Job implements Serializable {

    private static final long serialVersionUID = -1828983500553835114L;

    // Spark Settings
    private static final String APPNAME = "DebugApp - Main";
    private static final String SPARKMASTER = "local[1]";
    private static final int MAX_ITERATIONS = 4;

    public Job() {
    }

    public static void main(final String[] args) {
        final Job job = new Job();
        job.run();
    }

    public void run() {
        final JavaSparkContext sparkContext = createSparkContext();
        final List<Data> dataSet = new ArrayList<Data>();
        dataSet.add(new Data("0", 0));
        dataSet.add(new Data("1", 0));

        JavaRDD<Data> dataParallel = sparkContext.parallelize(dataSet);

        // We use an accumulator to count the number of calls within the map command
        final Accumulator<Integer> accum = sparkContext.accumulator(0);

        final MyComparator comparator = new MyComparator();
        for (int iterations = 0; iterations < MAX_ITERATIONS; iterations++) {
            // If the item which should be updated is selected using the iteration counter everything works fine...
            // final String idToUpdate = new Integer(iterations % 2).toString();

            // ..., but if the element with the minimal value is selected the number of executions in the map command increases.
            final String idToUpdate = dataParallel.min(comparator).getId();
            dataParallel = dataParallel.map(data -> {
                accum.add(1); // Counting the number of function calls.
                return updateData(data, idToUpdate);
            });
        }

        final List<Data> resultData = dataParallel.collect();
        System.out.println("Accumulator: " + accum.value());
        for (Data data : resultData) {
            System.out.println(data.toString());
        }
    }

    private Data updateData(final Data data, final String id) {
        if (data.getId().equals(id)) {
            data.setValue(data.getValue() + 1);
        }
        return data;
    }

    private JavaSparkContext createSparkContext() {
        final SparkConf conf = new SparkConf().setAppName(APPNAME).setMaster(SPARKMASTER);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "de.eprofessional.bidmanager2.engine.serialization.KryoRegistratorWrapper");
        return new JavaSparkContext(conf);

    }
}

我希望对于每次迭代,我们都会获得 2 个函数调用,如果使用迭代计数器选择要更新的项目(请参阅累加器结果 1),就会出现这种情况。但是如果使用 min 命令选择元素,我们会得到不同的结果(见累加器结果 2):
+----------------+----------------------+----------------------+
| MAX_ITERATIONS | Accumulator Result 1 | Accumulator Result 2 |
+----------------+----------------------+----------------------+
|              1 |                    2 |                    2 |
|              2 |                    4 |                    6 |
|              3 |                    6 |                   12 |
|              4 |                    8 |                   20 |
+----------------+----------------------+----------------------+

有人对 map 命令中的额外调用有解释吗?

最佳答案

RDD 上的操作定义了所谓的“谱系”。每个 RDD 都有一个对它的父级(或父级,在例如连接的情况下)的引用。当 RDD 被物化时,这个谱系被访问。这构成了 RDD 弹性的基础:Spark 可以通过在给定的数据分区上执行所述沿袭来重新创建数据集上的所有操作以得出结果。

这里发生的事情是我们正在链接 .map 调用。如果我们展开循环,我们会看到如下内容:

iter1 -> rdd.map(f)
iter2 -> rdd.map(f).map(f)
iter3 -> rdd.map(f).map(f).map(f)
...

我们可以通过在循环中发出 rdd.toDebugString 来看到这一点。

因此,最重要的是:每次通过实际上都会为前一阶段添加一个沿袭步骤。如果我们想打破这种血统,我们应该在每次迭代时对 RDD 进行 checkpoint 以“记住”最后的中间结果。 cache 有类似的效果,除了它不能保证评估停止(以防没有更多的内存可以缓存)。因此,RDD 实现可以进一步评估谱系。

关于algorithm - Spark - 迭代算法的奇怪行为,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37747712/

10-16 02:36