并发编程 - Event Driven 设计模式(EDA)-LMLPHP


EDA 概述

EDA(Event-Driven Architecture)是一种实现组件之间松耦合、易扩展的架构方式。

一个最简单的EDA设计需要包含如下几个组件:

  • Events:需要被处理的数据。

  • Event Handlers:处理Events的方式方法。

  • Event Loop:维护Events和Event Handlers之间的交互流程。

举个例子 :

并发编程 - Event Driven 设计模式(EDA)-LMLPHP

Event A将被Handler A处理,而Event B将被Handler B处理,这一切的分配都是由Event Loop所控制的。

Events是EDA中的重要角色,一个Event至少需要包含两个属性:类型和数据

  • Event的类型决定了它会被哪个Handler处理
  • Data 是在Handler中代加工的材料

初体验

Event

package com.artisan.eventdriven.event;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 * @desc:Event只包含了该Event所属的类型和所包含的数据
 */


public class Event {
    private final String type;
    private final String data;

    public Event(String type, String data) {
        this.type = type;
        this.data = data;
    }

    public String getType() {
        return type;
    }

    public String getData() {
        return data;
    }
}
    

Event Handlers

Event Handlers主要用于处理Event,比如一些filtering或者transforming数据的操作等,下面我们写两个比较简单的方法


    /**
     * 用于处理A 类型的Event
     *
     * @param e
     */
    public static void handleEventA(Event e) {
        System.out.println(e.getData().toLowerCase());
    }

    /**
     * 用于处理B类型的Event
     *
     * @param e
     */
    public static void handleEventB(Event e) {
        System.out.println(e.getData().toUpperCase());
    }
  • handleEventA方法只是简单地将Event中的data进行了lowerCase之后的输出
  • handleEventB方法也是足够的简单,直接将Event中的字符串数据变成大写进行了控制台输出

Event Loop

Event Loop处理接收到的所有Event,并且将它们分配给合适的Handler去处理

 Event e;
 while (!events.isEmpty()) {
     //从消息队列中不断移除,根据不同的类型进行处理
     e = events.remove();
     switch (e.getType()) {
         case "A":
             handleEventA(e);
             break;
         case "B":
             handleEventB(e);
             break;
     }
 }

在EventLoop中,每一个Event都将从Queue中移除出去,通过类型匹配交给合适的Handler去处理。


package com.artisan.eventdriven.demo;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */

import com.artisan.eventdriven.event.Event;

import java.util.LinkedList;
import java.util.Queue;

public class FooEventDrivenExample {

    /**
     * 用于处理A 类型的Event
     *
     * @param e
     */
    public static void handleEventA(Event e) {
        System.out.println(e.getData().toLowerCase());
    }

    /**
     * 用于处理B类型的Event
     *
     * @param e
     */
    public static void handleEventB(Event e) {
        System.out.println(e.getData().toUpperCase());
    }

    public static void main(String[] args) {
        Queue<Event> events = new LinkedList<>();
        events.add(new Event("A", "Hello"));
        events.add(new Event("A", "I am Event A"));
        events.add(new Event("B", "I am Event B"));
        events.add(new Event("B", "World"));

        Event e;
        while (!events.isEmpty()) {
            //从消息队列中不断移除,根据不同的类型进行处理
            e = events.remove();
            switch (e.getType()) {
                case "A":
                    handleEventA(e);
                    break;
                case "B":
                    handleEventB(e);
                    break;
            }
        }
    }
}

并发编程 - Event Driven 设计模式(EDA)-LMLPHP

虽然这个EDA的设计足够简单,但是通过它我们可以感受到EDA中三个重要组件之间的交互关系。


如何设计一个Event-Driven框架

一个基于事件驱动的架构设计,总体来讲会涉及如下几个重要组件:

  • 事件消息(Event)
  • 针对该事件的具体处理器(Handler)
  • 接受事件消息的通道(上个Demo中的queue)
  • 以及对事件消息如何进行分配(Event Loop)

同步EDA框架设计

我们先设计开发一个高度抽象的同步EDA框架,后续再考虑增加异步功能

Message

在基于Message的系统中,每一个Event也可以被称为Message,Message是对Event更高一个层级的抽象,每一个Message都有一个特定的Type用于与对应的Handler做关联

