This article takes a look at RocketMQ's TransientStorePool.

TransientStorePool

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java

public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }

    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }

    public int availableBufferNums() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}
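  • The TransientStorePool constructor reads poolSize and fileSize from MessageStoreConfig; init allocates poolSize direct ByteBuffers of fileSize bytes each, locks each one in memory with LibC.INSTANCE.mlock, and puts it into availableBuffers; destroy iterates over availableBuffers and calls LibC.INSTANCE.munlock on each buffer's address.
  • borrowBuffer returns availableBuffers.pollFirst() (null when the pool is empty) and logs a warning when fewer than poolSize * 0.4 buffers remain; returnBuffer resets the buffer with byteBuffer.position(0) and byteBuffer.limit(fileSize), then puts it back with offerFirst.
  • availableBufferNums returns availableBuffers.size() when storeConfig.isTransientStorePoolEnable() is true, and Integer.MAX_VALUE otherwise.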
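The borrow/return cycle described above can be sketched without JNA. The following is a simplified illustration, not the RocketMQ class: SimpleBufferPool, borrow, giveBack and availableCount are hypothetical names, and the mlock call and logging are left out on purpose.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical, simplified stand-in for TransientStorePool (no mlock, no logging).
class SimpleBufferPool {
    private final int bufferSize;
    private final Deque<ByteBuffer> available = new ConcurrentLinkedDeque<>();

    SimpleBufferPool(int poolSize, int bufferSize) {
        this.bufferSize = bufferSize;
        // Pre-allocate every buffer up front, as init() does.
        for (int i = 0; i < poolSize; i++) {
            available.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // pollFirst() returns null when the deque is empty, so callers must check for null.
    ByteBuffer borrow() {
        return available.pollFirst();
    }

    // Reset position and limit so the next borrower sees the full capacity.
    // The bytes themselves are not cleared.
    void giveBack(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(bufferSize);
        available.offerFirst(buffer);
    }

    int availableCount() {
        return available.size();
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(2, 16);
        ByteBuffer first = pool.borrow();
        first.putInt(42);   // position moves to 4
        first.limit(8);     // shrink the visible window
        pool.giveBack(first);

        ByteBuffer again = pool.borrow();
        System.out.println(again == first);    // true: offerFirst + pollFirst is LIFO
        System.out.println(again.position());  // 0
        System.out.println(again.limit());     // 16
        System.out.println(again.getInt(0));   // 42: old content is still there
    }
}
```

Because returned buffers go to the head of the deque and are also taken from the head, the most recently returned buffer is the next one handed out. The reset only touches position and limit, so a borrower should not assume the buffer is zeroed.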

isTransientStorePoolEnable

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

public class MessageStoreConfig {
    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

    //The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";

    //......

    @ImportantField
    private boolean transientStorePoolEnable = false;

    //......

    /**
     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
     * ASYNC_FLUSH
     *
     * @return <tt>true</tt> or <tt>false</tt>
     */
    public boolean isTransientStorePoolEnable() {
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }

    public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }

    //......
}
  • MessageStoreConfig defines the transientStorePoolEnable property, which defaults to false; isTransientStorePoolEnable returns true only when transientStorePoolEnable is true, flushDiskType is FlushDiskType.ASYNC_FLUSH, and brokerRole is not BrokerRole.SLAVE.
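To make the three-way condition concrete, here is a small standalone sketch. The enums are reduced local copies declared only for this example, and PoolEnableCheck, poolEnabled and the two-argument availableBufferNums are hypothetical helpers that mirror the logic shown above rather than RocketMQ APIs.

```java
// Local stand-ins for the RocketMQ enums, declared here only so the example compiles alone.
enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }
enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE }

class PoolEnableCheck {
    // Same boolean expression as MessageStoreConfig.isTransientStorePoolEnable().
    static boolean poolEnabled(boolean flag, FlushDiskType flush, BrokerRole role) {
        return flag && FlushDiskType.ASYNC_FLUSH == flush && BrokerRole.SLAVE != role;
    }

    // Same branching as TransientStorePool.availableBufferNums():
    // a disabled pool reports Integer.MAX_VALUE instead of the queue size.
    static int availableBufferNums(boolean enabled, int queued) {
        return enabled ? queued : Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(poolEnabled(true, FlushDiskType.ASYNC_FLUSH, BrokerRole.ASYNC_MASTER)); // true
        System.out.println(poolEnabled(true, FlushDiskType.SYNC_FLUSH, BrokerRole.ASYNC_MASTER));  // false
        System.out.println(poolEnabled(true, FlushDiskType.ASYNC_FLUSH, BrokerRole.SLAVE));        // false
        System.out.println(poolEnabled(false, FlushDiskType.ASYNC_FLUSH, BrokerRole.SYNC_MASTER)); // false
        System.out.println(availableBufferNums(false, 3)); // 2147483647
    }
}
```

Setting the flag alone is therefore not enough: a broker with synchronous flushing or a slave role reports the pool as disabled, and availableBufferNums then returns Integer.MAX_VALUE regardless of how many buffers are queued.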

Summary

  • The TransientStorePool constructor reads poolSize and fileSize from MessageStoreConfig; init allocates poolSize direct ByteBuffers and puts them into availableBuffers; destroy iterates over availableBuffers and calls LibC.INSTANCE.munlock on each buffer's address.
  • borrowBuffer returns availableBuffers.pollFirst(); returnBuffer calls byteBuffer.position(0) and byteBuffer.limit(fileSize), then puts the buffer back with offerFirst.
  • availableBufferNums returns availableBuffers.size() when storeConfig.isTransientStorePoolEnable() is true, and Integer.MAX_VALUE otherwise.

03-05 18:07