
I want to run a pipeline with the Spark runner, and the data is stored on a remote machine. The following command was used to submit the job:

./spark-submit   --class org.apache.beam.examples.WordCount   --master spark://   --deploy-mode cluster   --supervise   --executor-memory 2G   --total-executor-cores 4 hdfs:// --runner=SparkRunner


Running Spark using the REST application submission protocol.
        Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
        17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request to launch an application in spark://
        17/06/12 14:44:49 INFO RestSubmissionClient: Submission successfully created as driver-20170612200920-0006. Polling submission state...
        17/06/12 14:44:49 INFO RestSubmissionClient: Submitting a request for the status of submission driver-20170612200920-0006 in spark://
        17/06/12 14:44:49 INFO RestSubmissionClient: State of driver driver-20170612200920-0006 is now RUNNING.
        17/06/12 14:44:49 INFO RestSubmissionClient: Driver is running on worker worker-20170612193258- at
        17/06/12 14:44:49 INFO RestSubmissionClient: Server responded with CreateSubmissionResponse:
        {
          "action" : "CreateSubmissionResponse",
          "message" : "Driver successfully submitted as driver-20170612200920-0006",
          "serverSparkVersion" : "1.6.3",
          "submissionId" : "driver-20170612200920-0006",
          "success" : true
        }


However, the job is stuck in the RUNNING state, and stderr shows the following exception along with other details:

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
    Caused by: java.lang.IllegalStateException: Unable to find registrar for hdfs
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
        at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:204)
        at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:294)
        at org.apache.beam.examples.WordCount.main(WordCount.java:132)
        ... 6 more
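The exception is thrown while the pipeline is still being constructed, at `TextIO.write().to(...)` (WordCount.java:132): `FileSystems` resolves the output path by its URI scheme and finds no filesystem registered for `hdfs`. The sketch below is a simplified, stdlib-only model of that lookup, not Beam's actual `FileSystems` class. The class name `SchemeLookupModel` and the `namenode:8020` address are hypothetical, and treating scheme-less paths as `file` is assumed to mirror Beam's behaviour for local paths.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of a scheme-based filesystem lookup.
 * This is NOT Beam's FileSystems class; it only illustrates that a path's
 * URI scheme must match a scheme that some registered filesystem has claimed.
 */
public class SchemeLookupModel {

    // Registered filesystems, keyed by the URI scheme they claim.
    private final Map<String, String> filesystemsByScheme = new HashMap<>();

    public void register(String scheme, String filesystemName) {
        filesystemsByScheme.put(scheme, filesystemName);
    }

    /** Returns the filesystem that handles the path, or throws if no scheme matches. */
    public String resolve(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            // Assumption: a bare local path such as /tmp/counts is treated as "file".
            scheme = "file";
        }
        String fs = filesystemsByScheme.get(scheme);
        if (fs == null) {
            throw new IllegalStateException("Unable to find registrar for " + scheme);
        }
        return fs;
    }

    public static void main(String[] args) {
        SchemeLookupModel model = new SchemeLookupModel();
        model.register("file", "LocalFileSystem");
        System.out.println(model.resolve("/tmp/counts")); // LocalFileSystem
        try {
            // Hypothetical HDFS address; nothing has claimed the "hdfs" scheme yet.
            model.resolve("hdfs://namenode:8020/out/counts");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Unable to find registrar for hdfs
        }
    }
}
```

In this model the fix is simply to have something register under `hdfs` before the path is resolved; in Beam, that "something" is the Hadoop-backed filesystem created by `HadoopFileSystemRegistrar`.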


The following are the plugins and dependencies I used in my project:



            <name>Apache Development Snapshot Repository</name>

    <!--         <exclusions>
            </exclusions> -->

        <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->

        <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
                <!-- Exclude an old version of guava that is being pulled
                     in by a transitive dependency of google-api-client -->

                <!-- Exclude an old version of guava that is being pulled
                     in by a transitive dependency of google-api-client -->

                <!-- Exclude an old version of guava that is being pulled
                     in by a transitive dependency of google-api-client -->

                <!-- Exclude an old version of guava that is being pulled
                     in by a transitive dependency of google-api-client -->



        <!-- Add slf4j API frontend binding with JUL backend -->

            <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->

        <!-- Hamcrest and JUnit are required dependencies of PAssert,
             which is used in the main code of DebuggingWordCount example. -->



        <!-- The DirectRunner is needed for unit tests. -->

            <!-- Ensure that the Maven jar plugin runs before the Maven
              shade plugin by listing the plugin higher within the file. -->

            <!--
              Configures `mvn package` to produce a bundled jar ("fat jar") for runners
              that require this for job submission to a cluster.
            -->
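Having the `HadoopFileSystemRegistrar` class inside the fat jar is necessary but may not be sufficient on its own: Beam discovers `FileSystemRegistrar` implementations through `java.util.ServiceLoader`, so the class also has to be listed in `META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar`. When the shade plugin merges several jars, those service files are combined only if it is configured with `ServicesResourceTransformer`; otherwise one module's copy can overwrite another's. A minimal stdlib-only sketch for checking what the shaded jar actually declares (the class name `ServiceFileCheck` and the jar path are hypothetical):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical helper: lists the FileSystemRegistrar entries declared inside a jar. */
public class ServiceFileCheck {

    // The service file ServiceLoader reads to discover FileSystemRegistrar implementations.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar";

    /** Returns the registrar class names listed in the jar's service file (empty if absent). */
    public static List<String> listRegistrars(String jarPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_FILE);
            if (entry == null) {
                return names; // no service file at all: ServiceLoader finds no registrars
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // ServiceLoader format: one class name per line, '#' starts a comment.
                    int hash = line.indexOf('#');
                    String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
                    if (!name.isEmpty()) {
                        names.add(name);
                    }
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ServiceFileCheck <path-to-shaded-jar>");
            return;
        }
        for (String name : listRegistrars(args[0])) {
            System.out.println(name);
        }
    }
}
```

If `org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar` is missing from the output, the registrar class is present but ServiceLoader will never instantiate it.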



The fat jar contains HadoopFileSystemRegistrar. The following is the source code of the WordCount class:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples;

import java.util.Collections;

import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
//import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;

/**
 * An example that counts words in Shakespeare and includes Beam best practices.
 */
public class WordCount {

    static class ExtractWordsFn extends DoFn<String, String> {
        private final Counter emptyLines = Metrics
                .counter(ExtractWordsFn.class, "emptyLines");

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Track blank lines with a metrics counter.
            if (c.element().trim().isEmpty()) {
                emptyLines.inc();
            }

            // Split the line into words.
            String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);

            // Output each word encountered into the output PCollection.
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     * A SimpleFunction that converts a Word and Count into a printable string.
     */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }

    public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

            // Convert lines of text into individual words.
            PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

            // Count the number of times each word occurs.
            PCollection<KV<String, Long>> wordCounts = words
                    .apply(Count.<String>perElement());

            return wordCounts;
        }
    }

    /**
     * Options supported by {@link WordCount}. Concept #4: Defining your own
     * configuration options. Here, you can add your own arguments to be
     * processed by the command-line parser, and specify default values for
     * them. You can then access the options values in your pipeline code.
     *
     * <p>Inherits standard configuration options.
     */
    public interface WordCountOptions extends HadoopFileSystemOptions {

        /**
         * By default, this example reads from a public dataset containing the
         * text of King Lear. Set this option to choose a different input file
         * or glob.
         */
        @Description("Path of the file to read from")
        String getInputFile();

        void setInputFile(String value);

        /**
         * Set this required option to specify where to write the output.
         */
        @Description("/home/ankit/kinglear_chandan.txt ")
        String getOutput();

        void setOutput(String value);
    }

    public static void main(String[] args) {
        // Hard-coded options: the HDFS configuration (fs.defaultFS) and the Spark runner.
        String[] args1 = new String[]{"--hdfsConfiguration=[{\"fs.defaultFS\" : \"hdfs://\"}]", "--runner=SparkRunner"};
        WordCountOptions options = PipelineOptionsFactory
                .fromArgs(args1).withValidation().as(WordCountOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.write().to(options.getOutput()));

        p.run().waitUntilFinish();
    }
}



I had the same issue. Please take a look at this Jira ticket, https://issues.apache.org/jira/projects/BEAM/issues/BEAM-2429, and set the fs.defaultFS parameter so that hdfs:// paths are handled. I hope this helps.
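Why fs.defaultFS matters can be sketched as follows, under the assumption (as in Beam 2.0.x) that the Hadoop-backed filesystem takes the URI scheme it registers under from the Hadoop `Configuration`'s `fs.defaultFS`. Hadoop's own default for that key is `file:///`, so without it the filesystem claims `file` rather than `hdfs`, and `hdfs://` paths still find no registrar. This is a stdlib-only model: a plain `Map` stands in for `org.apache.hadoop.conf.Configuration`, and the class name and `namenode:8020` address are hypothetical.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical model: derives the scheme a Hadoop-backed filesystem would
 * register under from fs.defaultFS. A Map stands in for Hadoop's Configuration.
 */
public class DefaultFsSchemeModel {

    // Hadoop's built-in value for fs.defaultFS when it is not configured.
    static final String HADOOP_DEFAULT_FS = "file:///";

    /** Returns the URI scheme taken from fs.defaultFS (or Hadoop's default). */
    public static String registeredScheme(Map<String, String> hadoopConf) {
        String defaultFs = hadoopConf.getOrDefault("fs.defaultFS", HADOOP_DEFAULT_FS);
        return URI.create(defaultFs).getScheme();
    }

    public static void main(String[] args) {
        // fs.defaultFS unset: the filesystem claims "file", so hdfs:// paths stay unresolved.
        System.out.println(registeredScheme(Collections.<String, String>emptyMap())); // file
        // fs.defaultFS set (hypothetical namenode address): "hdfs" is now claimed.
        System.out.println(registeredScheme(
                Collections.singletonMap("fs.defaultFS", "hdfs://namenode:8020"))); // hdfs
    }
}
```

In the real pipeline the value comes either from the `--hdfsConfiguration` flag (as in `args1` above) or from a Hadoop `Configuration` on which `fs.defaultFS` is set and which is passed via `HadoopFileSystemOptions.setHdfsConfiguration(Collections.singletonList(conf))`.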

