Broker在启动的时候会注册定时任务,定时清理过期的数据,默认是每10s执行一次,分别清理CommitLog文件和ConsumeQueue文件:

public class DefaultMessageStore implements MessageStore {

    // CommitLog清理类
    private final CleanCommitLogService cleanCommitLogService;
    // ConsumeQueue清理类
    private final CleanConsumeQueueService cleanConsumeQueueService;

    public void start() throws Exception {
        // ...

        // 添加定时任务
        this.addScheduleTask();

        //...
    }

    private void addScheduleTask() {
        // ...
        // 注册定时定理任务,默认10s执行一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // 清理数据
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

        // ...
    }

    private void cleanFilesPeriodically() {
        // 启动CommitLog清理
        this.cleanCommitLogService.run();
        // 启动ConsumeQueue清理
        this.cleanConsumeQueueService.run();
    }
}

CommitLog文件清理

CommitLog文件清理的逻辑主要在CleanCommitLogService中,它是DefaultMessageStore的内部类,调用了deleteExpiredFiles方法删除过期文件:

public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        public void run() {
            try {
                // 删除过期文件
                this.deleteExpiredFiles();
                this.redeleteHangedFile();
            } catch (Throwable e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    }
}

在执行清理任务之前,首先会获取一些配置参数,然后通过isTimeToDelete方法判断是否到了清理文件的时间,通过isSpaceToDelete方法计算磁盘是否还有足够的空间,处于以下三种情况之一,将会执行文件清理操作:

  1. 已经到了清理文件的时间,默认是4点;
  2. 磁盘使用已经超过了设定的阈值;
  3. 手动删除;
public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        private void deleteExpiredFiles() {
            int deleteCount = 0;
            // 获取文件保留时间
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            // 删除文件的间隔时间
            int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            // 计算是否到了清理文件的时间,默认是4点
            boolean timeup = this.isTimeToDelete();
            // 计算磁盘是否有足够空间
            boolean spacefull = this.isSpaceToDelete();
            // 是否手动删除
            boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
            // 如果到了时间或者磁盘使用率超过阈值或者是手动删除
            if (timeup || spacefull || manualDelete) {

                if (manualDelete)
                    this.manualDeleteFileSeveralTimes--;
                // 是否立刻清理
                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                    fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);
                // fileReservedTime默认是72小时,这里转换为毫秒数
                fileReservedTime *= 60 * 60 * 1000;
                // 删除超过72小时的文件
                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                    destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount > 0) {
                } else if (spacefull) {
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }
    }
}

清理时间判断

deleteWhen:清理时间点,默认是凌晨四点:

public class MessageStoreConfig {

     // 触发文件删除的时间,默认凌晨四点
    @ImportantField
    private String deleteWhen = "04";
}

isTimeToDelete中首先会获取配置的清理时间,默认是早上4点,然后判断当前时间是否已经到达了这个时间点:

        private boolean isTimeToDelete() {
            // 获取配置的清理时间,默认是早上4点
            String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
            // 如果到达了时间,返回true
            if (UtilAll.isItTimeToDo(when)) {
                DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
                return true;
            }

            return false;
        }

磁盘空间使用判断

RocketMQ设定了两个阈值,如果磁盘使用率超过了这些阈值,需要立刻进行清理:

  • diskSpaceWarningLevelRatio:磁盘使用率警戒阈值,默认0.90;
  • diskSpaceCleanForciblyRatio:强制进行清理的磁盘使用比例阈值,默认0.85;
    class CleanCommitLogService {
        // 磁盘使用率警戒阈值,默认0.9
        private final double diskSpaceWarningLevelRatio =
            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
        // 强制进行清理的磁盘使用比例阈值,默认0.85
        private final double diskSpaceCleanForciblyRatio =
            Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
    }

计算磁盘使用率

  1. 获取Commitlog存储目录;
  2. 计算每个目录下磁盘分区的使用率;
  3. 记录这些目录中,磁盘使用率最小的那个值,记在minPhysicRatio中;

