前言

这是一个原理非常重要,写法很常见的一个模式,值得深入理解和总结一下

可以想 zookeeper 等,有时系统需要定时(可插拔)接收或者监听其他服务的动态,这类需求经常见到,那么观察者模式就是做这个的:

通俗的解释,联系生活中的邮件订阅或者报纸,杂志的订阅服务

1、xx 报社的业务就是出版报纸

2、张三向 xx 报社订阅了 A 品牌的报纸,只要 xx 报社有新一期的 A 报纸出版,就会派快递员给张三送到家。不仅仅是张三,任何公民,只要你成为了 xx 报社的 A 报的订户,你就会一直收到 A 报纸

3、随着信息时代的发展,张三迷恋起了手机新闻类的 APP,不想浪费钱订阅纸质的 A 报纸了。此时,张三可以取消 A 报纸的订阅,下一期 xx 报社就不会再送新的 A 报纸给张三

4、只要 xx 报社还在正常营业,理论上就会一直有人(或其它单位,也就是多人)向他们订阅报纸或取消订阅报纸

这就是所谓的发布订阅模式的生活模型,也叫出版订阅模式

而出版者+多个订阅者(也可以为一个)= 观察者模式,在观察者模式里,学术性的叫法是管出版者称为“主题”,订阅者称为“观察者”,仅此而已

显然,观察者模式定义了对象之间的一对多的依赖关系——当一个对象改变状态时,它的所有依赖者都会受到通知并自动更新状态。它是对象的行为模式。

案例1:气象观测系统

当前有一个气象监测系统,它有三个子系统:

1、一个 WeatherData 系统,负责计算、追踪目前的天气状况(温度,湿度,气压)。

2、三种电子显示器(这里不涉及前端),分别显示给用户目前的天气状况、气象统计信息、及简单的天气预报。当 WeatherData 从气象站获得了最新的测量数据时,三种布告板必须被实时更新。

3、气象站,它是一个物理设备,能获取实际的天气数据。

按照OOP的一般原则,应该最好把该系统设计成一个可扩展的服务,比如:

1、比如只公布 API,隐藏内部实现

2、让其他服务的 RD 可以自定义气象显示器,并插入此应用中。 

当前的 demo 如下:

/**
 * 不关心这些数据到底如何从物理设备——气象站获取的
 * 这是硬件工程师和气象工程师的事情
 */
public class WeatherData {
    public int getTemperature() {
        return 0;
    }

    public int getHumidity() {
        return 0;
    }

    public int getPressure() {
        return 0;
    }

    public void measurementsChanged() {
        // 一旦气象测量更新,此方法会被调用
    }
}

如上 demo 可知现状:

1、WeatherData 类具有getter方法,可以从气象站取得测量值

2、当 WeatherData 从气象站获得了最新的测量数据时,measurementsChanged()方法必须要被调用

3、需要对接的 RD 实现天气预报的显示功能(三个显示器,这里不涉及前端),即:一旦 WeatherData 获取了新的测量数据,这些数据必须也同步更新到页面。

另外,要求此系统必须可扩展,比如 RD 可以自定义显示功能,还可以随意的更换或增删显示功能,而不会影响整个系统

气象观测系统的实现版本 1

有 RD 是这样实现WeatherData 类的 measurementsChanged 方法的:

/**
 * 不关心这些数据到底如何从物理设备——气象站获取的
 * 这是硬件工程师和气象工程师的事情
 */
public class WeatherData {
    // 这些方法实现,不属于我们管
    public float getTemperature() {
        return 0;
    }

    // 这些方法实现,不属于我们管
    public float getHumidity() {
        return 0;
    }

    // 这些方法实现,不属于我们管
    public float getPressure() {
        return 0;
    }

    public void measurementsChanged() {
        float temp = getTemperature();
        float humidity = getHumidity();
        float pressure = getPressure();

        // 三种显示器的实现类的对象:
        // currentConditionsDisplay 当前天气状态实时显示
        // statisticsDisplay 天气数据统计信息展示
        // forecastDisplay 天气预报展示
        currentConditionsDisplay.update(temp, humidity, pressure);
        statisticsDisplay.update(temp, humidity, pressure);
        forecastDisplay.update(temp, humidity, pressure);
    }

