[从源码学设计]蚂蚁金服SOFARegistry之服务注册和操作日志

目录

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

本文为第十四篇,介绍SOFARegistry服务上线和操作日志。上文是从Session Server角度,本文从 Data Server 角度介绍。

0x01 整体业务流程

我们首先回顾总体业务流程,这部分属于数据分片。

1.1 服务注册过程

回顾下“一次服务注册过程”的服务数据在内部流转过程。

  1. Client 调用 publisher.register 向 SessionServer 注册服务。
  2. SessionServer 收到服务数据 (PublisherRegister) 后,将其写入内存 (SessionServer 会存储 Client 的数据到内存,用于后续可以跟 DataServer 做定期检查),再根据 dataInfoId 的一致性 Hash 寻找对应的 DataServer,将 PublisherRegister 发给 DataServer。
  3. DataServer 接收到 PublisherRegister 数据,首先也是将数据写入内存 ,DataServer 会以 dataInfoId 的维度汇总所有 PublisherRegister。同时,DataServer 将该 dataInfoId 的变更事件通知给所有 SessionServer,变更事件的内容是 dataInfoId 和版本号信息 version。
  4. 同时,异步地,DataServer 以 dataInfoId 维度增量地同步数据给其他副本。因为 DataServer 在一致性 Hash 分片的基础上,对每个分片保存了多个副本(默认是3个副本)。
  5. SessionServer 接收到变更事件通知后,对比 SessionServer 内存中存储的 dataInfoId 的 version,若发现比 DataServer 发过来的小,则主动向 DataServer 获取 dataInfoId 的完整数据,即包含了所有该 dataInfoId 具体的 PublisherRegister 列表。
  6. 最后,SessionServer 将数据推送给相应的 Client,Client 就接收到这一次服务注册之后的最新的服务列表数据。

因为篇幅所限,上文讨论的是前两点,本文介绍第三,第四点

1.2 数据分片

当服务上线时,会计算新增服务的 dataInfoId Hash 值,从而对该服务进行分片,最后寻找最近的一个节点,存储到相应的节点上。

DataServer 服务在启动时添加了 publishDataProcessor 来处理相应的服务发布者数据发布请求,该 publishDataProcessor 就是 PublishDataHandler。当有新的服务发布者上线,DataServer 的 PublishDataHandler 将会被触发。

该 Handler 首先会判断当前节点的状态,若是非工作状态则返回请求失败。若是工作状态,则触发数据变化事件中心 DataChangeEventCenter 的 onChange 方法。

DataChangeEventQueue 中维护着一个 DataChangeEventQueue 队列数组,数组中的每个元素是一个事件队列。当上文中的 onChange 方法被触发时,会计算该变化服务的 dataInfoId 的 Hash 值,从而进一步确定出该服务注册数据所在的队列编号,进而把该变化的数据封装成一个数据变化对象,传入到队列中。

DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的时候被一个新的线程调用,该方法会源源不断地从队列中获取新增事件,并且进行分发。新增数据会由此添加进节点内,实现分片。

与此同时,DataChangeHandler 会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步

0x02 基础数据结构

这里需要首先讲解几个相关数据结构。

2.1 Publisher

Publisher是数据发布者信息

public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType         publishType      = PublishType.NORMAL;
}

2.2 Datum

从SOFARegistry本身出发而汇集的数据发布者信息,里面核心是 :

  • dataInfoId:服务唯一标识,由``<分组 group><租户 instanceId>构成,例如在 SOFARPC 的场景下,一个 dataInfoId 通常是 com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名称,00001 是租户 id。group 和 instance 主要是方便对服务数据做逻辑上的切分,使不同 group 和 instance 的服务数据在逻辑上完全独立。模型里有 group 和 instanceId 字段,但这里不额外列出来,读者只要理解 dataInfoId 的含义即可;
  • dataCenter:一个物理机房,包含多个逻辑单元(zone)。zone:是一种单元化架构下的概念,代表一个机房内的逻辑单元。在服务发现场景下,发布服务时需指定逻辑单元(zone),而订阅服务者可以订阅逻辑单元(zone)维度的服务数据,也可以订阅物理机房(datacenter)维度的服务数据,即订阅该 datacenter 下的所有 zone 的服务数据。;
  • pubMap:包括的Publisher;
  • version:对应的版本

具体代码如下:

public class Datum implements Serializable {
    private String                                dataInfoId;
    private String                                dataCenter;
    private String                                dataId;
    private String                                instanceId;
    private String                                group;
    private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
    private long                                  version;
    private boolean                               containsUnPub    = false;
}

