问题描述
我正在为 kafka 运行一个简单的使用者,例如:
I am running a simple consumer for kafka such as this:
int timeout = 80000;
int bufferSize = 64*1024;
consumer = new SimpleConsumer(host, port,timeout, bufferSize, clientName);
这可以正常运行几个小时,但我遇到了异常稍后的kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:
This runs fine for a couple of hours but I get an exceptionlater onkafka.consumer.SimpleConsumer: Reconnect due to socket error:
java.nio.channels.ClosedChannelException
消费者停止...以前有人遇到过这个问题吗?
and consumer stops ... has anyone faced this problem before ?
推荐答案
一个稍微不同的问题,但可能具有相同的根本原因和解决方案是 在这里回答,相关部分:
A slightly different question, but perhaps with the same root cause and solution was answered here, the relevant part:
您已关闭频道,但仍在尝试使用它.
您的代码存在多个问题.
There are several issues with your code.
首先,您对 EOS 的测试有问题.删除 limit() == 0 测试.那不表示 EOS,它只是表示零长度读取,这可以随时以非阻塞模式发生.这并不意味着对等方有关闭了他的连接端,这并不意味着你应该关闭你的结局.
First, your test for EOS is faulty. Remove the limit() == 0 test. That doesn't indicate EOS, it just indicates a zero length read, which can happen in non-blocking mode at any time. It doesn't mean the peer has closed his end of the connection, and it doesn't mean you should close your end.
其次,关闭通道也会关闭套接字.你应该关闭仅通道,而不是套接字.
Second, closing a channel closes the socket as well. You should close the channel only, not the socket.
第三,关闭频道会取消密钥.你不需要跟随每次关闭都会取消.
Third, closing a channel cancels the key. You don't need to follow every close with a cancel.
您也可能没有检查就绪密钥在在使用之前选择循环,例如供阅读.
You may also have failed to check whether a ready key is valid in the select loop before using it, e.g. for reading.
这篇关于kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!