接下来对磁盘使用率最小的那个值minPhysicRatio进行判断:

  1. 如果使用率最小的那个分区都已经大于警告阈值(默认0.90)说明需要立刻清理,将立即执行清理状态cleanImmediately置为true;
  2. 如果使用率最小的那个分区大于强制进行清理的磁盘使用比例阈值(默认0.85),说明需要立刻清理,将立即执行清理状态cleanImmediately置为true;
  3. 其他情况,表示磁盘使用率正常;
        private boolean isSpaceToDelete() {
            // 获取磁盘最大使用比率
            double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
            cleanImmediately = false;

            {
                // 获取Commitlog存储目录
                String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();
                String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
                Set<String> fullStorePath = new HashSet<>();
                double minPhysicRatio = 100;
                String minStorePath = null;
                for (String storePathPhysic : storePaths) {
                    // 计算磁盘分区使用率
                    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
                    if (minPhysicRatio > physicRatio) {
                        minPhysicRatio =  physicRatio; // 更新最小使用率,主要是记录所有目录分区中使用率最小的值
                        minStorePath = storePathPhysic;
                    }
                    // 如果大于设定的最大磁盘使用比例
                    if (physicRatio > diskSpaceCleanForciblyRatio) {
                        // 将对应的目录加入到fullStorePath
                        fullStorePath.add(storePathPhysic);
                    }
                }
                DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
                if (minPhysicRatio > diskSpaceWarningLevelRatio) { // 如果最小的那个磁盘使用率大于警告线
                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                    if (diskok) {
                        DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
                                ", so mark disk full, storePathPhysic=" + minStorePath);
                    }
                    // 立即执行清理置为true
                    cleanImmediately = true;
                } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {// 如果最小的那个磁盘使用率大于警告线
                    cleanImmediately = true;
                } else {
                    // 其他情况,表示磁盘使用率正常
                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                    if (!diskok) {
                        DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
                                ", so mark disk ok, storePathPhysic=" + minStorePath);
                    }
                }

                // 路径为空或者文件不存在的时候minPhysicRatio值会变成-1,
                if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
                    DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
                            + minPhysicRatio + ", storePathPhysic=" + minStorePath);
                    return true;
                }
            }

            // ...

            // 其他情况,表示磁盘使用率正常
            return false;
        }

磁盘分区使用率计算方式

  1. 获取磁盘总空间大小totalSpace;
  2. 计算已使用空间大小(usedSpace) = 磁盘总空间(totalSpace) - 剩余空间(剩余空间不一定全部是可用的);
  3. 计算可用的剩余空间usableSpace;
  4. 计算可用空间总大小(entireSpace) = 已使用空间大小(usedSpace) + 可用的剩余空间(usableSpace);
  5. 计算了一个roundNum;
  6. 使用usedSpace * 100 / entireSpace + roundNum计算磁盘使用率;
public class UtilAll {
    public static double getDiskPartitionSpaceUsedPercent(final String path) {
        // 如果路径为空,返回-1
        if (null == path || path.isEmpty()) {
            log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
            return -1;
        }

        try {
            File file = new File(path);
            // 如果文件不存在
            if (!file.exists()) {
                log.error("Error when measuring disk space usage, file doesn't exist on this path: {}", path);
                return -1;
            }
            // 获取总空间
            long totalSpace = file.getTotalSpace();
            if (totalSpace > 0) {
                // 已使用空间 = 磁盘总空间 - 剩余空间(剩余空间不一定全部是可用的)
                long usedSpace = totalSpace - file.getFreeSpace();
                // 可用剩余空间
                long usableSpace = file.getUsableSpace();
                // 可用空间总大小
                long entireSpace = usedSpace + usableSpace;
                long roundNum = 0;
                if (usedSpace * 100 % entireSpace != 0) {
                    roundNum = 1;
                }
                long result = usedSpace * 100 / entireSpace + roundNum;
                return result / 100.0;
            }
        } catch (Exception e) {
            log.error("Error when measuring disk space usage, got exception: :", e);
            return -1;
        }
        // 异常情况返回-1
        return -1;
    }
}

清理文件

先来看一些配置参数:

public class MessageStoreConfig {
    // 文件的保留时间,默认72小时
    @ImportantField
    private int fileReservedTime = 72;

    // CommitLog文件删除间隔时间,默认100ms,删除过期文件后会休眠一段时间(这个间隔时间),在进行下一个
    private int deleteCommitLogFilesInterval = 100;

    // 主要用于在调用shuntdown关闭文件后,如果到达这个间隔时间之后依旧有线程引用该文件时,会强制将文件的引用数置为负数,表示不再有线程引用
    private int destroyMapedFileIntervalForcibly = 1000 * 120;
}

回到删除方法,继续看到达清理条件的代码:

  1. 判断是否需要立刻清理,cleanImmediately为true并且开启了允许强制清理;
  2. fileReservedTime默认是72小时,将其转换为毫秒数;
  3. 调用CommitLog的deleteExpiredFile方法,将文件的过期时间、文件删除间隔、是否立刻清理等参数传入,开始删除过期文件;
