在RocketMQ中,生产者使用Message对象表示一条消息,本文对Message的属性和构造方法进行详细的讲解。

1 Message属性

Message的定义如下所示:

org.apache.rocketmq.common.message.Message

  1. public class Message implements Serializable {
  2.     private String topic;
  3.     private int flag;
  4.     private Map<String, String> properties;
  5.     private byte[] body;
  6.     private String transactionId;

其中:

    topic:表示消息要到的发送主题,必填

    flag:选填,消息的标记,完全由应用设置,RocketMQ不做任何处理,类似于memcached中flag的作用。

    properties:消息属性,主要存储一些消息的元数据信息

    body:消息的内容,这是一个字节数组,序列化方式由应用决定,例如你可以将一个json转为字节数组,也可以通过protol buffer、hessian编码转为字节数组。

    transactionId:事务id,仅在事务消息中使用到 

Message数据结构中各个字段都可以通过get、set方式访问,例如访问topic

  1. msg.setTopic("TopicTest”)
  2. msg.getTopic()

2 构造方法

Message提供了以下构造方法:

  1. public Message()
  2. public Message(String topic, byte[] body)
  3. public Message(String topic, String tags, byte[] body)
  4. public Message(String topic, String tags, String keys, byte[] body)
  5. public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)

上述构造方法中的参数,会被设置到Message的成员变量中。可以看到部分参数在Message成员变量中并没有定义,这些参数都可以可选的,会被设置到properties中:

tags:表示消息的标签,消费者在消费时,可以根据标签进行过滤,需要注意的是,一个生产者,只能指定一个tag

keys:用于建立索引,之后可以通过命令工具/API/或者管理平台查询key,可以为一个消息设置多个key,用空格""进行分割

waitStoreMsgOK:表示发送消息后,是否需要等待消息同步刷新到磁盘上。如果broker配置为ASYNC_MASTER,那么只需要消息在master上刷新到磁盘即可;如果配置为SYNC_MASTER,那么还需要等待slave也刷新到磁盘。需要注意的是,waitStoreMsgOK默认为false,只有将设置为true的情况下,才会等待刷盘成功再返回。

除了通过构造方法给Message成员变量赋值,也可以通过相关set方法进行赋值,通过对应的get方法取值。除了可以设置上述构造方法中出现的参数,还可以设置:

  1. public void setBuyerId(String buyerId)
  2. public void setDelayTimeLevel(int level)
  3. public void putUserProperty(final String name, final String value)

其中:

BuyerId:

DelayTimeLevel:设置消息的延迟级别,0表示不延迟,大于0会延迟特定时间才被消费

putUserProperty:自定义消息属性。前面提到tags、keys、waitStoreMsgOK等,都会设置到properties中。如果开发者有自定义消息属性的需求,可以通过此方法进行设置。 

3 消息属性保留关键字

properties字段有一些保留的关键字,这些保留关键字定义在MessageConst类中,用户在自定义消息属性时,需要避开这些关键字。注意,rocketmq还提供了putProperty和setProperties两个方法,但是这里不建议用户使用,因为可能会与rocketmq保留的属性名冲突。 

在MessageConst类中,定义了这些属性的key,例如生产者可能会使用到:

org.apache.rocketmq.common.message.MessageConst

  1. public class MessageConst {
  2.     //消息的key
  3.     public static final String PROPERTY_KEYS = "KEYS";
  4.     //消息的tag
  5.     public static final String PROPERTY_TAGS = "TAGS";
  6.     //是否waitStoreMsgOK
  7.     public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
  8.     //消息延迟级别
  9.     public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
  10.     //BUYER_ID
  11.     public static final String PROPERTY_BUYER_ID = "BUYER_ID";
  12.     ...

免费学习视频欢迎关注云图智联:https://e.yuntuzhilian.com/

06-15 11:54