This article covers how to consume only the latest message from a Kafka consumer in NodeJS; it should be a useful reference for anyone hitting the same problem.

Problem Description

I have created a NodeJS application that inserts data into a MongoDB collection. The database insert is performed through Kafka, and kafka-node is the plugin I use to call Kafka.

I am able to create topics and send messages from the producer to the consumer; the message and the topic are taken from a POST request. This is how I call Kafka, with the topic and the message as parameters.

Every time I call this API, the producer creates a new message and sends it to the consumer, but on each call all of the previous messages are delivered to the consumer again as well.

I have used the fromOffset: 'earliest' and fromOffset: 'latest' options to try to suppress the previous messages, but they don't work.
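As a side note, in kafka-node the string values 'earliest' and 'latest' are recognized by the ConsumerGroup constructor, not by the plain Consumer, whose fromOffset option is a boolean. A minimal sketch of that alternative (the broker address, group id, and topic name here are hypothetical):

var kafka = require('kafka-node');
var consumerGroup = new kafka.ConsumerGroup(
    {
        kafkaHost: 'localhost:9092', // hypothetical broker address
        groupId: 'my-group',         // hypothetical consumer group id
        fromOffset: 'latest'         // only deliver messages produced after startup
    },
    ['my-topic']                     // hypothetical topic
);
consumerGroup.on('message', function (message) {
    console.log(message);
});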

Can anyone give me a suggestion? The kafka-node version is

"kafka-node": "^5.0.0",

The code I am using:

var kafka = require('kafka-node');
const {MongoClient} = require('mongodb');
var url = 'mongodb://127.0.0.1:27017/';
const mongoClient = new MongoClient(url);
var Producer = kafka.Producer,
    client = new kafka.KafkaClient(),
    offset = new kafka.Offset(client),
    Consumer = kafka.Consumer,
    producer = new Producer(client);
producer.on('ready', function () {
        console.log('Producer is ready');
    });
producer.on('error', function (err) {
        console.log('Producer is in error state');
        console.log(err);
    })
const createProducer = async (req, res, next) => {
  var topic = req.body.topic;
  var sentMessage = JSON.stringify(req.body.messages);
  // Build the payload from the POST body and hand it to the producer.
  var payloads = [
      { topic: topic, messages: sentMessage, partition: 0 }
  ];
  producer.send(payloads, async function (err, data) {
  });
  // A fresh client and consumer are created on every request.
  var client = new kafka.KafkaClient();
  var consumer = new Consumer(client,
            [
                { topic: topic, partition: 0 }
            ],
            {
                autoCommit: false,
                fromOffset: 'earliest'
            }
        );
   consumer.on('message', async function (message) {            

      console.log("Message : "+JSON.stringify(message))
      try {            
        var currentdate = new Date(); 
        var datetime = "Last Sync: " + currentdate.getDate() + "/"
            + (currentdate.getMonth()+1)  + "/" 
            + currentdate.getFullYear() + " @ "  
            + currentdate.getHours() + ":"  
            + currentdate.getMinutes() + ":" 
            + currentdate.getSeconds();
        var abb = await createListing(mongoClient,
            {
              topic: topic,
              message: sentMessage,
              time: datetime
             }
             );
             
      } catch (e) {
         console.error(e);
      }
    });
     await mongoClient.close();
        res.send({
            message: 'Successfully send data from producer',
            payloads: payloads
        })

    async function createListing(dbClient, newListing){
        await dbClient.connect();
        const result = await dbClient.db("sample_airbnb").collection("listingsAndReviews").insertOne(newListing);
        console.log(`New listing created with the following id: ${result.insertedId}`);
        return result.insertedId;
      }
}
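For completeness, createProducer above is an Express-style handler; a minimal sketch of how it might be wired up (the route, port, and the use of Express itself are assumptions, since the original post does not show this part):

const express = require('express');
const app = express();
app.use(express.json()); // parse the JSON POST body into req.body
app.post('/produce', createProducer); // hypothetical route; body: { "topic": "...", "messages": { ... } }
app.listen(3000);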

The messages consumed at the consumer include all of the previously produced messages as well (the output screenshot is omitted here).

Thanks,

Recommended Answer

I made some changes to the code, and now I can retrieve the latest message from my topic.

I create the consumer inside the offset.fetchLatestOffsets([topics], cb) callback and made some changes to the consumer options. fetchLatestOffsets returns, for each partition, the offset that the next message will be written to, so the most recent existing message sits at that value minus 1.

// topicName and messageTotopic are taken from the POST body, as in the question.
var payloads = [
    { topic: topicName, messages: messageTotopic, partition: 0 }
];
producer.send(payloads, async function(err, data) {

});
var client = new kafka.KafkaClient();
offset.fetchLatestOffsets([topic], async function (error, offsets) {
    if (error)
        console.log(error);
    // offsets[topic][0] is the next offset to be written on partition 0.
    var offsetA = offsets[topic][0];
    console.log('offset Value:: ' + offsetA);
    var consumer = new Consumer(
        client,
        [
            {
                topic: topic,
                partition: 0,
                offset: offsetA - 1, // offsets start at 0, so the newest message is at (latest offset - 1)
            }
        ], {
            autoCommit: false,
            fromOffset: true,
        }
    );
    consumer.on('message', async function (message) {
        console.log("Message from last offset:: " + JSON.stringify(message)); // will return the latest message.
        consumer.close();
    });
});

In this way, I was also able to resolve the memory leak in KafkaClient related to event emitters.
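Closing the consumer once it has delivered the message is what stops listeners from accumulating across requests; since a fresh KafkaClient is created per request, it can be closed as well. A minimal sketch (both close methods exist in kafka-node; the callback-based ordering here is an assumption):

consumer.on('message', async function (message) {
    // ... handle the latest message ...
    consumer.close(function () {
        client.close(); // also release the per-request KafkaClient
    });
});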

That concludes this article on how to consume the latest message from a Kafka consumer in NodeJS; we hope the recommended answer is helpful.
