一.websocket协议介绍

1.websocket
      websocket是一种标准协议,用于客户端和服务端之间进行双向数据传输,它是一种基于TCP协议的独立实现;其最大的特点是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。websocket借用http来完成一次握手,只需要一次HTTP握手,服务端就能一直与客户端保持通讯,直到关闭连接。

2.http的三次握手

websoket-LMLPHP

第一次握手: 建立连接,客户端A发送SYN=1、随机产生Seq=client_isn的数据包到服务器B,等待服务器确认。

第二次握手: 服务器B收到请求后确认联机(可以接受数据),发起第二次握手请求,ACK=(A的Seq+1)、SYN=1,随机产生Seq=client_isn的数据包到A。

第三次握手: A收到后检查ACK是否正确,若正确,A会在发送确认包ACK=服务器B的Seq+1、ACK=1,服务器B收到后确认Seq值与ACK值,若正确,则建立连接。

http的三次握手和四次挥手解析:

浏览器在给服务器传输数据之前,有三次握手,握手成功之后,才可以传输数据

websoket-LMLPHP

 

1、浏览器需要先发送SYN码,客户端请求和服务器建立连接;(客户端->问服务器在吗?->服务端)

2、服务器接收到SYN码,再发送给客户端SYN+ACK码,我可以建立连接;(服务端->在呢,咋啦?->客户端)

3、客户端接收到ACK码,验证这个ACK是否正确,如果正确则客户端和服务端则建立起数据连接;双方的数据发送通道都将开启;(客户端->我给你说我今天......->服务端)

为什么要经过三次握手呢?

      为了防止服务端开启一些无用的链接,网络传输是有延时的,中间可能隔着非常远的距离,通过光纤或者中间代理服务器等,客户端发送一个请求,服务端收到之后如果直接创建一个链接,返回内容给到客户端,因为网络传输原因,这个数据包丢失了,客户端就一直接收不到服务器返回的这个数据,超过了客户端设置的时间就关闭了,那么这时候服务端是不知道的,它的端口就会开着等待客户端发送实际的请求数据,服务这个开销也就浪费掉了。

四次挥手:

1、当客户端无数据要传输了,会发送FIN码告诉服务器,我发送完毕了;

2、当服务端接收完毕后,告诉客户端ACK码,告诉客户端你可以把数据通道关闭了;

3、当服务器发送完毕之后,也会发送FIN码,告诉浏览器,数据发送完毕;

4、当客户端接收完毕 之后,同样发送ACK码,告诉服务器,数据接收完毕,你可以关闭;

三次握手和四次挥手的好处:确保数据的安全和完整

WebSocket
   
   WebSocket实现了双工通信,就是在客户端和服务端上建立一个长久的连接,然后两边可以任意发送数据,它属于应用层的协议,基于TCP传输协议并复用HTTP的握手通道。

 WebSocket的优势:

支持双向通信,实时性更强;
更好的二进制支持;
较少的控制开销(连接创建后,ws客户端与服务端数据交换时,协议控制的数据包头部较少)。
 

二.典型的websocket握手过程

握手请求信息:

websoket-LMLPHP

#两行表示发起的是websocket协议;
Upgrade:websocket
Connection:Upgrade
#websocket-key是由浏览器随机生成的,是一个Base64编码的值
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
#websocket-version表示ws的版本,询问服务端是否支持该版本
Sec-WebSocket-Version:13

 握手响应信息:

websoket-LMLPHP

#101状态码表示服务器已经理解了客户端的请求
HTTP/1.1 101
#websocket-accept是经过服务器确认后的值
Sec-WebSocket-Accept:HSmrc0sMlYUkAGmm5OPpG2HaGWk==

 三.服务端实现例子

步骤一: springboot底层帮我们自动配置了websokcet,引入maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

步骤二:如果是你采用springboot内置容器启动项目的,则需要配置一个Bean。如果是采用外部的容器,则可以不需要配置。

import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;

/**
 * Created on 2023-03-15 9:12
 *
 * @author xiegongmiao
 * description:websocket配置,用于开启websocket支持
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements ServletContextInitializer {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {

    }
}

步骤三:编写服务端核心代码

websoketSever.java

import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import www.wensi.com.xiurtc.api.response.BaseResponse;
import www.wensi.com.xiurtc.api.response.Constant;
import www.wensi.com.xiurtc.common.MessageCacheUtil;
import www.wensi.com.xiurtc.config.ServerEncoder;
import www.wensi.com.xiurtc.entity.WsMessageCache;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;


/**
 * Created on 2023-03-16 16:31
 *
 * @author description:长连接
 */
@Component
@Data
@Slf4j
@ServerEndpoint(value = "/c/websocket/{mac}", encoders = {ServerEncoder.class})
public class WebSocketSever {

    private Session session;

    // 查询数据库用的服务
    private static WebsocketService websocketService;

    // 注入的时候,给类的 service 注入
    @Autowired
    public void setWebsocketService(WebsocketService websocketService) {
        WebSocketSever.websocketService = websocketService;
    }

    /**
     * 保存连接的会话
     */
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();

    // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
    private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();

    /**
     * 建立连接时调用
     *
     * @param session
     * @param config
     */
    @OnOpen
    public void onOpen(@PathParam("mac") String mac, Session session, EndpointConfig config) {
        log.info("webSocket connecting,connect cloud phone mac:{}", mac);
        try {
            Session historySession = sessionPool.get(mac);
            // historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象
            if (historySession != null) {
                webSocketSet.remove(historySession);
                historySession.close();
            }
        } catch (IOException e) {
            log.error("mac:{},duplicate login exception,error message:{}", mac, e.getMessage());
        }
        // 建立连接
        this.session = session;
        webSocketSet.add(this);
        sessionPool.put(mac, session);
        log.info("mac:{},connection completed,the current number of people online is:{}", mac, webSocketSet.size());
        BaseResponse message = websocketService.getPublicPort(mac);
        sendMessageByMac(mac, message);
    }

    /**
     * 发生错误
     *
     * @param throwable e
     */
    @OnError
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    /**
     * 连接关闭
     */
    @OnClose
    public void onClose(@PathParam("mac") String mac) {
        webSocketSet.remove(this);
        sessionPool.remove(session);
        MessageCacheUtil.clearOnly(mac);
        log.info("mac:{},connection close,the current number of people online is:{}", mac, webSocketSet.size());
    }

    /**
     * 消息到达时调用
     *
     * @param mac
     * @param message
     */
    @OnMessage
    public void onMessage(@PathParam("mac") String mac, String message) {
        log.info("received a message from the client,mac:{},message:{}", mac, message);
        if (StringUtils.isBlank(message)) {
            return;
        }
        try {
            JSONObject json = JSONObject.parseObject(message);
            if ("state".equals(json.get("type"))) {
                websocketService.updateVmState(mac, json.getInteger("state"));
                return;
            } else if ("PING".equals(json.get("type"))) {
                heartCheck(mac);
                return;
            }
        } catch (Exception e) {
            MessageCacheUtil.deleteForExpired(Constant.TIME_OUT_WEBSOCKET_POLLING_CACHE_TIME);
            WsMessageCache wsMessageCache = new WsMessageCache();
            wsMessageCache.setMessage(message);
            wsMessageCache.setDate(new Date().getTime());
            MessageCacheUtil.setMessageCache(mac, wsMessageCache);
        }
    }

    /**
     * 推送消息到指定用户
     *
     * @param mac     云手机mac地址
     * @param message 发送的消息
     */
    public void sendMessageByMac(String mac, String message) {
        log.info("push content to cloud phone,mac:{},message:{}", mac, message);
        MessageCacheUtil.deleteForExpired(Constant.TIME_OUT_WEBSOCKET_POLLING_CACHE_TIME);
        Session session = sessionPool.get(mac);
        if (ObjectUtils.isEmpty(session)) {
            log.error("push content to cloud phone,cloud phone is not connected,mac:{},message:{}", mac, message);
            return;
        }
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("push content to cloud phone,mac:{},message:{},error:{}", mac, message, e.getMessage());
        } catch (Exception e) {
            log.error("push content to cloud phone,mac:{},message:{},error:{}", mac, message, e.getMessage());
        }
    }

    /**
     * 推送消息到指定用户
     *
     * @param mac     云手机mac地址
     * @param message 发送的消息
     */
    public void sendMessageByMac(String mac, Object message) {
        log.info("push content to cloud phone,mac:{},message:{}", mac, message);
        MessageCacheUtil.deleteForExpired(Constant.TIME_OUT_WEBSOCKET_POLLING_CACHE_TIME);
        Session session = sessionPool.get(mac);
        try {
            session.getBasicRemote().sendObject(message);
        } catch (EncodeException | IOException e) {
            log.error("push content to cloud phone,mac:{},message:{},error:{}", mac, message, e.getMessage());
        }
    }

    /**
     * 群发消息
     *
     * @param message 发送的消息
     */
    public static void sendAllMessage(String message) {
        log.info("mass message:{}", message);
        for (WebSocketSever webSocket : webSocketSet) {
            try {
                webSocket.session.getBasicRemote().sendText(message);
            } catch (Exception e) {
                log.error("mass message:{},error:{}", message, e);
            }
        }
    }

    /**
     * 心跳检测机制
     *
     * @param mac mac地址
     */
    public static void heartCheck(String mac) {
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("type", "PONG");
        Session session = sessionPool.get(mac);
        if (ObjectUtils.isEmpty(session)) {
            log.error("heart check,cloud phone is not connected,mac:{},message:{}", mac, params);
            return;
        }
        try {
            session.getBasicRemote().sendObject(params);
        } catch (IOException e) {
            log.error("heart check cloud phone,mac:{},message:{},error:{}", mac, params, e.getMessage());
        } catch (Exception e) {
            log.error("heart check cloud phone,mac:{},message:{},error:{}", mac, params, e.getMessage());
        }
    }
}
我这里没有用nosql缓存,自己编写了个MessageCacheUtil缓存工具类。用于缓存客户端发来的信息。