    // 这里是其他WeatherData方法
    // …………
}

挑出问题

问题:

1、显示器是针对具体实现编程,而非针对接口编程,面向具体的实现编程会导致我们以后在修改显示器的名字时,也必须修改 WeatherData 程序

2、其实第 1 点更想表达的问题是:它的可扩展性很差,如果产品需要增加新的显示器类型,那么每次增加(当然也包括删除),都要打开 measurementsChanged 方法,修改代码,在复杂的业务系统中,增加测试难度,而且反复修改稳定的代码容易衍生bug

3、无法再运行时动态地增加(或删除)显示器

4、没有区分变化和不变,更没有封装改变的部分

5、第 4 点也说明:WeatherData 类的封装并不好

改进:

1、measurementsChanged 里的三个 update 方法,很明显可以抽象出一个接口——面向接口编程

2、显示器的实现对象们,明显是经常需要改变的部分,应该拆分变化的部分,并且独立抽取出来,做封装

观察者模式的标准定义

观察者模式是对象的行为模式,也叫发布-订阅 (Publish/Subscribe)模式、模型-视图 (Model/View)模式(这里可以联系 MVC 架构模式)、源-监听器 (Source/Listener) 模式或从属者(Dependents)模式等等,其实说的都是一个东西。观察者模式定义了一种对象间的一对多的依赖关系——让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有已经注册(订阅了自己的)观察者对象,使它们能够自动更新自己。

标准类图和角色

发布订阅/回调模型的核心技术——观察者模式复习总结-LMLPHP

观察者模式所涉及的角色有:

1、抽象主题(Subject)角色

抽象主题角色把所有对观察者对象的引用(注册的观察者们,订阅者们)保存在一个聚集(比如ArrayList对象)里,每个主题(其实主题也是可以有多个的)都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者对象,抽象主题角色又叫做抽象被观察者(Observable)角色。

2、具体主题(ConcreteSubject)角色

将有关状态存入具体观察者对象。在具体主题的内部状态改变时,给所有登记(注册)过的观察者发出通知(notify方法调用)。具体主题角色又叫做具体被观察者(Concrete Observable)角色。

3、抽象观察者(Observer)角色

为所有的具体观察者定义一个接口,实现 update 行为,在得到主题的通知(notify调用)时,update 被调用,从而能够更新自己,这个接口叫做更新接口。

4、具体观察者(ConcreteObserver)角色

存储与主题的状态自恰的状态。具体观察者角色实现抽象观察者角色所要求的更新接口,以便使本身的状态与主题的状态协调。如果需要,具体观察者角色可以保持一个指向具体主题对象的引用。

理解一对多的关联关系

所谓的对象间的一对多关系,是指主题是一个具有状态的主题,这个状态可以改变,另一方面,观察者们需要使用这个变化的状态,但是这个状态并不属于观察者自己维护,那么就需要观察者们去依赖主题的通知,让主题来告诉它们,何时状态发生了改变……

这就产生了一个关系——一个主题对应了多个观察者的关系。

因为主题对象是真正的维护变化的状态的一方,观察者是主题的依赖方,在主题的状态变化时,推送自己的变化给这些观察者们,比起让每个观察者自行维护该状态(一般是一个对象)要更加安全和 OO。

高内聚和低耦合的设计原则

在 遍历“容器”的优雅方法——总结迭代器模式 中,阐述了高内聚和单一职责,现在看下低耦合,也叫松耦合设计原则。

观察者模式恰恰能提供一种一对多对象依赖关系的设计,让主题和观察者之间松耦合。

1、主题只知道各个注册的观察者们实现了某个接口(也就是 Observer 接口,观察者的接口),主题不需要,也不应该知道各个观察者的具体实现类是谁,它们都做了些什么