package com.artisan.eda.intf;

public interface Message {
    /**
     * 返回Message的类型
     */
    Class<? extends Message> getType();
}

Channel

第二个比较重要的概念就是Channels,Channel主要用于接受来自Event Loop分配的消息,每一个Channel负责处理一种类型的消息(当然这取决于我们对消息如何进行分配)

package com.artisan.eda.intf;

public interface Channel<E extends Message> {

    /**
     * dispatch方法用于负责Message的调度
     */
    void dispatch(E message);
}


Dynamic Router

Router的作用类似于上面的Event Loop,其主要是帮助Event找到合适的Channel并且传送给它

package com.artisan.eda.intf;

 
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public interface DynamicRouter<E extends Message> {

    /**
     * 针对每一种Message类型注册相关的Channel,只有找到合适的Channel该Message才会被处理
     */
    void registerChannel(Class<? extends E> messageType,
                         Channel<? extends E> channel);

    /**
     * 为相应的Channel分配Message
     */
    void dispatch(E message);
}

Router如何知道要将Message分配给哪个Channel呢?换句话说,Router需要了解到Channel的存在,因此registerChannel()方法的作用就是将相应的Channel注册给Router,dispatch方法则是根据Message的类型进行路由匹配。


Event

Event是对Message的一个最简单的实现,在以后的使用中,将Event直接作为其他Message的基类即可(这种做法有点类似于适配器模式)

package com.artisan.eda.event;

import com.artisan.eda.intf.Message;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class Event implements Message {
    @Override
    public Class<? extends Message> getType() {
        return getClass();
    }
}

EventDispatcher

EventDispatcher是对DynamicRouter的一个最基本的实现,适合在单线程的情况下进行使用,因此不需要考虑线程安全的问题

package com.artisan.eda.router;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */

import com.artisan.eda.intf.Channel;
import com.artisan.eda.intf.DynamicRouter;
import com.artisan.eda.intf.Message;

import java.util.HashMap;
import java.util.Map;

/**
 * EventDispatcher不是一个线程安全的类
 * @author artisan
 */
public class EventDispatcher implements DynamicRouter<Message> {
    /**
     * 用于保存Channel和Message之间的关系
     */
    private final Map<Class<? extends Message>, Channel> routerTable;

    public EventDispatcher() {
        //初始化RouterTable,但是在该实现中,我们使用HashMap作为路由表
        this.routerTable = new HashMap<>();
    }

    @Override
    public void dispatch(Message message) {
        if (routerTable.containsKey(message.getType())) {
            //直接获取对应的Channel处理Message
            routerTable.get(message.getType()).dispatch(message);
        } else
            throw new MessageMatcherException("Can't match the channel for [" + message.getType() + "] type");
    }

    @Override
    public void registerChannel(Class<? extends Message> messageType,
                                Channel<? extends Message> channel) {
        this.routerTable.put(messageType, channel);
    }
}

在EventDispatcher中有一个注册表routerTable,主要用于存放不同类型Message对应的Channel,如果没有与Message相对应的Channel,则会抛出无法匹配的异常。

package com.artisan.eda.exceptions;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class MessageMatcherException extends RuntimeException {

    public MessageMatcherException(String message) {
        super(message);
    }
}
    

测试

package com.artisan.eventdriven.eda;

import com.artisan.eda.event.Event;
import com.artisan.eda.intf.Channel;
import com.artisan.eda.router.EventDispatcher;


