【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

个人简介: 

前言:

目录

一:未来数据定时刷新

1.redis key值匹配

方案一:keys模糊匹配

方案二:scan

2.redis管道

3.定时刷新功能实现 

二:分布式锁解决集群下的方法抢占执行

1.问题描述

2.分布式锁

3.redis分布式锁

 4.实现

(1)方法添加

(2) 代码修改

5.数据库同步

三:延迟队列实现定时发布

1.提供对外接口

2.具体实现

(1)前期准备

(2)添加任务到延迟队列

(3)修改发布文章代码

(4)消费任务进行文章审核


一:未来数据定时刷新

1.redis key值匹配

        将未来5分钟之内要发布的文章加入到redis之后,我们需要定时对这部分数据(也就是zset中的数据)进行扫描,以便将zset中时间到了文章存入list中准备发布,但是这时候扫描zset中的数据有两种选择,见下面分析:

方案一:keys模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

方案二:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。  

【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

2.redis管道

普通redis客户端和服务器交互模式

【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

Pipeline请求模型

【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

        两者的区别从图中就可以看出来,第一种方式对每一个命令都需要向服务端发送一次请求,假如命令过多会不断创建连接,降低执行效率;而第二种方式则是将一批命令积攒到一起再开启通道一次性执行,大大减少了连接数。  

3.定时刷新功能实现 

在taskserviceImpl中添加如下方法,并且引导类中开启任务调度注解@EnableScheduling

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {

    // 获取所有未来数据集合的key值
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
    for (String futureKey : futureKeys) { // future_250_250

        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
        //获取该组key下当前需要消费的任务数据
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (!tasks.isEmpty()) {
            //将这些任务数据添加到消费者队列中
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            log.info("成功将{}下的当前需要执行的任务添加到{}",futureKey,topicKey);
        }
    }
}

二:分布式锁解决集群下的方法抢占执行

1.问题描述

        假如启动两台tbug-headlines-schedule服务,这时候两者都会去执行refresh方法,但是我们只需要其中一台去执行扫描任务即可,这时候就需要加入分布式锁来进行控制。

2.分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:

【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

3.redis分布式锁

setnx (Set if Not Exists) 命令在指定的 key 不存在时,为 key 设置指定的值。

【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

 4.实现

(1)方法添加

在工具类CacheService中添加如下方法:

/**
 * 加锁
 *
 * @param name
 * @param expire
 * @return
 */
public String tryLock(String name, long expire) {
    name = name + "_lock";
    String token = UUID.randomUUID().toString();
    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
    RedisConnection conn = factory.getConnection();
    try {
        //参考redis命令:
        //set key value [EX seconds] [PX milliseconds] [NX|XX]
        Boolean result = conn.set(
                name.getBytes(),
                token.getBytes(),
                Expiration.from(expire, TimeUnit.MILLISECONDS),
                RedisStringCommands.SetOption.SET_IF_ABSENT //NX
        );
        if (result != null && result)
            return token;
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory,false);
    }
    return null;
}

参数name表示锁的名称,expire表示锁的过期时间,最重要的是set方法中最后一个参数RedisStringCommands.SetOption.SET_IF_ABSENT,这表示当有一个请求过来之后就会设置key值进行加锁,这样再有请求过来就获取不到了。

(2) 代码修改

/**
 * 定时器任务,每分钟扫描redis一次
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);//加锁,30秒过期
    if(StringUtils.isNotBlank(token)) {
        log.info("开始执行定时扫描redis任务...");

        //获取所有未来数据集合的key值
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        for (String futureKey : futureKeys) {
            //截取后半部分 future_200_100 --> _200_100
            String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
            //获取该组key下当前需要清理的任务数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey,0,System.currentTimeMillis());
            if(!tasks.isEmpty()) {
                //将这些数据添加到消费队列中
                cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                log.info("成功将{}下的当前需要执行的任务添加到{}",futureKey,topicKey);
            }
        }
    }
}

5.数据库同步

在完成上述操作之后,我们需要知道的是redis中只是存放现在就需要发布和5分钟之内需要发布的文章,而那些超过5分钟之后才需要发布的文章(比如一天之后发布)我们是不将其存入redis中的,它们只是存放在数据库中,这时候就需要定时去扫描数据库查看哪些文章需要被放入redis进行处理,流程图还可以参考上一篇文章的:

【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)-LMLPHP

/**
 * 数据库同步任务,每五分钟执行一次
 */
@PostConstruct  //表示服务一启动便执行一次
@Scheduled(cron = "0 */5 * * * ?")
public void reloadData() {
    log.info("开始同步数据库任务到redis...");
    //1.清理缓存任务,避免重复
    clearCache();

    //2.获取5分钟之后的时间
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE,5);

    //3.查看未来所有小于5分钟的任务
    List<Taskinfo> tasks = taskInfoMapper.selectList
            (Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
    if(tasks != null && tasks.size() > 0) {
        for (Taskinfo taskinfo : tasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo,task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToCache(task);
        }
    }
}

/**
 * 清理缓存任务
 */
private void clearCache() {
    log.info("开始清理缓存任务...");
    //获取任务集
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
    Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");
    cacheService.delete(futureKeys);
    cacheService.delete(topicKeys);
}

三:延迟队列实现定时发布

1.提供对外接口

提供远程的feign接口,在tbug-headlines-feign-api编写类如下:  

package com.my.apis.schedule;

import com.my.model.common.dtos.ResponseResult;
import com.my.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient(value = "headlines-schedule")
public interface IScheduleClient {
    /**
     * 添加任务
     * @param task   任务对象
     * @return       任务id
     */
    @PostMapping("/api/v1/task/add")
    ResponseResult addTask(@RequestBody Task task);

