当我向消费者查询已分配的主题分区列表时,结果中所有分区的偏移量都是-1001。
但如果我打印接收到的消息的偏移量,其值却是正确的。

这是我用来消费消息的代码:

/**
 * Print a topic-partition list on a single line of `fp`.
 *
 * Every element is written as "<topic> [<partition>] offset <offset>",
 * consecutive elements are separated by a comma, and the line is finished
 * with a newline.
 *
 * @param fp          Stream the list is written to.
 * @param partitions  List whose elements are printed; it is only read.
 */
static void print_partition_list(FILE* fp,
    const rd_kafka_topic_partition_list_t
    * partitions) {
    int i;
    for (i = 0; i < partitions->cnt; i++) {
        /* The casts make the arguments match %d / %lld exactly, whatever
         * underlying integer types the partition and offset fields use. */
        fprintf(fp, "%s %s [%d] offset %lld",
            i > 0 ? "," : "",
            partitions->elems[i].topic,
            (int)partitions->elems[i].partition,
            (long long)partitions->elems[i].offset);
    }
    fprintf(fp, "\n");

}

/**
 * Consumer group rebalance callback.
 *
 * Logs the rebalance event to stderr and updates the consumer's assignment:
 * on assignment the given partitions are handed to rd_kafka_assign(); on
 * revocation, or on any other error code, the assignment is cleared by
 * passing NULL. A failure reported by rd_kafka_assign() is logged as well.
 *
 * @param rk          Consumer handle the rebalance applies to.
 * @param err         Rebalance event (assign / revoke) or an error code.
 * @param partitions  Partitions being assigned or revoked.
 * @param opaque      Unused.
 */
static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque) {
    rd_kafka_resp_err_t assign_err;

    (void)opaque;

    fprintf(stderr, "%% Consumer group rebalanced: ");
    switch (err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        assign_err = rd_kafka_assign(rk, partitions);
        break;
    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        assign_err = rd_kafka_assign(rk, NULL);
        break;
    default:
        fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
        /* Drop whatever assignment is in place after a failed rebalance. */
        assign_err = rd_kafka_assign(rk, NULL);
        break;
    }

    /* Every branch above sets assign_err; report instead of ignoring it. */
    if (assign_err) {
        fprintf(stderr, "%% Failed to update assignment: %s\n",
            rd_kafka_err2str(assign_err));
    }
}

/**
 * Example consumer: joins the "OffsetTest" group, subscribes to the
 * "OffsetTesting" topic and, for every message received, prints the current
 * assignment and the message, then commits the message's offset.
 *
 * The loop ends after ten counted iterations; both empty polls and
 * successfully handled messages are counted, error events are not.
 *
 * @return 0 on a normal shutdown, 1 if setup or an assignment query failed.
 */
int main(void)
{

    rd_kafka_t* rk;
    rd_kafka_conf_t* conf;
    rd_kafka_resp_err_t err;

    char errstr[512];
    const char* brokers = "localhost:9092";
    const char* groupid = "OffsetTest";
    const char* topics[] = { "OffsetTesting" };

    rd_kafka_topic_partition_list_t* subscription;

    conf = rd_kafka_conf_new();

    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "group.id", groupid,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "enable.auto.commit", "false",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "offset.store.method", "broker",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
        /* The handle was not created, so the configuration object is
         * still ours to release. */
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* The configuration object now belongs to rk; do not touch it again. */
    conf = NULL;

    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topics[0], RD_KAFKA_PARTITION_UA);

    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
            "%% Failed to subscribe to %d topics: %s\n",
            subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,
        "%% Subscribed to %d topic(s), "
        "waiting for rebalance and messages...\n",
        subscription->cnt);

    /* The subscription list is released here and must not be used below. */
    rd_kafka_topic_partition_list_destroy(subscription);

    int runningCounter = 0;

    while (runningCounter != 10) {
        rd_kafka_message_t* rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm) {
            /* Nothing received within the poll timeout: back off.
             * NOTE(review): Sleep() is presumably the Win32 call taking
             * milliseconds - confirm against the file's includes. */
            Sleep(2000);
            runningCounter++;
            continue;
        }
        if (rkm->err) {
            fprintf(stderr,
                "%% Consumer error: %s\n",
                rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        /* rd_kafka_assignment() hands back the partition list that was
         * given to rd_kafka_assign(); its offset fields are not filled in
         * and therefore print as -1001. rd_kafka_position() is the call
         * that reports the current consume position of each partition. */
        rd_kafka_topic_partition_list_t* list;
        err = rd_kafka_assignment(rk, &list);

        if (err) {
            fprintf(stderr,
                "%% Failed to retrieve assignment: %s\n",
                rd_kafka_err2str(err));
            /* Release only what is still alive: the message and the
             * consumer. The subscription list was destroyed earlier. */
            rd_kafka_message_destroy(rkm);
            rd_kafka_consumer_close(rk);
            rd_kafka_destroy(rk);
            return 1;
        }

        print_partition_list(stderr, list);

        rd_kafka_topic_partition_list_destroy(list);

        /* Cast so the arguments match %d / %lld on every platform. */
        printf("Message on %s [%d] at offset %lld:\n",
            rd_kafka_topic_name(rkm->rkt), (int)rkm->partition,
            (long long)rkm->offset);

        if (rkm->key) {
            printf(" Key: %.*s\n",
                (int)rkm->key_len, (const char*)rkm->key);
        }

        if (rkm->payload) {
            printf(" Value: %.*s\n",
                (int)rkm->len, (const char*)rkm->payload);
        }

        /* Synchronous commit (async = 0) of this message's offset. */
        err = rd_kafka_commit_message(rk, rkm, 0);
        if (err) {
            fprintf(stderr,
                "%% Failed to commit offset: %s\n",
                rd_kafka_err2str(err));
        }

        rd_kafka_message_destroy(rkm);

        runningCounter++;
    }

    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);

    rd_kafka_destroy(rk);

    return 0;

}

我知道这里已经有一个类似问题的回答(LibRdKafka: commited_offset always at -1001),
但它对我没有帮助。我已经在rebalance_cb中把主题分区列表分配给了消费者。

更新:

例如,这是消费2条消息时的输出:
> %4|1580198390.566|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).

> % Subscribed to 1 topic(s), waiting for rebalance and messages...

> % Consumer group rebalanced: assigned:
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
>
> Message on NewTestingTopic [0] at offset 25:
> Key: 0
> Value: ExampleMessage 0
>
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
>
> Message on NewTestingTopic [3] at offset 41:
> Key: 1
> Value: ExampleMessage 1

最佳答案

我相信这可能是设计使然。
rd_kafka_assignment()方法返回通过rd_kafka_assign()提供的分配。当为消费者分配组中的分区时,分配只是分区列表,没有偏移量。

类似地,在Java库中,assignment() 返回Set<TopicPartition>,其中同样不包含偏移量。在librdkafka中,rd_kafka_assignment()返回rd_kafka_topic_partition_list_t,相当于Set<TopicPartition>。主要区别在于,它复用了 rd_kafka_topic_partition_t 类型,而该类型带有一些额外的字段,例如offset。
rd_kafka_topic_partition_t类型在很多地方都会用到,但并非它的所有字段在每种上下文中都有意义。分配(assignment)上下文正是这种情况,因此某些字段被设置为“空”值,对于偏移量来说就是-1001(即RD_KAFKA_OFFSET_INVALID)。

如果要获取已分配分区的当前偏移量,则需要使用 rd_kafka_position() 。同样,在Java中,您需要使用 position()。

10-08 03:54