2.3 DatumCache

DatumCache 是最新的Datum。

public class DatumCache {
    @Autowired
    private DatumStorage localDatumStorage;
}

具体存储是在LocalDatumStorage中完成。

public class LocalDatumStorage implements DatumStorage {
    /**
     * row:     dataCenter
     * column:  dataInfoId
     * value:   datum
     */
    protected final Map<String, Map<String, Datum>>     DATUM_MAP            = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:     ip:port
     * column:  registerId
     * value:   publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig                            dataServerConfig;
}

2.4 Operator

Operator 是每一步Datum对应的操作

public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}

2.5 Acceptor

记录了所有的Datum操作。其中:

  • logOperatorsOrder记录了操作的顺序;
  • logOperators是所有的操作;
public class Acceptor {
    private final String                    dataInfoId;
    private final String                    dataCenter;
    private int                             maxBufferSize;
    static final int                        DEFAULT_DURATION_SECS = 30;
    private final Deque<Long/*version*/>   logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
    private final DatumCache                datumCache;
}

2.6 总结

总结下这几个数据结构的联系:

  • Publisher是数据发布者信息
  • Datum是从SOFARegistry本身出发而汇集的数据发布者信息
  • DatumCache 是最新的Datum
  • Operator 是每一步Datum对应的操作
  • Acceptor记录了所有的Datum操作

0x03 Datum的来龙去脉

我们先回顾下 Datum 的来龙去脉。

3.1 Session Server 内部

首先,我们讲讲Session Server 内部如何获取Datum

在 Session Server 内部,Datum存储在 SessionCacheService 之中。

比如在 DataChangeFetchCloudTask 内部,可以这样获取 Datum。

private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream().
                map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                        new DatumKey(fetchDataInfoId, dataCenter))).
                collect(Collectors.toList());

        Map<Key, Value> values = null;
        values = sessionCacheService.getValues(keys);

        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}

Session Server 会向 Data Server 发送 PublishDataRequest 请求

3.2 PublishDataHandler

在DataServer内部,PublishDataHandler 是用来处理 PublishDataRequest

public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}

3.3 DataChangeEventCenter

在 DataChangeEventCenter 的 onChange 函数中,会进行投放

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

3.4 DataChangeEventQueue

在DataChangeEventQueue之中,会调用 handleDatum 来处理。在这里对Datum进行存储。

3.5 DataChangeHandler

在 DataChangeHandler 之中,会提取ChangeData,然后进行Notify。

public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

具体如下:

                                           +
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           |
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           +      +-------------------+

0x04 DataChangeHandler处理

于是我们接着进行 DataChangeHandler 处理。即总述中提到的:DataChangeHandler 会把这个事件变更信息:

  1. 把这个事件变更信息变成Operator,放到AbstractAcceptorStore;
  2. 通过 ChangeNotifier 对外发布,通知其他节点进行数据同步;

下面我们从第一部分 :把这个事件变更信息变成Operator,放到AbstractAcceptorStore 出发,进行讲解日志操作。

即如图所示:

                                           +
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           +
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           |      +-------+-----------+
                                           |              |
                                           |              |
                                           |              v
                                           |      +-------+---------+
                                           |      |  ChangeNotifier |
                                           |      +-------+---------+
                                           |              |
                                           |              |
                                           |              v
                                           |   +----------+------------+
                                           |   | AbstractAcceptorStore |
                                           |   +-----------------------+
                                           +

Acceptor的appendOperator谁来调用?在Notifier 里面有,比如:

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

以及另一个:

public class SnapshotBackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

0x05 AbstractAcceptorStore存储

AbstractAcceptorStore是日志存储,我们下面详细分析。

5.1 Bean

对于操作信息,提供了一个Bean来存储。

@Bean
public AcceptorStore localAcceptorStore() {
    return new LocalAcceptorStore();
}

5.2 StoreServiceFactory

作用是在 storeServiceMap 中存放各种 AcceptorStore,目前只有LocalAcceptorStore 这一个。

public class StoreServiceFactory implements ApplicationContextAware {

    private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();

    /**
     * get AcceptorStore by storeType
     * @param storeType
     * @return
     */
    public static AcceptorStore getStoreService(String storeType) {
        return storeServiceMap.get(storeType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);

        map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
    }
}

5.3 AbstractAcceptorStore