/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EventDispatcherExample
{
    /**
     * InputEvent中定义了两个属性X和Y,主要用于在其他Channel中的运算
     */
    static class InputEvent extends Event
    {
        private final int x;
        private final int y;

        public InputEvent(int x, int y)
        {
            this.x = x;
            this.y = y;
        }

        public int getX()
        {
            return x;
        }

        public int getY()
        {
            return y;
        }
    }
    /**
     * 用于存放结果的Event
     */
    static class ResultEvent extends Event
    {
        private final int result;

        public ResultEvent(int result)
        {
            this.result = result;
        }

        public int getResult()
        {
            return result;
        }
    }
    /**
     * 处理ResultEvent的Handler(Channel),只是简单地将计算结果输出到控制台
     */
    static class ResultEventHandler implements Channel<ResultEvent>
    {
        @Override
        public void dispatch(ResultEvent message)
        {
            System.out.println("The result is:" + message.getResult());
        }
    }

    /**
     * InputEventHandler需要向Router发送Event,因此在构造的时候需要传入Dispatcher
     */
    static class InputEventHandler implements Channel<InputEvent>
    {
        private final EventDispatcher dispatcher;

        public InputEventHandler(EventDispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
        }

        /**
         *将计算的结果构造成新的Event提交给Router
         */
        @Override
        public void dispatch(InputEvent message)
        {
            System.out.printf("X:%d,Y:%d\n", message.getX(), message.getY());
            int result = message.getX() + message.getY();
            dispatcher.dispatch(new ResultEvent(result));
        }
    }

    public static void main(String[] args)
    {
        //构造Router
        EventDispatcher dispatcher = new EventDispatcher();
        //将Event和Handler(Channel)的绑定关系注册到Dispatcher
        dispatcher.registerChannel(InputEvent.class, new InputEventHandler(dispatcher));
        dispatcher.registerChannel(ResultEvent.class, new ResultEventHandler());
        dispatcher.dispatch(new InputEvent(1, 2));
    }
}

并发编程 - Event Driven 设计模式(EDA)-LMLPHP

由于所有的类都存放于一个文件中,因此看起来测试代码比较多,其实结构还是非常清晰的,

  • InputEvent是一个Message,它包含了两个Int类型的属性,
  • 而InputEventHandler是对InputEvent消息的处理,接收到了InputEvent消息之后,分别对X和Y进行相加操作,然后将结果封装成ResultEvent提交给EventDispatcher,
  • ResultEvent相对比较简单,只包含了计算结果的属性,ResultEventHandler则将计算结果输出到控制台上。

通过上面这个例子的运行会发现,不同数据的处理过程之间根本无须知道彼此的存在,一切都由EventDispatcher这个Router来控制,它会给你想要的一切,这是一种稀疏耦合(松耦合)的设计

EDA的设计除了松耦合特性之外,扩展性也是非常强的,比如Channel非常容易扩展和替换,另外由于Dispatcher统一负责Event的调配,因此在消息通过Channel之前可以进行很多过滤、数据验证、权限控制、数据增强(Enhance)等工作。

同步EDA架构类图

并发编程 - Event Driven 设计模式(EDA)-LMLPHP

异步EDA框架设计

上面的同步EDA框架,在应对高并发的情况下还是存在一些问题的,具体如下。

  • EventDispatcher不是线程安全的类,在多线程的情况下,registerChannel方法会引起数据不一致的问题。

  • 就目前而言,我们实现的所有Channel都无法并发消费Message,比如InputEventHandler只能逐个处理Message,低延迟的消息处理还会导致Dispatcher出现积压。

抽象基类 AsyncChannel

我们继续对EDA框架进行扩充,使其可支持并发任务的执行,下面定义了一个新的AsyncChannel作为基类,该类中提供了Message的并发处理能力。

 package com.artisan.eda.async;

import com.artisan.eda.event.Event;
import com.artisan.eda.intf.Channel;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public abstract class AsyncChannel implements Channel<Event> {
    /**
     * 在AsyncChannel 中将使用ExecutorService多线程的方式提交给Message
     */
    private final ExecutorService executorService;

    /**
     * 默认构造函数,提供了CPU的核数×2的线程数量
     */
    public AsyncChannel() {
        this(Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors() * 2));
    }

    /**
     * 用户自定义的ExecutorService
     *
     * @param executorService
     */
    public AsyncChannel(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /**
     * 重写dispatch方法,并且用final修饰,避免子类重写
     *
     * @param message
     */
    @Override
    public final void dispatch(Event message) {
        executorService.submit(() -> this.handle(message));
    }

    /**
     * 提供抽象方法,供子类实现具体的Message处理
     *
     * @param message
     */
    protected abstract void handle(Event message);

    /**
     * 提供关闭ExecutorService的方法
     */
    public void stop() {
        if (null != executorService && !executorService.isShutdown())
            executorService.shutdown();
    }
}

    
  • 为了防止子类在继承AsyncChannel基类的时候重写dispatch方法,用final关键字对其进行修饰(Template Method Design Pattern),
  • handle方法用于子类对Message进行具体的处理,
  • stop方法则用来停止ExecutorService。

