我已经开始将Pyspark应用程序编写为Java实现。我正在使用Java8。我刚刚开始在Java中执行一些基本的 Spark 程序。我使用了以下wordcount示例。

SparkConf conf = new SparkConf().setMaster("local").setAppName("Work Count App");

// Create a Java version of the Spark Context from the configuration
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> lines = sc.textFile(filename);

JavaPairRDD<String, Integer> counts = lines.flatMap(line -> Arrays.asList(line.split(" ")))
                    .mapToPair(word -> new Tuple2(word, 1))
                    .reduceByKey((x, y) -> (Integer) x + (Integer) y)
                    .sortByKey();

我在Type mismatch: cannot convert from JavaRDD<Object> to JavaRDD<String>中收到lines.flatMap(line -> Arrays.asList(line.split(" ")))错误
当我谷歌搜索时,在所有基于Java 8的spark示例中,我都看到了与上述相同的实现。我的环境或程序出了什么问题。

有人能帮我吗 ?

最佳答案

使用此代码。实际问题是rdd.flatMap函数在您的代码创建Iterator<String>时期望List<String>。调用iterator()将解决此问题。

JavaPairRDD<String, Integer> counts = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
            .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
            .reduceByKey((x, y) ->  x +  y)
            .sortByKey();

counts.foreach(data -> {
        System.out.println(data._1()+"-"+data._2());
    });

10-01 21:45