2、任何时候,RD 都可以为系统增加新的观察者。因为主题唯一依赖的东西是一个实现了 Observer 接口的对象列表,所以 RD 可以随时增加观察者。同样的,也可以在任何时候删除某些观察者。

3、在运行时,可以用新的观察者实现取代现有的观察者实现,而主题不会受到任何影响——代码不需要修改,假如以后扩展了新的业务,需要增加一个新的业务对象做为观察者,RD 不需要为了兼容新类型而修改主题的代码,所有要做的工作就是让新的业务类实现观察者接口——Observer ,并向主题注册为观察者即可,主题不 care 向其注册的对象具体都是谁,它只 care 何时发送什么通知,给所有实现了观察者接口的对象。

阶段小结

低耦合的设计能够建立有弹性的OO系统,将对象间的依赖降到最低,较容易的应对变化

气象观测系统的实现版本 2——基于推模型的观察者模式

气象观测系统的 WeatherData 类正是观察者模式中的“一”,“多”正是使用天气观测的各种显示器对象。

/**
 * 主题接口
 */
public interface Subject {
    void registerObserver(Observer o);
    void removeObserver(Observer o);
    void notifyObservers();
}
 
/////////////////////////////
/**
 * 观察者接口
 */
public interface Observer {
    void update(float temp, float humidity, float pressure);
}

/////////////////////////////
/**
 * 显示器的接口,因为每个显示器都有一个展示的方法,故抽象出来,设计为接口
 */
public interface DisplayElement {
    void display();
}
 
/////////////////////////////
import java.util.ArrayList;
import java.util.List;

/**
 * 具体的主题——气象观测系统
 */
public class WeatherData implements Subject {
    private List<Observer> observers; // 主题聚合了观察者,多用聚合(组合)慎用继承
    private float temperature;
    private float humidity;
    private float pressure;

    public WeatherData() {
        observers = new ArrayList<>();
    }

    public float getTemperature() {
        return temperature;
    }

    public float getHumidity() {
        return humidity;
    }

    public float getPressure() {
        return pressure;
    }

    @Override
    public void registerObserver(Observer o) {
        observers.add(o); // 注册观察者
    }

    @Override
    public void removeObserver(Observer o) {
        observers.remove(o);
    }

    @Override
    public void notifyObservers() {
        for (Observer obs : observers) {
            obs.update(temperature, humidity, pressure);
        }
    }

    // 之前 demo 里的方法,抽取封装了变化的 update 代码,且面向接口编程,这里还能额外进行校验等
    private void measurementsChanged() {
        notifyObservers();
    }

    // 被动接收气象站的数据更新,或者主动抓取,这里可以实现爬虫等功能
    public void setMeasurements(float temperature, float humidity, float pressure) {
        this.temperature = temperature;
        this.humidity = humidity;
        this.pressure = pressure;
        // 一旦数据更新了,就立即同步观察者们,这就是所谓的 push——推模型的观察者设计模式的实现,对应的还有
        // 一种基于拉模型的——pull模型的实现
        measurementsChanged();
    }
}
/////////////////////////////
/**
 * 各个显示器类也是具体的观察者们
 */
public class CurrentConditionsDisplay implements DisplayElement, Observer {
    private float temperature;
    private float humidity;
    private Subject weatherData; // 非必须,必要的时候,可以聚合主题接口的引用(指针),指向具体的主题对象

    // 实现向上转型,面向接口编程,解耦合
    public CurrentConditionsDisplay(Subject weatherData) {
        this.weatherData = weatherData;
        // 将自己(订阅者)注册到主题中(发布者中)
        weatherData.registerObserver(this);
    }

    @Override
    public void display() {
        System.out.println("Current conditions: " + temperature
                + "degrees and " + humidity + "is % humidity");
    }

    @Override
    public void update(float temp, float humidity, float pressure) {
        this.temperature = temp;
        this.humidity = humidity;
        display();
    }
}
 
/////////////////////////////
/**
 * 各个显示器类也是具体的观察者们
 */
public class ForecastDisplay implements DisplayElement, Observer {
    private float currentPressure = 29.92f;
    private float lastPressure;
    private Subject weatherData;