AbstractAcceptorStore 是存储的基本实现类,几个基本成员是。

  • acceptors :是一个矩阵,按照dataCenter,dataInfoId维度来分类,存储了此维度下的Acceptor;就是说,针对每一个dataCenter,dataInfoId的组合,都有一个Acceptor,用来存储这下面的Operator。

  • notifyAcceptorsCache :是一个矩阵,按照dataCente,dataInfoId维度来分类,缓存了此维度下需要进一步处理的Acceptor;

  • delayQueue :配合notifyAcceptorsCache使用,针对notifyAcceptorsCache的每一个新acceptor,系统会添加一个消息进入queue,这个queue等延时到了,就会取出,并且从notifyAcceptorsCache取出对应的新acceptor进行相应处理;

按说应该是 cache 有东西,所以dequeue 时候就会取出来,但是如果这期间多放入了几个进入 Cache,原有cache 的 value 只是被替换而已,等时间到了,也会取出来

notifyAcceptorsCache 也是按照 data center 来控制的,只有定期 removeCache。

public abstract class AbstractAcceptorStore implements AcceptorStore {

    private static final int               DEFAULT_MAX_BUFFER_SIZE = 30;

    @Autowired
    protected IMetaServerService           metaServerService;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DataServerConnectionFactory    dataServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors               = new ConcurrentHashMap<>();

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache    = new ConcurrentHashMap<>();

    private DelayQueue<DelayItem<Acceptor>>     delayQueue
}

具体如下图:

+-----------------------------+                      +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
|[AbstractAcceptorStore]      |                      |
|                             |   +-> dataCenter +---+
|                             |   |                  |
|     acceptors  +--------------->+                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
|                             |   |
|     notifyAcceptorsCache    |   |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
|           +                 |   +-> dataCenter +-->+
+-----------------------------+                      |
            |                                        +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
            |
            |
            |                                        +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
            |                     +-> dataCenter +-->+
            |                     |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
            +-------------------->+
                                  |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
                                  +-> dataCenter +---+
                                                     +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>

手机如图:

[从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志-LMLPHP

有一点需要说明,就是delayQueue 为何要延迟队列。这是由于SOFA的“秒级服务上下线通知“特性造成的

因为要实现此特性,所以涉及到了一个连接敏感性问题,即在 SOFARegistry 里,所有 Client 都与 SessionServer 保持长连接,每条长连接都会有基于 bolt 的连接心跳,如果连接断开,Client 会马上重新建连,时刻保证 Client 与 SessionServer 之间有可靠的连接。

因为强烈的连接敏感性,所以导致如果只是网络问题导致连接断开,实际的进程并没有宕机,那么 Client 会马上重连 SessionServer 并重新注册所有服务数据。这种大量的短暂的服务下线后又重新上线会给用户带来困扰和麻烦

因此在 DataServer 内部实现了数据延迟合并的功能,就是这里的DelayQueue

5.4 加入

addOperator的基本逻辑是:

  • 从Operator的Datum中提取dataCenter和dataInfoId;
  • 从acceptors取出dataCenter对应的Map<dataInfoId, Acceptor> acceptorMap;
  • 从acceptorMap中提取dataInfoId对应的existAcceptor;
  • 如果新operator是SnapshotOperator类型,则清除之前的 opeator queue。
  • 否则加入新operator;
  • 使用putCache(existAcceptor);把目前的Acceptor加入Cache,定时任务会处理;

在操作中,都是使用putIfAbsent,这样短期内若有多个同样value插入,则不会替换原有的value,这样 起到了归并作用

@Override
public void addOperator(Operator operator) {

    Datum datum = operator.getDatum();
    String dataCenter = datum.getDataCenter();
    String dataInfoId = datum.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }

        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        if (existAcceptor == null) {
            Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId,
                dataCenter, datumCache);
            existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
            if (existAcceptor == null) {
                existAcceptor = newAcceptor;
            }
        }

        if (operator instanceof SnapshotOperator) {
            //snapshot: clear the queue, Make other data retrieve the latest memory data
            existAcceptor.clearBefore();
        } else {
            existAcceptor.appendOperator(operator);
        }

        //put cache
        putCache(existAcceptor);
    }
}

putCache的作用是:

  • 从acceptor中提取dataCenter和dataInfoId;
  • 从notifyAcceptorsCache中取出dataCenter对应的Map<dataInfoId, Acceptor> acceptorMap;
  • 向acceptorMap中放入dataInfoId对应的acceptor;
  • 如果acceptorMap中之前没有对应的value,则把acceptor放入delayQueue;