WsMessageCache.java
import lombok.Data;

@Data
public class WsMessageCache {

    private Long date;

    private String message;
}

MessageCacheUtil.java

import org.springframework.util.ObjectUtils;

import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;


public class MessageCacheUtil {
    private static Map<Object, WsMessageCache> cacheMap = new HashMap<Object, WsMessageCache>();

    /**
     * 单实例构造方法
     */
    private MessageCacheUtil() {
        super();
    }

    /**
     * 根据键获取时间long
     *
     * @param key
     * @return
     */
    public static WsMessageCache getMessageCache(String key) {
        return (WsMessageCache) cacheMap.get(key);
    }

    /**
     * 设置缓存
     *
     * @param key
     * @param wsMessageCache
     * @return
     */
    public synchronized static Map<Object, WsMessageCache> setMessageCache(String key, WsMessageCache wsMessageCache) {
        cacheMap.put(key, wsMessageCache);
        return cacheMap;
    }

    /**
     * 判断是否存在一个缓存
     *
     * @param key
     * @return
     */
    public synchronized static boolean hasCache(String key) {
        return cacheMap.containsKey(key);
    }

    /**
     * 判断是否存在缓存
     *
     * @param key
     * @return
     */
    public synchronized static boolean hasOneCache(String key) {
        return !ObjectUtils.isEmpty(cacheMap);
    }

    /**
     * 清除所有缓存
     */
    public synchronized static void clearAll() {
        cacheMap.clear();
    }

    /**
     * 清除指定的缓存
     *
     * @param key
     */
    public synchronized static void clearOnly(String key) {
        cacheMap.remove(key);
    }

    /**
     * 获取缓存中的大小
     *
     * @return
     */
    public static int getCacheSize() {
        return cacheMap.size();
    }

    /**
     * 删除过期消息
     *
     * @param time
     */
    public static void deleteForExpired(Long time) {
        Iterator<Entry<Object, WsMessageCache>> it = cacheMap.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Object, WsMessageCache> entry = it.next();
            WsMessageCache value = entry.getValue();
            if ((new Date()).getTime() - (value.getDate()) >= time) {
                it.remove();
            }
        }
    }
}
WebSoketTestController.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import www.wensi.com.xiurtc.api.enums.StatusCode;
import www.wensi.com.xiurtc.api.response.BaseResponse;
import www.wensi.com.xiurtc.common.MessageCacheUtil;
import www.wensi.com.xiurtc.entity.WsMessageCache;
import www.wensi.com.xiurtc.service.WebSocketSever;
import www.wensi.com.xiurtc.utils.MessageUtils;
import www.wensi.com.xiurtc.utils.ResultUtil;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;

@RestController
@Slf4j
@RequestMapping("/webSoketTest")
public class WebSoketTestController {
    @Autowired
    private WebSocketSever wsSever;

    @RequestMapping("/r/forCustomer/send")
    public BaseResponse getCandidate(HttpServletRequest request) {
        String message = "我的消息";
        String mac = "6e:66:88:08:00:01";
        // 发送消息给客户端
        wsSever.sendMessageByMac(mac, message);
        // 在缓存中,获取到客户端返回的信息
        BaseResponse response = getWsMessageCache(mac);
        return response;
    }

    /**
     * 获取缓存消息
     *
     * @param mac
     * @return
     */
    private BaseResponse getWsMessageCache(String mac) {
        Long time = 0L;
        BaseResponse response = ResultUtil.success();
        while (true) {
            // 如果时间达到没有设置成功,则超时
            if (time > 2000L) {
                String msg = "connect timed out by mac:" + mac;
                response.setstatus(StatusCode.FAIL.getStatus());
                response.setMessage(MessageUtils.message("error.connect.timed.out"));
                log.error(msg);
                break;
            }
            if (MessageCacheUtil.hasCache(mac)) {
                WsMessageCache wsMessageCache = MessageCacheUtil.getMessageCache(mac);
                if (ObjectUtils.isEmpty(wsMessageCache)) {
                    String msg = "connect timed out by mac:" + mac;
                    response.setstatus(StatusCode.FAIL.getStatus());
                    response.setMessage(MessageUtils.message("error.connect.timed.out"));
                    log.error(msg);
                    break;
                }
                String messageCache = wsMessageCache.getMessage();
                response.setData(messageCache);
                MessageCacheUtil.clearOnly(mac);
                log.info("send message to client success,mac:{},response:{}", mac, response);
                break;
            } else {
                try {
                    TimeUnit.MILLISECONDS.sleep(100L);
                    time = time + 100L;
                } catch (InterruptedException e) {
                    String msg = "connect timed out by mac:" + mac;
                    response.setstatus(StatusCode.FAIL.getStatus());
                    response.setMessage(MessageUtils.message("error.connect.timed.out"));
                    log.error(msg);
                    break;
                }
                continue;
            }
        }
        return response;
    }
}

启动服务

测试页面
http://www.jsons.cn/websocket

ws://localhost:9998/dc-xiurtc/c/websocket/6e:66:88:08:00:01

websoket-LMLPHP

 

05-26 06:39