    public ForecastDisplay(WeatherData weatherData) {
        this.weatherData = weatherData;
        weatherData.registerObserver(this);
    }

    @Override
    public void display() {
        System.out.print("Forecast: ");
        if (currentPressure > lastPressure) {
            System.out.println("Improving weather on the way!");
        } else if (currentPressure == lastPressure) {
            System.out.println("More of the same");
        } else if (currentPressure < lastPressure) {
            System.out.println("Watch out for cooler, rainy weather");
        }
    }

    @Override
    public void update(float temp, float humidity, float pressure) {
        lastPressure = this.currentPressure;
        this.currentPressure = pressure;
        display();
    }
}
/////////////////////////////
/**
 * 各个显示器类也是具体的观察者们
 */
public class StatisticsDisplay implements Observer, DisplayElement {
    private float maxTemp = 0.0f;
    private float minTemp = 200;
    private float tempSum = 0.0f;
    private int numReadings;
    private Subject weatherData;

    public StatisticsDisplay(WeatherData weatherData) {
        this.weatherData = weatherData;
        weatherData.registerObserver(this);
    }

    @Override
    public void display() {
        System.out.println("Avg/Max/Min temperature = " + (tempSum / numReadings)
                + "/" + maxTemp + "/" + minTemp);
    }

    @Override
    public void update(float temp, float humidity, float pressure) {
        this.tempSum += temp;
        this.numReadings++;

        if (temp > this.maxTemp) {
            this.maxTemp = temp;
        }

        if (temp < this.minTemp) {
            this.minTemp = temp;
        }

        display();
    }
}
 
///////////////////////////// 测试类
/**
 * 气象站,模拟物理设备
 */
public class WeatherStation {
    public static void main(String[] args) {
        WeatherData weatherData = new WeatherData(); // 主题
        // 各个观察者注册到主题
        Observer currentDisplay = new CurrentConditionsDisplay(weatherData);
        Observer statisticsDisplay = new StatisticsDisplay(weatherData);
        Observer forecastDisplay = new ForecastDisplay(weatherData);
        // 本设备会给气象观测系统推送变化的数据
        weatherData.setMeasurements(80, 65, 30.4f);
        weatherData.setMeasurements(82, 70, 29.2f);
        weatherData.setMeasurements(78, 90, 29.2f);
    }
}

问题发现

虽然基于观察者模式实现了该系统,但是还有不完美的地方:

要更新的数据的传递方式

在观察者接口的 update 方法中,其参数是各个经常变化的,被观测的气象参数,我们把观测值直接传入观察者中,并不是一种好的实现方法。比如,这些观测值的种类和个数在未来有可能改变,如果以后会改变,这些变化并没有被很好地封装。会牵一发动全身——需要修改许多地方的代码,再下一版中修改。

update 和 display 方法的位置

乍一看,update的同时,就把变化显示,是合理的。但是还有更好的设计方式,比如 MVC 架构模式中的实现,再下一版中说明。

具体观察者不写主题的引用的后果

如果不写这个主题引用,那么以后想增加取消订阅的方法(或者其他可能的方法),就不太方便。故还是一开始就保留引用。

push 模型

当前的实现是基于推模型的观察者模式实现,即主题主动推送更新给观察者,这样做的理由:

1、主题可以集齐所有数据,灵活的决定发送的数据量,可以一次性的推送完整的数据给观察者

2、观察者不需要主动的反复拉取数据,责任被分割

但是,有时候也得结合业务来看,比如当观察者很多很多的时候:

1、主题也许并不能完全掌握每个观察者的需求,那么让观察者主动 pull 数据,也许是比较好的实现。

2、在极多个观察者的场景下,如果仅仅是某些个别的观察者需要一点儿数据,那么主题仍然会通知全部的观察者,导致和该业务无关的观察者都要被通知,这是没有必要的。