这里也使用putIfAbsent,这样短期内若有多个同样value插入,则不会替换原有的value,这样 起到了归并作用

private void putCache(Acceptor acceptor) {

    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();

    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }
        Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
        if (existAcceptor == null) {
            addQueue(acceptor);
        }
    }
}

5.5 使用

具体消费是在定期任务中完成。消费日志的目的就是同步日志操作给其他 DataServer。

5.5.1 Scheduler

Scheduler类是定期任务,会启动两个线程池定期调用AcceptorStore的函数

public class Scheduler {
    private final ScheduledExecutorService scheduler;

    public final ExecutorService           versionCheckExecutor;

    private final ThreadPoolExecutor       expireCheckExecutor;

    @Autowired
    private AcceptorStore                  localAcceptorStore;

   public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory(
                "SyncDataScheduler-versionChangeCheck"));

    }

    public void startScheduler() {
        scheduler.schedule(
                new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                        TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
                30, TimeUnit.SECONDS);

        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());

    }
}

AbstractAcceptorStore中函数如下:

5.5.2 changeDataCheck

changeDataCheck 内部是一个while true,所以不需要再使用线程池。

changeDataCheck绑定在delayQueue上,如果有新消息,则会取出Acceptor,也从notifyAcceptorsCache取出Acceptor,调用notifyChange(acceptor);进行处理 。

@Override
public void changeDataCheck() {

    while (true) {
        try {
            DelayItem<Acceptor> delayItem = delayQueue.take();
            Acceptor acceptor = delayItem.getItem();
            removeCache(acceptor); // compare and remove
        } catch (InterruptedException e) {
            break;
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

}

消费Cache用到的是removeCache。

private void removeCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap != null) {
            boolean result = acceptorMap.remove(dataInfoId, acceptor);
            if (result) {
                //data change notify
                notifyChange(acceptor);
            }
        }
    }
}
5.5.2.1 通知NotifyDataSyncRequest

在removeCache中,也使用notifyChange进行了通知,逻辑如下:

  • 从acceptor中提取 DataInfoId;
  • 根据DataInfoId从meta service中获取dataServerNodes的ip;
  • 遍历ip,通过bolt server进行通知syncServer.sendSync,就是给ip对应的data center发送 NotifyDataSyncRequest;
private void notifyChange(Acceptor acceptor) {

    Long lastVersion = acceptor.getLastVersion();

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());

    List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
    for (String targetDataIp : targetDataIps) {

        if (DataServerConfig.IP.equals(targetDataIp)) {
            continue;
        }

        Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());

        for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
            try {
                Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
                if (connection == null) {
                    TimeUtil.randomDelay(1000);
                    continue;
                }
                syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
                    request, 1000);
                break;
            }
        }
    }
}

这部分的调用逻辑为:versionCheckExecutor.execute ------- localAcceptorStore.changeDataChheck ------ removeCache ----- notifyChange ------ NotifyDataSyncRequest

具体如下图:

+--------------------------+
|                          |     +----------------------------------------------------------------------+
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |
|                          |     |                                                                      |
+--------+-----------------+     |                                                                      |
         |                       |                                                                      |
         |                       |                                                                      |
         |                       |                                                                      |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors            |
         |   changeDataCheck     |                                                                      |
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                    +            +----------------------------------------------------------------------+
                    |
                    |
                    |
                    | NotifyDataSyncRequest
                    |
                    v
             +------+-----------+
             | Other DataServer |
             +------------------+

手机如下图:

[从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志-LMLPHP

5.5.3 checkAcceptorsChangAndExpired

checkAcceptorsChangAndExpired作用是遍历acceptors每个acceptor,看看是否expired,进行处理。

@Override
public void checkAcceptorsChangAndExpired() {
    acceptors.forEach((dataCenter, acceptorMap) -> {
        if (acceptorMap != null && !acceptorMap.isEmpty()) {
            acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
        }
    });
}

此时,逻辑如下:

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                    +            +----------------------------------------------------------------------+
                    |
                    |
                    |
                    | NotifyDataSyncRequest
                    |
                    v
             +------+-----------+
             | Other DataServer |
             +------------------+

手机如下:

[从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志-LMLPHP

0x06 Acceptor日志操作

这里记录了日志,即记录了所有的Datum操作。

操作日志存储采用Queue方式,获取日志时候通过当前版本号在堆栈内所在位置,把所有版本之后的操作日志同步过来执行。

public class Acceptor {
    private final String                    dataInfoId;
    private final String                    dataCenter;
    private int                             maxBufferSize;
    static final int                        DEFAULT_DURATION_SECS = 30;
    private final Deque<Long/*version*/>   logOperatorsOrder     = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators          = new ConcurrentHashMap<>();
    private final DatumCache                datumCache;
}

关键变量是:

  • logOperators:按照版本号为key存储的map,用来存储所有的Operator;
  • logOperatorsOrder:因为map没有办法排序,所以设置此queue来存储版本号

Operator 就是每一步操作对应的Datum。

public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}

6.1 appendOperator

此函数作用是:添加一个操作日志。

  • 如果queue已经满了,则取出第一个消息,为了向后段插入一个新的 。
  • 如果Operator版本号为空,则设置为0L;
  • 如果Operator的前一个版本号与queue尾部Operator版本号不一致,说明queue里面不对了,需要清空map和queue。
  • 向map中加入Operator;
  • 如果是新版本的Operator,则把版本加入queue;

具体代码如下:

/**
 * append operator to queue,if queue is full poll the first element and append.
 * Process will check version sequence,it must append with a consequent increase in version,
 * otherwise queue will be clean
 *
 * @param operator
 */
public void appendOperator(Operator operator) {
    write.lock();
    try {
        if (isFull()) {
            logOperators.remove(logOperatorsOrder.poll());
        }
        if (operator.getSourceVersion() == null) {
            operator.setSourceVersion(0L);
        }
        Long tailVersion = logOperatorsOrder.peekLast();
        if (tailVersion != null) {
            //operation add not by solid sequence
            if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
                clearBefore();
            }
        }

        Operator previousOperator = logOperators.put(operator.getVersion(), operator);
        if (previousOperator == null) {
            logOperatorsOrder.add(operator.getVersion());
        }
    } finally {
        write.unlock();
    }
}

appendOperator谁来调用?在Notifier 里面有,比如:

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

以及

public class SnapshotBackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

6.2 checkExpired

此方法作用是去除过期日志。version是时间戳,所以可以定期check,如果过期,就清除。

public void checkExpired(int durationSEC) {
    write.lock();
    try {
        //check all expired
        Long peekVersion = logOperatorsOrder.peek();
        if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
            logOperators.remove(logOperatorsOrder.poll());
            checkExpired(durationSEC);
        }
    } finally {
        write.unlock();
    }
}

0x07 NotifyDataSyncRequest通知数据同步

此请求作用是通知接收端进行数据同步。

回忆下这部分的调用逻辑为:versionCheckExecutor.execute ------- localAcceptorStore.changeDataChheck ------ removeCache ----- notifyChange ------ NotifyDataSyncRequest

7.1 NotifyDataSyncHandler

接收端data server通过NotifyDataSyncHandler处理

public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements
                                                                                       AfterWorkingProcess {

    @Autowired
    private DataServerConfig                                      dataServerConfig;

    @Autowired
    private GetSyncDataHandler                                    getSyncDataHandler;

    @Autowired
    private DataChangeEventCenter                                 dataChangeEventCenter;

    private Executor                                              executor    = ExecutorFactory
                                                                                  .newFixedThreadPool(
                                                                                      10,
                                                                                      NotifyDataSyncHandler.class
                                                                                          .getSimpleName());

    private ThreadPoolExecutor                                    notifyExecutor;

    @Autowired
    private DataNodeStatus                                        dataNodeStatus;

    @Autowired
    private DatumCache                                            datumCache;
}

7.1.1 doHandle

doHandle方法用来继续处理。

@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
    final Connection connection = ((BoltChannel) channel).getConnection();
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
        return CommonResponse.buildSuccessResponse();
    }
    executorRequest(connection, request);
    return CommonResponse.buildSuccessResponse();
}

7.1.2 executorRequest

因为接到了发起端DataServer的同步通知NotifyDataSyncRequest,所以接收端DataServer主动发起拉取,进行同步数据。即调用GetSyncDataHandler来发送SyncDataRequest

private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
    executor.execute(() -> {
        fetchSyncData(connection, request);
    });
}

protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
    String dataInfoId = request.getDataInfoId();
    String dataCenter = request.getDataCenter();
    Datum datum = datumCache.get(dataCenter, dataInfoId);
    Long version = (datum == null) ? null : datum.getVersion();
    Long requestVersion = request.getVersion();

    if (version == null || requestVersion == 0L || version < requestVersion) {
        getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
            new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
            dataChangeEventCenter));
    }
}