AsyncEventDispatcher 并发分发消息

其次,还需要提供新的EventDispatcher类AsyncEventDispatcher负责以并发的方式dispatch Message,其中Event对应的Channel只能是AsyncChannel类型,并且也对外暴露了shutdown方法

package com.artisan.eda.router;

import com.artisan.eda.async.AsyncChannel;
import com.artisan.eda.event.Event;
import com.artisan.eda.exceptions.MessageMatcherException;
import com.artisan.eda.intf.Channel;
import com.artisan.eda.intf.DynamicRouter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class AsyncEventDispatcher implements DynamicRouter<Event> {

    /**
     * 使用线程安全的ConcurrentHashMap替换HashMap
     */
    private final Map<Class<? extends Event>, AsyncChannel> routerTable;

    public AsyncEventDispatcher() {
        this.routerTable = new ConcurrentHashMap<>();
    }

    @Override
    public void registerChannel(Class<? extends Event> messageType,
                                Channel<? extends Event> channel) {
        //在AsyncEventDispatcher中,Channel必须是AsyncChannel类型
        if (!(channel instanceof AsyncChannel)) {
            throw new IllegalArgumentException("The channel must be AsyncChannel Type.");
        }
        this.routerTable.put(messageType, (AsyncChannel) channel);
    }

    @Override
    public void dispatch(Event message) {
        if (routerTable.containsKey(message.getType())) {
            routerTable.get(message.getType()).dispatch(message);
        } else {
            throw new MessageMatcherException("Can't match the channel for ["
                    + message.getType() + "] type");
        }

    }

    public void shutdown() {
        //关闭所有的Channel以释放资源
        routerTable.values().forEach(AsyncChannel::stop);
    }

}
    

在AsyncEventDispatcher中,routerTable使用线程安全的Map定义,在注册Channel的时候,如果其不是AsyncChannel的类型,则会抛出异常。


测试

package com.artisan.eventdriven.eda;

import com.artisan.eda.async.AsyncChannel;
import com.artisan.eda.event.Event;
import com.artisan.eda.router.AsyncEventDispatcher;

import java.util.concurrent.TimeUnit;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class AsyncEventDispatcherExample {

    //主要用于处理InputEvent,但是需要继承AsyncChannel
    static class AsyncInputEventHandler extends AsyncChannel {
        private final AsyncEventDispatcher dispatcher;

        AsyncInputEventHandler(AsyncEventDispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        //不同于以同步的方式实现dispatch,异步的方式需要实现handle
        @Override
        protected void handle(Event message) {
            EventDispatcherExample.InputEvent inputEvent =
                    (EventDispatcherExample.InputEvent) message;
            System.out.printf("X:%d,Y:%d\n", inputEvent.getX(), inputEvent.getY());
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int result = inputEvent.getX() + inputEvent.getY();
            dispatcher.dispatch(new EventDispatcherExample.ResultEvent(result));
        }
    }

    //主要用于处理InputEvent,但是需要继承AsyncChannel
    static class AsyncResultEventHandler extends AsyncChannel {
        @Override
        protected void handle(Event message) {
            EventDispatcherExample.ResultEvent resultEvent =
                    (EventDispatcherExample.ResultEvent) message;
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("The result is:" + resultEvent.getResult());
        }
    }

    public static void main(String[] args) {
        //定义AsyncEventDispatcher
        AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();

        //注册Event和Channel之间的关系
        dispatcher.registerChannel(EventDispatcherExample.InputEvent.class, new AsyncInputEventHandler(dispatcher));
        dispatcher.registerChannel(EventDispatcherExample.ResultEvent.class, new AsyncResultEventHandler());
        //提交需要处理的Message
        dispatcher.dispatch(new EventDispatcherExample.InputEvent(1, 2));
    }
}
    

当dispatcher分配一个Event的时候,如果执行非常缓慢也不会影响下一个Event被dispatch,这主要得益于我们采用了异步的处理方式(ExecutorService本身存在的任务队列可以允许异步提交一定数量级的数据)

并发编程 - Event Driven 设计模式(EDA)-LMLPHP

07-10 08:53