    /**
     * 取消任务
     * @param taskId        任务id
     * @return              取消结果
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority")  int priority);
}

在tbug-headlines-schedule微服务下提供对应的实现

package com.my.schedule.feign;

import com.my.apis.schedule.IScheduleClient;
import com.my.model.common.dtos.ResponseResult;
import com.my.model.schedule.dtos.Task;
import com.my.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
public class ScheduleClient implements IScheduleClient {
    @Autowired
    private TaskService taskService;

    /**
     * 添加任务
     * @param task   任务对象
     * @return       任务id
     */
    @Override
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task) {
        return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
     * 取消任务
     * @param taskId        任务id
     * @return              取消结果
     */
    @Override
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable long taskId) {
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }


    /**
     * 按照类型和优先级来拉取任务
     * @param type
     * @param priority
     * @return
     */
    @Override
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority) {
        return ResponseResult.okResult(taskService.poll(type,priority));
    }
}

2.具体实现

(1)前期准备

①枚举类

package com.my.model.common.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

    NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
    REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
    private final int taskType; //对应具体业务
    private final int priority; //业务不同级别
    private final String desc; //描述信息
}

②序列化工具

        在添加任务到延迟队列的方法中,我们需要用到序列化工具进行序列化操作,而在任务消费时候又需要进行反序列化操作。java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化,但是这里我选用的是效率更高的Protostuff。Protostuff是google开源的,其采用更为紧凑的二进制数组,表现更加优异。

将ProtostuffUtil拷贝到tbug-headlines-utils中,然后导入如下依赖

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>

(2)添加任务到延迟队列

创建WmNewsTaskService

package com.my.wemedia.service;

import java.util.Date;

public interface WmNewsTaskService {
    /**
     * 添加任务到延迟队列中
     * @param id 文章id
     * @param publishTime  文章发布时间
     */
    void addNewsToTask(Integer id, Date publishTime);

    /**
     * 消费延迟队列数据
     */
    void scanNewsByTask();
}

实现类 

package com.my.wemedia.service.impl;

import com.alibaba.fastjson.JSON;
import com.my.apis.schedule.IScheduleClient;
import com.my.common.constans.ScheduleConstants;
import com.my.common.redis.CacheService;
import com.my.model.common.dtos.ResponseResult;
import com.my.model.common.enums.TaskTypeEnum;
import com.my.model.schedule.dtos.Task;
import com.my.model.wemedia.pojos.WmNews;
import com.my.utils.common.ProtostuffUtil;
import com.my.wemedia.service.WmAutoScanService;
import com.my.wemedia.service.WmNewsTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Calendar;
import java.util.Date;

@Slf4j
@Service
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
    @Autowired
    private IScheduleClient scheduleClient;
    /**
     * 添加任务到延迟队列
     * @param id 文章id
     * @param publishTime  文章发布时间
     */
    @Override
    @Async
    public void addNewsToTask(Integer id, Date publishTime) {
        log.info("添加任务到延迟服务中---begin");

        Task task = new Task();
        task.setExecuteTime(publishTime.getTime());
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        WmNews wmNews = new WmNews();
        wmNews.setId(id);
        task.setParameters(ProtostuffUtil.serialize(wmNews));

        scheduleClient.addTask(task);

        log.info("添加任务到延迟服务中---end");
    }
}

(3)修改发布文章代码

将之前的异步调用审核文章修改为将文章数据加入延迟队列

    @Autowired
    private WmAutoScanService wmAutoScanService;
    @Autowired
    private WmNewsTaskService wmNewsTaskService;
    /**
     * 提交文章
     * @param dto
     * @return
     */
    @Override
    public ResponseResult submitNews(WmNewsDto dto) throws Exception {
        //1.参数校验
        if(dto == null || dto.getContent().length() == 0) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        //2.保存或修改文章
        //2.1属性拷贝
        WmNews wmNews = new WmNews();
        BeanUtils.copyProperties(dto,wmNews);

        //2.2设置封面图片
        if(dto.getImages() != null && dto.getImages().size() != 0) {
            String images = StringUtils.join(dto.getImages(), ",");
            wmNews.setImages(images);
        }

        //2.3封面类型为自动
        if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
            wmNews.setType(null);
        }

        saveOrUpdateWmNews(wmNews);

        //3.判断是否为草稿
        if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
            //直接保存结束
            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
        }

        //4.不是草稿
        //4.1保存文章图片素材与文章关系
        //4.1.1提取图片素材列表
        List<String> imagesList = getImagesList(dto);
        //4.1.2保存
        saveRelatedImages(imagesList,wmNews.getId(),WemediaConstants.WM_CONTENT_REFERENCE);

        //4.2保存封面图片和文章关系
        saveRelatedCover(dto,imagesList,wmNews);

        //5.审核文章(异步调用)
        // wmAutoScanService.AutoScanTextAndImage(wmNews.getId());
        //5.将任务添加到延迟服务
        wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

(4)消费任务进行文章审核

在WmNewsTaskServiceImpl中添加如下方法:

@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;

/**
     * 消费延迟队列数据
     */
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {

    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if(responseResult.getCode().equals(200) && responseResult.getData() != null){
        log.info("文章审核---消费任务执行---begin---");

        String json_str = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(json_str, Task.class);
        byte[] parameters = task.getParameters();
        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
        System.out.println(wmNews.getId()+"-----------");
        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());

        log.info("文章审核---消费任务执行---end---");

    }

}

下篇预告:定时发布文章优化策略

08-28 08:43