public class DefaultMessageStore implements MessageStore {
    class CleanCommitLogService {
        private void deleteExpiredFiles() {
            // ...
            // 如果到了时间或者磁盘使用率超过阈值或者是手动删除
            if (timeup || spacefull || manualDelete) {
                // ...
                // 是否立刻清理
                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                    fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);
                // fileReservedTime默认是72小时,这里转换为毫秒数
                fileReservedTime *= 60 * 60 * 1000;
                // 删除超过72小时的文件
                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                    destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount > 0) {
                } else if (spacefull) {
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }
    }
}

在CommitLog的deleteExpiredFile删除文件的方法中,又调用了MappedFileQueue的deleteExpiredFileByTime开始删除过期文件:

public class CommitLog {
   public int deleteExpiredFile(
        final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately
    ) {
        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
    }
}

删除过期文件

MappedFileQueue的deleteExpiredFileByTime处理逻辑如下:

  1. 计算文件的存活时间:文件最后一次修改的时间 + 过期时间(默认72小时);
  2. 如果当前时间大于文件的存活时间,或者cleanImmediately为true,这两个条件都表示需要清理文件,则调用MappedFile的destroy方法删除文件;
  3. 删除当前文件后,如果设置了文件删除间隔,并且当前文件不是最后一个,则休眠一段时间(设置的删除间隔时间)后再删除下一个过期文件;
public class MappedFileQueue {
    public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        Object[] mfs = this.copyMappedFiles(0);
        if (null == mfs)
            return 0;
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                // 文件最后一次修改的时间 + 过期时间
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                // 如果当前时间大于文件的存活时间获取 cleanImmediately为true
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    // 清理文件
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++;

                        //...
                        // 如果设置了文件删除间隔,并且不是最后一个文件,休眠一段时间后再删除下一个过期文件
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }
        deleteExpiredFile(files);
        return deleteCount;
    }
}

由于文件可能被其他线程占用,所以在删除文件前需要保证没有其他线程使用文件,可以看到调用了shutdown方法关闭文件,然后调用isCleanupOver方法判断是否没有线程占用文件并且已经关闭文件相关的资源,只有这两个条件满足时才可以删除文件:

public class MappedFile extends ReferenceResource {
    public boolean destroy(final long intervalForcibly) {
        // 需要先关闭文件
        this.shutdown(intervalForcibly);
        // 是否关闭
        if (this.isCleanupOver()) {
            try {
                // 关闭filechannel
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");
                long beginTime = System.currentTimeMillis();
                // 删除文件
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeElapsedTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }

            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }

        return false;
    }
}

shutdown关闭文件

ReferenceResource中有两个成员变量需要关注:

  • available:表示文件是否可用,默认为true,表示文件可用,当调用shutdown方法关闭文件的时候,会置为false;
  • refCount:当前文件的引用数量,初始化为1,表示有一个线程在使用文件,对文件进行操作前,会先申请占用(hold),申请占用成功时refCount会增1表示新增一个引用,使用完毕之后会释放(release方法),将refCount的值再减1,通过refCount的值可以判断是否有其他线程在使用文件;
  • cleanupOver:文件的清理状态,如果文件已经被关闭、没有线程引用并且文件占用的相关资源已经释放,此时会被置为true,否则为false,表示文件的相关资源还未清理完毕;
public abstract class ReferenceResource {
    protected final AtomicLong refCount = new AtomicLong(1);
    protected volatile boolean available = true;
    protected volatile boolean cleanupOver = false;
}

shutdown方法的处理逻辑如下:

  1. 首先判断文件是否可用状态(available)是否为true,如果为true,将available为置为false,表示该文件不再提供使用,并记录本次关闭文件的时间戳,然后调用release方法关闭文件相关的资源;
  2. 如果available为false但是该文件的引用数量大于0,表示文件已经不再提供使用但是还有其他线程在引用,此时判断当前时间距离上次关闭文件的时间是否大于强制清理间隔,如果大于等于表示到了强制清理的时间,强制将文件的引用数量置为负数,再调用release方法关闭文件相关的资源;
public abstract class ReferenceResource {
   public void shutdown(final long intervalForcibly) {
        // 文件是否可用状态为true
        if (this.available) {
            // 置为false表示不再提供使用
            this.available = false;
            // 记录时间
            this.firstShutdownTimestamp = System.currentTimeMillis();
            // 开始释放资源
            this.release();
        } else if (this.getRefCount() > 0) { // 如果文件的引用数量大于0,表示文件被占用
            // 计算当前时间 - 上次关闭文件的时间,是否大于强制清理间隔
            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                // 引用数量设置为负值
                this.refCount.set(-1000 - this.getRefCount());
                // 释放资源
                this.release();
            }
        }
    }
}