3、外一以后观察者需要扩展一些状态,如果采用推模型,那么主题除了要新增必要的状态属性外,还要修改通知的代码逻辑。如果基于拉模型,主题只需要提供一些对外的getter方法,让观察者调用(主动拉取数据),那么当观察者扩展状态属性时,主题就不需要修改对各个观察者的调用代码。仅仅增加属性和对应的getter方法即可。

不过,生产环境中,也有很多是两个模型都实现了。

针对 pull 模型,JDK 中已经实现的观察者模式 API 也给我们实现好了,也就是说,JDK 有自带的观察者模式 API,且可以实现 push 或者 pull 模型的观察者模式。

气象监测系统的实现版本3——基于拉模型

使用 JDK 内置的观察者模式 API 实现,java.util 包内含有最基本的 Observer 接口——观察者,与 Observable 类(注意,JDK 设计的是类)——主题,这和第 4 节中的 Subject 接口与 Observer 接口十分相似。

基于 JDK 内置支持,实现观察者模式的拉模型

WeatherData 直接扩展 java.util.Observable 类,并继承到一些增加、删除、通知观察者的方法等,就摇身一变成了主题类。

各个观察者只需要实现观察者接口——java.uitl.Observer。

import java.util.Observable;

public class WeatherData extends Observable {
    private float temperature;
    private float humidity;
    private float pressure;

    public WeatherData() {
    }

    public float getTemperature() {
        return temperature;
    }

    public float getHumidity() {
        return humidity;
    }

    public float getPressure() {
        return pressure;
    }

    private void measurementsChanged() {
        // 从java.util.Observable;继承,线程安全
        // 拉模型实现,只是设置一个状态,如果状态位变了,就说明数据变更
        setChanged();
        // 从java.util.Observable;继承,线程安全,只有当状态位为true,通知才有效,之后观察者们会主动拉取数据
        notifyObservers();
    }

    public void setMeasurements(float temperature, float humidity, float pressure) {
        this.temperature = temperature;
        this.humidity = humidity;
        this.pressure = pressure;

        measurementsChanged();
    }
}
 
/////////////////// 省略 DisplayElement 接口,和之前一样
import java.util.Observable;
import java.util.Observer;

public class CurrentConditionsDisplay implements Observer, DisplayElement {
    private Observable observable; // java.util.Observable;
    private float temperature;
    private float humidity;

    public CurrentConditionsDisplay(Observable observable) {
        this.observable = observable;
        // 继承自 java.util.Observable;
        this.observable.addObserver(this);
    }

    public void removeObserver() {
        this.observable.deleteObserver(this);
    }

    @Override
    public void display() {
        System.out.println("Current conditions: " + temperature
                + "F degrees and " + humidity + "% humidity");
    }

    // 从 java.util.Observer; 实现来的,实现的拉模型,arg 是空
    // 额外的多了 Observable o 参数,让观察者能知道:到底是哪个主题通知的
    @Override
    public void update(Observable o, Object arg) {
        // 非常灵活的设计,可以指定观察者只响应特定主题的通知,而不是默认强制的全部被通知
        if (o instanceof WeatherData) {
            WeatherData weatherData = (WeatherData) o;
            // 拉数据,让观察者具有了很大灵活性——自己决定需要什么数据
            this.temperature = weatherData.getTemperature();
            this.humidity = weatherData.getHumidity();
            display();
        }
    }
}

注册观察者就是调用 Obsecable 的 addObserver()方法。取消注册,即调用 deleteObserver() 方法。

当需要发出通知时:

1、调用 Obsecable 的 setChanged 方法,会修改一个状态位为 true,说明状态已经改变了

2、调用 notifyObservers(); 或者 notifyObservers(Object obj); 前者是拉模型,后者是推模型

 3、notifyObservers(Object obj); 是把变化的数据整合为了一个对象,在自定义实现的时候,如果 notify 的参数较多,也可以这样封装

回调机制和观察者模式

所谓回调:A类调用B类的方法C,然后B类反过来调用A类的方法D,D方法就叫回调方法,和观察者模式原理是一样的,只不过观察者只有一个,即一对一的关系。而且结合多线程,可以实现异步回调。