7.1.3 GetSyncDataHandler

GetSyncDataHandler和SyncDataCallback配合。

即调用GetSyncDataHandler来发送SyncDataRequest,用SyncDataCallback接收同步结果。

├── remoting
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   └── task

GetSyncDataHandler 和 SyncDataCallback 这两个辅助类的位置比较奇怪,大概因为是功能类,所以放在dataserver目录下,个人认为也许单独设置一个目录存放更好。

public class GetSyncDataHandler {
    @Autowired
    private DataNodeExchanger   dataNodeExchanger;

    public void syncData(SyncDataCallback callback) {
        int tryCount = callback.getRetryCount();
        if (tryCount > 0) {
            try {
                callback.setRetryCount(--tryCount);
                dataNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return callback.getRequest();
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(callback.getConnection().getRemoteIP(), callback
                            .getConnection().getRemotePort());
                    }

                    @Override
                    public CallbackHandler getCallBackHandler() {
                        return new CallbackHandler() {
                            @Override
                            public void onCallback(Channel channel, Object message) {
                                callback.onResponse(message);
                            }

                            @Override
                            public void onException(Channel channel, Throwable exception) {
                                callback.onException(exception);
                            }

                            @Override
                            public Executor getExecutor() {
                                return callback.getExecutor();
                            }
                        };
                    }
                });
            }
        }
    }

}

7.1.4 SyncDataCallback

这里接收同步结果。

public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;

    private SyncDataRequest       request;

    private GetSyncDataHandler    getSyncDataHandler;

    private int                   retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                }
            }
        }
    }
}

此时逻辑如下:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +----------------------------------------------------------------------+
                     |
NotifyDataSyncRequest| 1         ^ 2
                     |           |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest
                     v           |
             +-------+-----------------------------------+
             |[Other DataServer] |                       |
             |                   |                       |
             |                   |                       |
             |                   +                       |
             |  GetSyncDataHandler      SyncDataCallback |
             |                                           |
             |                                           |
             |                                           |
             |                                           |
             +-------------------------------------------+

手机如图:

[从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志-LMLPHP

0x08 SyncDataRequest回送通知

SyncDataRequest发送回通知发送者。所以这里是other DataServer 发送给 Sender DataServer

8.1 SyncDataRequest

public class SyncDataRequest implements Serializable {

    private String            dataInfoId;

    private String            dataCenter;

    private String            dataSourceType;

    /**
     * be null when dataInfoId not exist in local datumCache
     */
    private Long              version;
}

8.1.1 SyncDataRequest 从哪里来

我们回忆下,SyncDataRequest 从哪里来?在 NotifyDataSyncHandler 的响应函数中,会产生 SyncDataRequest。这里会根据请求的信息,从cache之中获取infoId对应的version,然后发送请求。

public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {

    protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
        String dataInfoId = request.getDataInfoId();
        String dataCenter = request.getDataCenter();
        Datum datum = datumCache.get(dataCenter, dataInfoId);
        Long version = (datum == null) ? null : datum.getVersion();
        Long requestVersion = request.getVersion();

        if (version == null || requestVersion == 0L || version < requestVersion) {
            getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
                new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
                dataChangeEventCenter));
        }
    }
}

进而在AbstractAcceptorStore之中

private void notifyChange(Acceptor acceptor) {

    Long lastVersion = acceptor.getLastVersion();

    //may be delete by expired
    if (lastVersion == null) {
        lastVersion = 0L;
    }

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());

    syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
                        request, 1000);
}

8.2 syncDataHandler

通知发起者使用 SyncDataHandler 来处理。

  • syncDataHandler

节点间数据同步 Handler,该 Handler 被触发时,会通过版本号进行比对,若当前 DataServer 所存储数据版本号含有当前请求版本号,则会返回所有大于当前请求数据版本号的所有数据,便于节点间进行数据同步。

public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Object doHandle(Channel channel, SyncDataRequest request) {
        SyncData syncData = syncDataService.getSyncDataChange(request);
        return new GenericResponse<SyncData>().fillSucceed(syncData);
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return SyncDataRequest.class;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}

8.3 SyncDataServiceImpl

具体业务服务是SyncDataServiceImpl。会从acceptorStore获取data,即getSyncDataChange方法。

public class SyncDataServiceImpl implements SyncDataService {