以commit方法为例,在使用文件进行操作的时候,通常会先调用hold方法申请占用,使用完毕之后再调用release方法释放占用:

public class MappedFile extends ReferenceResource {
    public int commit(final int commitLeastPages) {
        // ...
        if (this.isAbleToCommit(commitLeastPages)) {
            // 申请文件的占用
            if (this.hold()) {
                commit0();
                // 释放文件的占用
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }
    }
}

在hold方法中可以看到,先判断当前文件是否可用(通过available的值判断),如果可用再进行如下操作:

  1. 调用getAndIncrement先让refCount自增,如果refCount大于0表示文件占用成功;
  2. 如果上一步refCount自增之后小于等于0,表示未成功,由于上面进行了一次自增操作,所以这里要再减回去;
public abstract class ReferenceResource {
    public synchronized boolean hold() {
        // 判断文件是否可用
        if (this.isAvailable()) {
            // 调用getAndIncrement先让refCount自增,如果refCount大于0表示获取成功
            if (this.refCount.getAndIncrement() > 0) {
                return true;
            } else {
                // 如果上一步refCount自增之后小于等于0,表示未获取成功,由于上面进行了一次自增操作,所以这里要再减回去
                this.refCount.getAndDecrement();
            }
        }

        return false;
    }
}

在release方法中,首先可以看到对refCount进行了自减操作,释放文件的占用数,然后进行如下判断:

  1. 如果此时refCount的值依旧大于0,表示有其他线程还在引用,直接返回即可;
  2. 如果此时refCount的值小于等于0,表示没有其他线程使用,加锁调用cleanup方法开始清理文件的使用资源,然后返回清理状态,这一步主要是在文件没有其他线程使用的时候,关闭文件占用相关资源;
public abstract class ReferenceResource {

    public void release() {
        // 自减操作,释放文件的占用数
        long value = this.refCount.decrementAndGet();
        // 如果文件的引用数量大于0,表示文件被占用
        if (value > 0)
            return;
        // 加锁
        synchronized (this) {
            // 清理文件的使用资源,返回清理状态,true表示清理完毕,false表示未完成
            this.cleanupOver = this.cleanup(value);
        }
    }
}

cleanup关闭文件占用资源

cleanup的处理逻辑如下:

  1. 如果文件的可用状态vailable还为true,表示文件未关闭,不能清理,返回false;
  2. 如果isCleanupOver已经为true,表示已经清理完毕,返回true;
  3. 开始清理文件占用的资源,MappedByteBuffer涉及到堆外内存,所以关闭文件前需要清理相关内存,防止内存泄露;
  4. 清理完毕,返回true赋值给cleanupOver,表示文件相关占用资源都已清理完毕;
public class MappedFile extends ReferenceResource {
    @Override
    public boolean cleanup(final long currentRef) {
        // 如果文件的可用状态还为true,表示文件未关闭,不能清理,返回false
        if (this.isAvailable()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have not shutdown, stop unmapping.");
            return false;
        }
        // 如果isCleanupOver已经为true,表示已经清理完毕,返回true
        if (this.isCleanupOver()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have cleanup, do not do it again.");
            return true;
        }
        // 开始清理文件的使用资源
        clean(this.mappedByteBuffer);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
        TOTAL_MAPPED_FILES.decrementAndGet();
        log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
        // 返回true,之后cleanupOver为true,表示已经清理了文件的相关占用资源
        return true;
    }
}

总结
RocketMQ会注册定时任务,定时执行清理任务,删除过期文件(默认72小时),在清理任务执行时,会先判断是否达到了清理的条件,主要有以下三个条件:

  1. 已经到了清理文件的时间,默认是4点;
  2. 磁盘使用已经超过了设定的阈值;
  3. 手动删除;

如果满足以上条件之一,开始执行清理。在删除过期文件之前,需要保证文件已经关闭以及没有其他线程在引用该文件,所以会调用关闭文件的方法修改文件可用状态,将其改为不可用并清理相关资源,接着会调用isCleanupOver方法判断该文件是否没有线程引用并且文件占用的相关资源已经释放(MappedByteBuffer涉及到堆外内存,关闭文件前需要清理相关内存,防止内存泄露),之后才可以对文件进行删除。

10-23 09:06