本文介绍了在Kafka-storm中未能将偏移数据写入Zookeeper的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我当时正在建立一个风暴集群,以计算实时趋势和其他统计数据,但是,通过允许最后一次由kafka-spout读取的偏移量(源),我在将恢复"功能引入该项目时遇到了一些问题. kafka-spout的代码来自 https://github.com/apache/incubator-storm/tree/master/external/storm-kafka ).我以这种方式启动kafka-spout:

I was setting up a storm cluster to calculate real time trending and other statistics, however I have some problems introducing the "recovery" feature into this project, by allowing the offset that was last read by the kafka-spout (the source code for kafka-spout comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) to be remembered. I start my kafka-spout in this way:

BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);

默认设置应该执行此操作,但是对于我而言,我认为这样做并非如此,每次我启动项目时,PartitionManager都会尝试查找具有偏移量的文件,但未找到任何内容:

The default settings should be doing this, but I think it is not doing so in my case, every time I start my project, the PartitionManager tries to look for the file with the offsets, then nothing is found:

2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO  PartitionManager:86 - No partition information found, using configuration to determine offset

然后从最近的可能偏移开始读取.如果我的项目从未失败过,但又不是我想要的,那没关系.

Then it starts reading from the latest possible offset. Which is okay if my project never fails, but not exactly what I wanted.

我还进一步研究了PartitionManager类,该类使用Zkstate类从以下代码片段中编写偏移量:

I also looked a bit more into the PartitionManager class which uses Zkstate class to write the offsets, from this code snippet:

PartitionManeger

PartitionManeger

public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}

ZkState

public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

我可以看到,对于第一条消息,writeBytes方法进入if块并尝试创建路径,然后对于第二条消息,它进入else块,这似乎没问题.但是,当我再次启动该项目时,会出现与上述相同的消息.找不到partition information.

I could see that for the first message, the writeBytes method gets into the if block and tries to create a path, then for the second message it goes into the else block, which seems to be ok. But when I start the project again, the same message as mentioned above shows up. No partition information can be found.

推荐答案

我遇到了同样的问题.原来,我是在本地模式下运行的,它使用的是内存中的Zookeeper,而不是Kafka使用的Zookeeper.

I had the same problem. Turned out I was running in local mode which uses an in memory zookeeper and not the zookeeper that Kafka is using.

要确保KafkaSpout不在存储偏移量的ZkState中使用Storm的ZooKeeper,除了ZkHosts外,还需要设置SpoutConfig.zkServersSpoutConfig.zkPortSpoutConfig.zkRoot.例如

To make sure that KafkaSpout doesn't use Storm's ZooKeeper for the ZkState that stores the offset, you need to set the SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to the ZkHosts. For example

import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;

...

    final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
    final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
    final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
    final Integer zkPort = serverInetAddresses.get(0).getPort();
    for (InetSocketAddress serverInetAddress : serverInetAddresses) {
        serverAddresses.add(serverInetAddress.getHostName());
    }

    final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
    zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;

    final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);

    spoutConfig.zkServers = serverAddresses;
    spoutConfig.zkPort = zkPort;
    spoutConfig.zkRoot = kafkaZnode;

这篇关于在Kafka-storm中未能将偏移数据写入Zookeeper的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 18:12