
Problem description

This is related to another SO question (Setting Custom Coders & Handling Parameterized types). Following the workarounds there helped me use custom types in the transforms. But since my custom types are generic, I was hoping to make the transform classes generic too, so that they could parameterize the custom type with the same type. When I try that, however, I run into "Cannot provide a Coder for type variable T because the actual type is unknown due to erasure." The workaround suggested registering a coder that would return the type parameter, but since the type parameter is itself unknown, I guess that is why this exception is thrown, and I am not sure how to get around it.

static class Processor<T>
  extends PTransform<PCollection<String>,
                     PCollection<KV<String, Set<CustomType<T>>>>> {

  private static final long serialVersionUID = 0;

  // ParDoFn and Merger are inner classes of Processor (not shown).

  @Override public PCollection<KV<String, Set<CustomType<T>>>>
  apply(PCollection<String> items) {
    // Building these steps fails with "Cannot provide a Coder for type
    // variable T because the actual type is unknown due to erasure."
    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems = items
        .apply(ParDo.of(new ParDoFn()));
    PCollection<KV<String, Set<CustomType<T>>>> combinedItems = partitionedItems
        .apply(Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
            new Merger()));
    return combinedItems;
  }
}
Solution

This also appears to be caused by GitHub issue #57 and should be fixed along with that issue.

In the meantime, Dataflow already includes more advanced features that can solve your problem now. Judging from your code snippet, the entire system probably looks something like this:

class CustomType<T extends Serializable> { ... }

class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

    class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }

    class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }

    @Override
    public PCollection<KV<String, Set<CustomType<T>>>>
    apply(PCollection<String> items) {

      PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
          items.apply(ParDo.of(new ParDoFn()));

      PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
          partitionedItems.apply(
              Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
                  new Merger()));

      return combinedItems;
    }
}

…

PCollection<String> input = ...;
input.apply(new Processor<String>());

Dataflow obtains the output type of each DoFn from the TypeDescriptor returned by its getOutputTypeDescriptor() method, and uses that type to infer a coder for the DoFn's output.

Because your ParDoFn is an inner class of Processor<T>, its output type descriptor is simply KV<String, Set<CustomType<T>>>, with T still an unresolved type variable, even when the enclosing transform is instantiated as new Processor<String>().
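A minimal sketch of this erasure effect using plain Java reflection, with no Dataflow dependency. Fn and Processor are hypothetical stand-ins for DoFn and the transform, and List<T> stands in for KV<String, Set<CustomType<T>>>. Reading back the inner class's declared output type still yields the type variable T, even though the enclosing object was created as Processor<String>:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

public class InnerClassErasure {
  // Hypothetical stand-in for DoFn<InputT, OutputT>; not the Dataflow class.
  static abstract class Fn<InputT, OutputT> {}

  // Hypothetical stand-in for Processor<T>; the inner ParDoFn's output type uses T.
  static class Processor<T> {
    class ParDoFn extends Fn<String, List<T>> {}
  }

  // Returns OutputT exactly as declared in ParDoFn's "extends" clause.
  static Type declaredOutputType(Fn<?, ?> fn) {
    ParameterizedType superType =
        (ParameterizedType) fn.getClass().getGenericSuperclass();
    return superType.getActualTypeArguments()[1];
  }

  // True if the element type of the List<...> output is still a type variable.
  static boolean elementTypeIsUnresolved(Fn<?, ?> fn) {
    ParameterizedType output = (ParameterizedType) declaredOutputType(fn);
    return output.getActualTypeArguments()[0] instanceof TypeVariable;
  }

  // Creates the inner DoFn-like object from a Processor<String>.
  static Fn<?, ?> newFnForStrings() {
    Processor<String> processor = new Processor<String>();
    return processor.new ParDoFn();
  }

  public static void main(String[] args) {
    Fn<?, ?> fn = newFnForStrings();
    System.out.println(declaredOutputType(fn));       // still mentions T, not String
    System.out.println(elementTypeIsUnresolved(fn));  // true
  }
}
```

The type argument String given at the construction site is never stored in the ParDoFn class itself, so nothing reachable from ParDoFn alone can turn T into String.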

To recover the type information, ParDoFn needs to know, statically, the type supplied for T. This takes two steps.

1. Create an anonymous subclass of Processor

PCollection<String> input = ...;
input.apply(new Processor<String>() {});

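This ensures that, for all the inner classes of this Processor instance, the type variable T is statically bound to String: the anonymous subclass records Processor<String> as its generic superclass, which is kept in the class file and can be read back at runtime. In this case it is probably best to make Processor an abstract class so that consumers are required to subclass it.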
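The following sketch (plain reflection with a hypothetical stand-in class, not Dataflow code) shows why the anonymous subclass matters: new Processor<String>() leaves no record of String at runtime, while new Processor<String>() {} creates a class whose generic superclass is Processor<String>. Processor is left non-abstract here only so the two forms can be compared:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class AnonymousSubclassBinding {
  // Hypothetical stand-in for Processor<T> (non-abstract only for the comparison).
  static class Processor<T> {
    // Returns the type argument the runtime class supplies for T,
    // or null if the runtime class does not record one.
    Type boundTypeArgument() {
      Type superType = getClass().getGenericSuperclass();
      if (superType instanceof ParameterizedType
          && ((ParameterizedType) superType).getRawType() == Processor.class) {
        return ((ParameterizedType) superType).getActualTypeArguments()[0];
      }
      return null;
    }
  }

  // Runtime class is Processor itself; its superclass is plain Object.
  static Processor<String> plain() {
    return new Processor<String>();
  }

  // Runtime class is an anonymous class declared as "extends Processor<String>".
  static Processor<String> anonymous() {
    return new Processor<String>() {};
  }

  public static void main(String[] args) {
    System.out.println(plain().boundTypeArgument());      // null
    System.out.println(anonymous().boundTypeArgument());  // class java.lang.String
  }
}
```

The trailing {} in new TypeDescriptor<...>(...) {} relies on the same mechanism: the anonymous subclass is what makes the full generic type available at runtime.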

2. Override getOutputTypeDescriptor of ParDoFn to resolve its types against the outer class Processor.

class Processor<T extends Serializable> extends ... {
  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    // Resolve T against the runtime class of the enclosing Processor, i.e. the
    // anonymous subclass from step 1, which binds T to String.
    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
          Processor.this.getClass()) {};
    }
  }
}
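To illustrate what the override in step 2 accomplishes, here is a plain-reflection analogue. It is not how TypeDescriptor is implemented; Fn, Processor, and the helpers bindingsOf and describe are hypothetical. ParDoFn reads its declared output type, then substitutes T using the bindings recorded on Processor.this.getClass():

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResolveAgainstOuter {
  // Hypothetical stand-in for DoFn<InputT, OutputT>.
  static abstract class Fn<InputT, OutputT> {}

  // Hypothetical stand-in for Processor<T>; non-abstract only for the comparison below.
  static class Processor<T> {
    class ParDoFn extends Fn<String, List<T>> {
      // Analogue of the overridden getOutputTypeDescriptor(): take OutputT from
      // ParDoFn's declaration, then resolve T using the enclosing instance's class.
      String resolvedOutputType() {
        Type declared = ((ParameterizedType) getClass().getGenericSuperclass())
            .getActualTypeArguments()[1];
        return describe(declared, bindingsOf(Processor.this.getClass()));
      }
    }
  }

  // Collects the type-variable bindings recorded along c's superclass chain.
  static Map<TypeVariable<?>, Type> bindingsOf(Class<?> c) {
    Map<TypeVariable<?>, Type> bindings = new HashMap<>();
    for (Class<?> k = c; k != null; k = k.getSuperclass()) {
      Type superType = k.getGenericSuperclass();
      if (superType instanceof ParameterizedType) {
        ParameterizedType pt = (ParameterizedType) superType;
        TypeVariable<?>[] vars = ((Class<?>) pt.getRawType()).getTypeParameters();
        Type[] args = pt.getActualTypeArguments();
        for (int i = 0; i < vars.length; i++) {
          bindings.put(vars[i], args[i]);
        }
      }
    }
    return bindings;
  }

  // Renders a type, substituting bound type variables. Handles classes and
  // parameterized types only; wildcards and arrays are out of scope here.
  static String describe(Type type, Map<TypeVariable<?>, Type> bindings) {
    if (type instanceof TypeVariable && bindings.containsKey(type)) {
      return describe(bindings.get(type), bindings);
    }
    if (type instanceof ParameterizedType) {
      ParameterizedType pt = (ParameterizedType) type;
      StringBuilder sb = new StringBuilder(((Class<?>) pt.getRawType()).getName());
      sb.append('<');
      Type[] args = pt.getActualTypeArguments();
      for (int i = 0; i < args.length; i++) {
        if (i > 0) sb.append(", ");
        sb.append(describe(args[i], bindings));
      }
      return sb.append('>').toString();
    }
    if (type instanceof Class) {
      return ((Class<?>) type).getName();
    }
    return type.getTypeName();  // an unbound type variable renders as its name, e.g. "T"
  }

  static String outputForPlainProcessor() {
    Processor<String> outer = new Processor<String>();
    return outer.new ParDoFn().resolvedOutputType();
  }

  static String outputForAnonymousProcessor() {
    Processor<String> outer = new Processor<String>() {};
    return outer.new ParDoFn().resolvedOutputType();
  }

  public static void main(String[] args) {
    System.out.println(outputForPlainProcessor());     // java.util.List<T>
    System.out.println(outputForAnonymousProcessor()); // java.util.List<java.lang.String>
  }
}
```

With a plain new Processor<String>() there is nothing to resolve T against, which is why step 1 is needed in addition to step 2.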

The complete working version of the code from the beginning is then as follows. Note again that none of this will be necessary once GitHub issue #57 is resolved.

class CustomType<T extends Serializable> { ... }

abstract class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    ...

    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>> 
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
          Processor.this.getClass()) {};
    }
  }

  class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }

  @Override
  public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {

    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
        items.apply(ParDo.of(new ParDoFn()));

    PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
        partitionedItems.apply(
          Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
              new Merger()));

    return combinedItems;
  }
}

PCollection<String> input = …;
input.apply(new Processor<String>() {});

This is not the only solution: you could also override Processor.getDefaultOutputCoder or explicitly call setCoder on the intermediate partitionedItems collection. However, it seems to be the most general approach for this use case.


10-15 13:56