我的代码如下:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.enableCheckpointing(500);

  DataStream<String> stream = env.addSource(getConsumer(TOPIC_1));

  Jedis jedis = new Jedis("master1");
  stream.map(new RichMapFunction<String, String>() {
      @Override
      public String map(String value) throws Exception {
          String result = jedis.hget("rtc", value);
          return result;
      }
  });

我想从map()中的redis获取一些数据,但它无法运行,因为jedis.class不可序列化。
如何在map()中使用不可序列化类,如zkclient、jedis?

最佳答案

RichMapFunction这样的所有富函数都有一个open(Configuration)close调用,您可以覆盖它。这些生命周期方法在函数部署到执行它的taskmanager后调用。

class MyMapFunction extends RichMapFunction<String, String> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        // open connection to Redis, for example
        jedis = new Jedis("master1");
    }

    @Override
    public void close() {
        // close connection to Redis
        jedis.close();
    }
}

关于redis - 如何在flink map()中使用Jedis,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48151728/

10-12 01:31