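From version 0.9 onward, Kafka's producer is the client rewritten in Java, which replaces the original producer written in Scala.

This post uses the latest release, 2.3, but the code applies to any version from 0.9 onward.

Note that the classes are imported from the package org.apache.kafka.clients.producer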

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

	public static void main(String[] args) {

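		Properties properties = new Properties();
		// Brokers used for the initial connection to the cluster.
		properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
		// Wait until all in-sync replicas have acknowledged each record.
		properties.put("acks", "all");
		// Do not resend a record when a send fails.
		properties.put("retries", 0);
		// Batch up to 16 KB per partition, waiting at most 1 ms for a batch to fill.
		properties.put("batch.size", 16384);
		properties.put("linger.ms", 1);
		// Total memory (32 MB) for buffering records that have not been sent yet.
		properties.put("buffer.memory", 33554432);
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
		// send() is asynchronous; this record carries a value but no key.
		kafkaProducer.send(new ProducerRecord<>("topic", "value"));
		// close() waits for previously sent records to complete before releasing resources.
		kafkaProducer.close();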

	}

}
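The settings used in ProducerDemo can also be collected in a small helper so that they are easy to inspect and reuse. The following is only a minimal sketch: the class ProducerConfigSketch and its build method are hypothetical names introduced here, not part of the Kafka API. It uses only java.util.Properties and stores every value as a string, on the assumption (which holds for the Kafka client's configuration parsing) that numeric settings may be supplied in string form.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class ProducerConfigSketch {

	// Builds the same settings as ProducerDemo, with every value stored as a string.
	static Properties build(String bootstrapServers) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", bootstrapServers);
		// Wait until all in-sync replicas have acknowledged each record.
		props.setProperty("acks", "all");
		// Do not resend a record when a send fails.
		props.setProperty("retries", "0");
		// Upper bound, in bytes, on a single per-partition batch (16 KB).
		props.setProperty("batch.size", "16384");
		// Wait up to 1 ms for more records before sending a batch.
		props.setProperty("linger.ms", "1");
		// Total bytes available for buffering unsent records (32 MB).
		props.setProperty("buffer.memory", "33554432");
		props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		return props;
	}

	public static void main(String[] args) {
		Properties props = build("kafka01:9092,kafka02:9092");
		// Print each setting so the effective configuration can be checked by eye.
		for (String name : props.stringPropertyNames()) {
			System.out.println(name + " = " + props.getProperty(name));
		}
	}
}
```

The returned Properties object can be passed to the KafkaProducer constructor in the same way as the one built inline in ProducerDemo.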

Transactions were added in 0.11.0. The following transactional producer example therefore requires version 0.11.0 or later:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionsProducerDemo {

	public static void main(String[] args) {

		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("transactional.id", "my-transactional-id");
		Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

		producer.initTransactions();

		try {
			producer.beginTransaction();
			for (int i = 0; i < 100; i++)
				producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
			producer.commitTransaction();
		} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
			// We can't recover from these exceptions, so our only option is to close the producer (in finally) and exit.
			System.err.println("Fatal producer error: " + e);
		} catch (KafkaException e) {
			// For all other exceptions, abort the transaction; the application may then retry it.
			producer.abortTransaction();
		} finally {
			// Close the producer exactly once, on every path.
			producer.close();
		}

	}

}
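A transactional producer depends on idempotent delivery: once transactional.id is set, the client requires enable.idempotence to be true, which in turn requires acks=all. The sketch below spells those settings out explicitly so the dependency is visible. The class TransactionalConfigSketch and its build method are hypothetical names used only for illustration, and setting enable.idempotence and acks by hand is assumed here to be redundant but harmless, since the client already applies them when only transactional.id is given.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class TransactionalConfigSketch {

	// Builds producer settings for transactional use (Kafka 0.11.0 or later).
	static Properties build(String bootstrapServers, String transactionalId) {
		if (transactionalId == null || transactionalId.isEmpty()) {
			throw new IllegalArgumentException("transactional.id must be non-empty");
		}
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", bootstrapServers);
		// Identifies this producer across restarts so an older instance can be fenced.
		props.setProperty("transactional.id", transactionalId);
		// Transactions are built on idempotent delivery.
		props.setProperty("enable.idempotence", "true");
		// Idempotent delivery requires acknowledgement from all in-sync replicas.
		props.setProperty("acks", "all");
		return props;
	}

	public static void main(String[] args) {
		Properties props = build("localhost:9092", "my-transactional-id");
		System.out.println(props.getProperty("transactional.id"));
	}
}
```

As in TransactionsProducerDemo, the serializers are left out of the properties because they can be passed to the KafkaProducer constructor directly.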

For more posts on real-time computing, Kafka, and related technologies, follow 实时流式计算.


08-21 10:54