How to instantiate a mock Kafka topic for JUnit tests?

Problem description

I have some JUnit tests for code that uses a Kafka topic. The mock Kafka topics I have tried do not work, and the examples I found online are old enough that they do not work with 0.8.2.1 either. How do I create a mock Kafka topic using 0.8.2.1?

To clarify: I am choosing to use an actual embedded instance of the topic so that I test against a real instance rather than mocking the hand-off in Mockito. That way I can verify that my custom encoders and decoders actually work, and that nothing fails when I switch to a real Kafka instance.
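As a complement to the embedded-broker test, the byte-level round trip of a custom encoder and decoder can also be checked in isolation. The following is only a sketch: `OrderCodec` and `Order` are hypothetical names invented for illustration, and the class deliberately does not implement Kafka's serializer interfaces. It just mirrors the `toBytes`/`fromBytes` shape so that it runs with the JDK alone.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec used only for illustration; not a Kafka class. */
final class OrderCodec {

    /** Simple value object standing in for an application message. */
    public static final class Order {
        public final int id;
        public final String item;

        public Order(int id, String item) {
            this.id = id;
            this.item = item;
        }
    }

    /** Encodes an order as a 4-byte big-endian id followed by the UTF-8 item name. */
    public static byte[] toBytes(Order order) {
        byte[] item = order.item.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + item.length).putInt(order.id).put(item).array();
    }

    /** Decodes the layout written by toBytes. */
    public static Order fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        int id = buffer.getInt();
        byte[] item = new byte[buffer.remaining()];
        buffer.get(item);
        return new Order(id, new String(item, StandardCharsets.UTF_8));
    }
}
```

A round-trip assertion such as `fromBytes(toBytes(order))` catches layout mistakes quickly, while the embedded-broker test remains the place to confirm that the codec is wired into the producer and consumer correctly.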

Recommended answer

https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

This example has been updated to work with the newer 0.8.2.2 release. Here is the code, together with its Maven dependencies:

pom.xml:

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.8.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.8.2.2</version>
      <classifier>test</classifier>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.2</version>
    </dependency>
</dependencies>

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import kafka.admin.TopicCommand;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;

/**
 * For online documentation
 * see
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
 */
public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port, true);

        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"};
        // create topic
        TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));

        List<KafkaServer> servers = new ArrayList<KafkaServer>();
        servers.add(kafkaServer);
        TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port);
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer producer = new Producer(producerConfig);

        // setup simple consumer
        Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));

        // send message
        KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8));

        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);

        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
        producer.close();

        // deleting zookeeper information to make sure the consumer starts from the beginning
        // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
        zkClient.delete("/consumers/group0");

        // starting consumer
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        // hasNext() blocks until a message arrives because the consumer timeout above is -1
        if(iterator.hasNext()) {
            String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println(msg);
            assertEquals("test-message", msg);
        } else {
            fail();
        }

        // cleanup
        consumer.shutdown();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }
}
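One caveat with the test above: the consumer is created with a timeout of -1, so `iterator.hasNext()` waits indefinitely if the message never arrives and the `fail()` branch is never reached. A minimal sketch of bounding such a blocking call with plain `java.util.concurrent` follows; `BoundedWait` and `callWithTimeout` are hypothetical names, not part of Kafka or JUnit.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for illustration only. */
final class BoundedWait {

    /** Runs the task on a separate thread and gives up after timeoutMs milliseconds. */
    public static <T> T callWithTimeout(Callable<T> task, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            // Throws TimeoutException if the task has not finished in time.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            // Interrupts the worker thread if it is still running.
            executor.shutdownNow();
        }
    }
}
```

In the test this might be used as `BoundedWait.callWithTimeout(iterator::hasNext, 10000)`, so that a missing message surfaces as a `TimeoutException` rather than a hung build. JUnit 4's `@Test(timeout = ...)` attribute is a simpler alternative when failing the whole test method is acceptable.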

Be sure to check the output of mvn dependency:tree for conflicting libraries. I had to add exclusions for slf4j-log4j12 and log4j:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <classifier>test</classifier>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Another option I am looking into is Apache Curator, as described in the question "Is it possible to start a zookeeper server instance in process, say for unit tests?":

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.2.0-incubating</version>
    <scope>test</scope>
</dependency>

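import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;

// Fields of the test class; CuratorFramework lives in the curator-framework artifact
TestingServer zkTestServer;
CuratorFramework cli;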

@Before
public void startZookeeper() throws Exception {
    zkTestServer = new TestingServer(2181);
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000));
}

@After
public void stopZookeeper() throws IOException {
    cli.close();
    zkTestServer.stop();
}
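The Curator snippet only starts ZooKeeper, so a broker still has to be pointed at it. The sketch below builds the broker properties by hand, assuming the 0.8.x property names `broker.id`, `host.name`, `port`, `log.dir` and `zookeeper.connect`. `EmbeddedBrokerProps` is a hypothetical helper name, and the port value is whatever free port the test picks.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

/** Hypothetical helper for illustration only. */
final class EmbeddedBrokerProps {

    /** Builds broker properties pointing at an already running ZooKeeper. */
    public static Properties create(String zkConnect, int brokerId, int port) throws IOException {
        Properties props = new Properties();
        props.setProperty("broker.id", String.valueOf(brokerId));
        props.setProperty("host.name", "localhost");
        props.setProperty("port", String.valueOf(port));
        // Fresh temporary directory so that test runs do not share log segments.
        props.setProperty("log.dir", Files.createTempDirectory("kafka-test-logs").toString());
        props.setProperty("zookeeper.connect", zkConnect);
        return props;
    }
}
```

The result could then be passed to `new KafkaConfig(...)` in the same way as in KafkaProducerTest.java, for example with `zkTestServer.getConnectString()` as the first argument.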
