支持持续接收数、可发送数据、可多端口连接。
废话少说,直接上代码!
如果写的可以,记得点个赞~

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;

@Slf4j
@AllArgsConstructor
public class TcpIpService {

    // 保存消息数据的容器,会单独开一个线程进行监听里面是否有数据然后进行业务处理
    //(我设置的最大存放个数,视需求而定)
    public static ArrayBlockingQueue<HashMap> messageQueue = new ArrayBlockingQueue<HashMap>(10);

    /**
     * 开始加载监听Tcp/Ip端口
     * @throws IOException
     */
    public static void ServerSocketD(int portNumber) throws IOException {
        MQMonitor mqMonitor = new MQMonitor();
        mqMonitor.start();

        InterlinkageMonitorServer thread = new InterlinkageMonitorServer(portNumber);
        thread.start();
    }

    /**
     * 发送消息
     * @param data
     * @return
     * @throws IOException
     */
    public static ReturnDataState giveOrder(String data) throws IOException, InterruptedException {
        //向客户端发送消息
        try {
            log.info("发:"+data);
            ServerThread.outputStream = ServerThread.socket.getOutputStream();
            ServerThread.outputStream.write(data.getBytes("GBK"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new ReturnDataState(0, "");
    }
}

/**
 * 监听用户链接
 */
@Slf4j
class InterlinkageMonitorServer extends Thread {
    //监听端口
    private static int PORT = 0;

    public static Socket socket;

    /**
     * @param portNumber:端口号
     */
    public InterlinkageMonitorServer(int portNumber){
        this.PORT = portNumber;
    }

    @SneakyThrows
    public void run(){
        log.info("TcpIp消息:>>>>>>>>>>>>>>> 开始监听用户链接 <<<<<<<<<<<<<");
        ServerSocket serverSocket = null;
        try {
            //建立服务器的 Socket,并设定一个监听的端口 PORT
            if (PORT<1024){
                log.error("TcpIp消息:监听的端口数值不能为0或小于1024");
            }else if (PORT > 65535){
                log.error("TcpIp消息:监听的端口数值不能大于65535");
            }
            serverSocket = new ServerSocket(PORT);
            //由于需要进行循环监听,因此获取消息的操作应放在一个 while 大循环中
            while(true){
                try {
                    //建立跟客户端的连接
                    socket = serverSocket.accept();
                } catch (Exception e) {
                    log.info("TcpIp消息:建立与客户端的连接出现异常");
                    e.printStackTrace();
                }
                if (socket != null){
                    log.info("TcpIp消息:>>>>>>>>>>>>>>> 有客户端链接,开启新线程 <<<<<<<<<<<<<");
                    //注:视需求做,我们的需求是就没什么人链接,就新开了一个线程做处理
                    MessageServerThread thread = new MessageServerThread(socket);
                    thread.start();
                    socket = null;
                }
                // 不能让他跑的太快,需要在一定程度上让他跑慢点(视需求而定)
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            serverSocket.close();
        }
    }
}

/**
 * 每一个链接上的,都会单独进行监听消息
 */
@Slf4j
class MessageServerThread extends Thread {
    public static Socket socket ;
    InputStream inputStream;

    public  MessageServerThread(Socket socket){
        this.socket=socket;
    }
    public void run(){
        try {
            //接收客户端的消息并打印
            inputStream = socket.getInputStream();
            byte[] bytes = new byte[5120];

            while (inputStream.read(bytes) != -1){
                //解决乱码的问题
                String string = new String(bytes, "GB2312");
                //解决 byte 数组为空或者填不满的问题
                HashMap originalData = JSON.parseObject(string.trim(), HashMap.class);
                log.info("收:"+originalData.toString());

                // 消息存入
                produce(originalData);

                //推送后清空数组
                bytes = new byte[5120];
            }
        } catch (Exception e) {
            log.error("WebSocket消息:客户端的主动断开连接了,关闭线程");
        }
        //操作结束,关闭socket
        try{socket.close();}catch(IOException e){ log.error("WebSocket消息:关闭连接出现异常"); }
    }

    // 生产消息
    public static void produce(HashMap msg) {
        if (Server.messageQueue.offer(msg)) {
            log.info("MQ消息:成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());
        } else {
            log.info("MQ消息:消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        log.info("=======================");
    }
}

/**
 * 收到数据后的业务处理
 */
@Slf4j
class MQMonitor extends Thread {

    public MQMonitor() {}

    @SneakyThrows
    public void run(){
        while (true) {
            // 取出收到的值
            HashMap<String, Object> consume = consume();
            while (!ObjectUtils.isEmpty(consume)) {
                log.info("MQ消息:开始消化数据");
                // 业务内容。。。。。
                break;
            }
            // 不为空时会快速对数据进行处理
            if (TcpIpService.messageQueue.size() == 0)
                Thread.sleep( 1000 );
        }
    }

    // 消费消息
    public static HashMap consume() {
        HashMap msg = Server.messageQueue.poll();
        if (msg != null) {
            // 消费条件满足情况,从消息容器中取出一条消息
            log.info("MQ消息:已经消费消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());
        } else {
            log.info("MQ消息:消息处理中心内没有消息可供消费!");
        }
        log.info("=======================");
        return msg;
    }
}
12-01 13:32