一般写程序是你调用系统的API,如果把关系反过来,你写一个函数,让系统调用你的函数,那就是回调了,那个被系统调用的函数就是回调函数。

示例代码

/**
 * 一请求一线程模式
 */
public class ClientHandler {
    private final Socket clientSocket;
    private final ClientReadHandler clientReadHandler;
    private final CloseNotify closeNotify; // 回调接口

    public ClientHandler(Socket clientSocket, CloseNotify closeNotify) throws IOException {
        this.clientSocket = clientSocket;
        this.clientReadHandler = new ClientReadHandler(clientSocket.getInputStream());
        this.closeNotify = closeNotify; // 回调
    }

    private void exitByMyself() {
        // 一些关闭的业务代码
        closeNotify.onSelfClosed(this);
    }

    /**
     * 回调接口:告诉服务器,某个客户端已经退出了,服务器要删除这个 client handler
     */
    public interface CloseNotify {
        void onSelfClosed(ClientHandler thisHandler);
    }

    private class ClientReadHandler extends Thread {
        private final InputStream inputStream;
        private boolean done = false;

        ClientReadHandler(InputStream inputStream) {
            this.inputStream = inputStream;
        }

        @Override
        public void run() {
            try {
                InputStreamReader inputStreamReader = new InputStreamReader(ClientReadHandler.this.inputStream);
                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);

                do {
                    String line = bufferedReader.readLine();
                    if (line == null) {
                        ClientHandler.this.exitByMyself(); // line 收到的是 null,就认为异常了,需要通知服务端去除该异常客户端

                        break;
                    }

                    System.out.println(line);
                } while (!this.done);
            } catch (Exception e) {
                if (!this.done) {
                    ClientHandler.this.exitByMyself();
                }
            }
        }
    }
}

服务器端,需要实现回调接口,这里是使用的内部接口

public class TCPServer {
    private final int portServer;
    private final List<ClientHandler> clientHandlerList = new ArrayList<>(); // 保存连接的客户端实例
    private ClientListener clientListener;

    public TCPServer(int portServer) {
        this.portServer = portServer;
    }

    public boolean start(String name) {
        try {
            ClientListener clientListener = new ClientListener(name, this.portServer);
            this.clientListener = clientListener;
            clientListener.start();
        } catch (Exception e) {
            return false;
        }

        return true;
    }

    private class ClientListener extends Thread {
        private ServerSocket serverSocket;
        private boolean done = false;

        ClientListener(String name, int portServer) throws IOException {
            super(name);
            this.serverSocket = create(portServer); // 监听 portServer 端口
        }

        @Override
        public void run() {
            int count = 0;
            do {
                Socket clientSocket;
                try {
                    clientSocket = this.serverSocket.accept();
                } catch (IOException e) {
                    if (!this.done) {
                        e.printStackTrace();
                    }
                    continue;
                } finally {
                    count++;
                }

                ClientHandler clientHandler;
                try {
                    clientHandler = new ClientHandler(clientSocket, new ClientHandler.CloseNotify() {
                        @Override
                        public void onSelfClosed(ClientHandler thisHandler) {
                            TCPServer.this.clientHandlerList.remove(thisHandler);
                        }
                    });
                    clientHandler.readAndPrint();
                    TCPServer.this.clientHandlerList.add(clientHandler);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } while (!this.done);
        }
    }
} 

观察者模式的典型应用

1、侦听事件驱动程序设计中的外部事件

2、侦听/监视某个对象的状态变化

3、发布者/订阅者(publisher/subscriber)模型中,当一个外部事件(新的产品,消息的出现等等)被触发时,通知邮件列表中的订阅者

……

开源框架中应用了观察者模式的例子

最经典的就是SpringMVC了,因为 MVC 架构模式就应用了观察者模式——多个 view 注册监听 model。

另外,比如 Tomcat、Netty 等著名的开源软件,也都广泛应用了观察者模式,包括任何具有回调方法的软件中,都有观察者模式的思想。

非常重要的一个设计模式 

02-10 18:27