    @Override
    public void appendOperator(Operator operator) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(operator.getSourceType()
            .toString());
        if (acceptorStore != null) {
            acceptorStore.addOperator(operator);
        }
    }

    @Override
    public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(syncDataRequest
            .getDataSourceType());
        if (acceptorStore != null) {
            return acceptorStore.getSyncData(syncDataRequest);
        }
    }
}

关于appendOperator如何调用,前文有描述。

SyncDataServiceImpl会继续调用到AbstractAcceptorStore。

8.4 AbstractAcceptorStore

根据dataCenter和dataInfoId获取出Acceptor,然后返回其process后的数据。

@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {

    String dataCenter = syncDataRequest.getDataCenter();
    String dataInfoId = syncDataRequest.getDataInfoId();

    Long currentVersion = syncDataRequest.getVersion();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        return existAcceptor.process(currentVersion);
    }
}

8.5 Acceptor

然后是Acceptor的处理。

处理发送数据的当前版本号,如果当前版本号存在于当前queue中,返回所有版本号大于当前版本号的Operator,否则所有Operator。

public SyncData process(Long currentVersion) {
    read.lock();
    try {
        Collection<Operator> operators = acceptOperator(currentVersion);
        List<Datum> retList = new LinkedList<>();
        SyncData syncData;
        boolean wholeDataTag = false;
        if (operators != null) {
            //first get all data
            if (operators.isEmpty()) {
                wholeDataTag = true;
                retList.add(datumCache.get(dataCenter, dataInfoId));
            } else {
                for (Operator operator : operators) {
                    retList.add(operator.getDatum());
                }
            }
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        } else {
            //no match get all data
            wholeDataTag = true;
            retList.add(datumCache.get(dataCenter, dataInfoId));
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        }

        return syncData;
    } finally {
        read.unlock();
    }
}

同步数据结构如下:

public class SyncData implements Serializable {

    private String            dataInfoId;

    private String            dataCenter;

    private Collection<Datum> datums;

    private boolean           wholeDataTag;
}

此时图示如下:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +------------------------------------------------+-----+---------------+
                     |                                                            ^     |
NotifyDataSyncRequest| 1   +-----------------+  3     +--------------------+   4  |     |
                     |     | syncDataHandler +------> | SyncDataServiceImpl+------+     |
                     |     +-----+-----------+        +--------------------+            |
                     |           ^ 2                                                    |
                     |           |                                                      |  5
                     |           |                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest                                     |
                     v           |                                                      |
             +-------+-----------------------------------+                              |
             |[Other DataServer] |                       |                              |
             |                   |                       |                              |
             |                   |                       |                              |
             |                   +                       |                              |
             |  GetSyncDataHandler      SyncDataCallback |  <---------------------------+
             |                                           |
             |                                           |
             |                                           |
             |                                           |
             +-------------------------------------------+

手机如下:

[从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志-LMLPHP

0x09 SyncDataCallback接受者回调

回到接受者,遍历接受到的所有Datum,逐一调用:

如果是全部datum,调用

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);

否则调用

dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,dataSourceTypeEnum, datum)

具体如下:

public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;

    private SyncDataRequest       request;

    private GetSyncDataHandler    getSyncDataHandler;

    private int                   retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                }
            }
        }
    }
}

DataChangeEventCenter调用如下:

public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}

DataChangeEventQueue调用handleDatum处理,这部分在其他文章中已经讲述。这里只是贴出代码。

@Override
public void run() {
    if (changeData instanceof SnapshotData) {
        SnapshotData snapshotData = (SnapshotData) changeData;
        String dataInfoId = snapshotData.getDataInfoId();
        Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
        Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
        Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
        long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
        Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
        long version = datum != null ? datum.getVersion() : 0l;
        notify(datum, changeData.getSourceType(), null);
    } else {
        Datum datum = changeData.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        DataSourceTypeEnum sourceType = changeData.getSourceType();
        DataChangeTypeEnum changeType = changeData.getChangeType();

        if (changeType == DataChangeTypeEnum.MERGE
            && sourceType != DataSourceTypeEnum.BACKUP
            && sourceType != DataSourceTypeEnum.SYNC) {
            //update version for pub or unPub merge to cache
            //if the version product before merge to cache,it may be cause small version override big one
            datum.updateVersion();
        }

        long version = datum.getVersion();

        try {
            if (sourceType == DataSourceTypeEnum.CLEAN) {
                if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                }
            } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                notifyTempPub(datum, sourceType, changeType);
            } else {
                MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();

                if (lastVersion != null
                    && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                    return;
                }

                //lastVersion null means first add datum
                if (lastVersion == null || version != lastVersion) {
                    if (mergeResult.isChangeFlag()) {
                        notify(datum, sourceType, lastVersion);
                    }
                }
            }
        }
    }

}

