本文介绍了我有一个用于kafka connect的kafka管道(json问题更新)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以我根据一些建议进行了更新.但是流应用程序会在一段时间后终止.没有表演.ide显示的以下代码中没有错误.最后,我将数据发送给主题,因为键等于字符串和值(作为json对象).仍然无法正常工作.

so i updated according to some suggestions. but the streams application terminates after some time. without performing. no error in below code shown by ide. at last i'm sending data to topic as key equals string and value as a json object. still not working.

我猜它是一行还是什么,但不确定我是否正确.请.还附有下面的错误屏幕截图.

i guess its a line or something but not sure if im right. please. also attached the error screenshot below.

 Serializer<JsonNode> jsonSerializer = new JsonSerializer();
            Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
            Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    
            JSONObject jsnObj = new JSONObject();
    
           ......(word count manipulationover part over here)
    
            KTable<Windowed<String>, Long> Ttable = TgroupedStream
                    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                    .count();
    
            Ttable
                    .toStream()
                    .selectKey((key, word) -> key.key())
                    .map((key, value) -> {
                                JSONParser par = new JSONParser();
                                StringWriter sw = new StringWriter();
    
                                KeyValue<String, JsonNode> kv = null;
                                try {
                                    ObjectMapper objectMapper = new ObjectMapper();
                                    JsonNode jsonNode = objectMapper.readTree("{ \"word\": \"" + key + "\" \",\" count: \"" + value + "\" }");
                                    KeyValue.pair(key.concat("s"), jsonNode);
                                    kv = KeyValue.pair(key.concat("s"), jsonNode);
    
                                } catch (JsonMappingException e) {
                                    e.printStackTrace();
                                } catch (JsonProcessingException e) {
                                    e.printStackTrace();
                                }
                                return kv;
                            }
                    )
                    .to("badliar", Produced.with(Serdes.String(), jsonSerde));
    
          
            KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
            streams.start();
    
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }

推荐答案

您正在使用包含所需确切数据的键值对.您无需解析任何内容,只需创建JsonNode并返回它即可.

You're consuming a key-value pair containing the exact data you want. You dont need to parse anything, just create the JsonNode and return it.

final ObjectMapper mapper = new ObjectMapper();

Ttable
        .toStream()
        .selectKey((key, word) -> key.key())
        .map((key, value) -> {
             ObjectNode rootNode = mapper.createObjectNode();

             rootNode.put("word", key);
             rootNode.put("count", value);
                        
             return new KeyValue.pair(key, jsonNode);           
        })

如果您不修改密钥,也可以使用 mapValues 代替 map

You can also use mapValues instead of map if you aren't modifying the key

这篇关于我有一个用于kafka connect的kafka管道(json问题更新)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-02 13:26