9.1 DataChangeHandler

DataChangeHandler 会定期提取DataChangeEventCenter中的消息,然后进行处理。

ChangeNotifier存储了Datum。因为此时版本号已经更新,所以不会再次通知,至此流程结束。

MergeResult mergeResult = datumCache.putDatum(changeType, datum);


//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
    if (mergeResult.isChangeFlag()) {
        notify(datum, sourceType, lastVersion);
     }
}

此时逻辑如下:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +------------------------------------------------+-----+---------------+
                     |                                                            ^     |
NotifyDataSyncRequest| 1   +-----------------+  3     +--------------------+   4  |     |
                     |     | syncDataHandler +------> | SyncDataServiceImpl+------+     |
                     |     +-----+-----------+        +--------------------+            |
                     |           ^ 2                                                    |
                     |           |                                                      |  5
                     |           |                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest                                     |
[Other DataServer]   |           |                                                      |
                     |           |                                                      |
                     |           |                                                      |
                     |           |              +---------------------------------------+
                     |           |              |
                     |           |              |
                     v           |              v
              +------+-----------++ +-----------+-------+  6    +-----------------------+  7   +--------------------+  8   +-----------------+
              | GetSyncDataHandler| |  SyncDataCallback +-----> | DataChangeEventCenter | +--> |DataChangeEventQueue| +--> |DataChangeHandler|
              +-------------------+ +-------------------+       +-----------------------+      +--------------------+      +-----------------+

手机上如下:

[从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志-LMLPHP

0x10 总结

回顾下“一次服务注册过程”的服务数据在内部流转过程。

  1. Client 调用 publisher.register 向 SessionServer 注册服务。
  2. SessionServer 收到服务数据 (PublisherRegister) 后,将其写入内存 (SessionServer 会存储 Client 的数据到内存,用于后续可以跟 DataServer 做定期检查),再根据 dataInfoId 的一致性 Hash 寻找对应的 DataServer,将 PublisherRegister 发给 DataServer。
  3. DataServer 接收到 PublisherRegister 数据,首先也是将数据写入内存 ,DataServer 会以 dataInfoId 的维度汇总所有 PublisherRegister。同时,DataServer 将该 dataInfoId 的变更事件通知给所有 SessionServer,变更事件的内容是 dataInfoId 和版本号信息 version。
  4. 同时,异步地,DataServer 以 dataInfoId 维度增量地同步数据给其他副本。因为 DataServer 在一致性 Hash 分片的基础上,对每个分片保存了多个副本(默认是3个副本)。
  5. SessionServer 接收到变更事件通知后,对比 SessionServer 内存中存储的 dataInfoId 的 version,若发现比 DataServer 发过来的小,则主动向 DataServer 获取 dataInfoId 的完整数据,即包含了所有该 dataInfoId 具体的 PublisherRegister 列表。
  6. 最后,SessionServer 将数据推送给相应的 Client,Client 就接收到这一次服务注册之后的最新的服务列表数据。

因为篇幅所限,上文讨论的是前两点,本文介绍第三,第四点。如果以后有时间,会介绍最后两点。

0xFF 参考

Eureka系列(六) TimedSupervisorTask类解析

Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)

java线程池ThreadPoolExecutor类使用详解

Java线程池ThreadPoolExecutor实现原理剖析

深入理解Java线程池:ThreadPoolExecutor

深入理解Java线程池:ThreadPoolExecutor

Java中线程池ThreadPoolExecutor原理探究

蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容

蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路

服务注册中心 Session 存储策略 | SOFARegistry 解析

海量数据下的注册中心 - SOFARegistry 架构介绍

服务注册中心数据分片和同步方案详解 | SOFARegistry 解析

蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析

蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制

蚂蚁金服开源通信框架 SOFABolt 协议框架解析

蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析

蚂蚁通信框架实践

sofa-bolt 远程调用

sofa-bolt学习

SOFABolt 设计总结 - 优雅简洁的设计之道

SofaBolt源码分析-服务启动到消息处理

SOFABolt 源码分析

SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计

SOFARegistry 介绍

SOFABolt 源码分析13 - Connection 事件处理机制的设计

01-09 19:34