SpringBoot(java)实现websocket实现实时通信

一、认识WebSocket

WebSockets是一种在Web应用程序中实现实时通信的技术。它允许客户端和服务器之间建立持久的、双向的通信通道,从而使得服务器可以实时向客户端推送数据,而不需要客户端不断地向服务器发起请求。这种实时通信的能力对于需要即时更新数据的应用程序非常有用,比如在线聊天应用、实时游戏、股票市场更新等。

在使用WebSockets时,通常需要以下步骤:

  1. 建立连接:客户端向服务器发起WebSocket连接请求,服务器接受连接请求后,双方建立WebSocket连接。

  2. 通信:一旦建立了连接,客户端和服务器就可以通过该连接进行双向通信,可以发送和接收数据。

  3. 处理消息:客户端和服务器需要处理接收到的消息,并根据需要进行相应的操作。消息的格式和内容可以根据应用程序的需求来定义。

  4. 关闭连接:当通信结束时,可以通过发送关闭消息来关闭WebSocket连接,释放资源。

在实际开发中,可以使用各种现代Web框架和库来简化WebSocket的使用,例如:

  • 在前端,可以使用现代JavaScript框架如Vue.js、React.js或Angular来处理WebSocket连接和消息传递。
  • 在后端,常见的Web框架如Spring Boot(Java)、Express.js(Node.js)、Django(Python)等都提供了对WebSocket的支持。

    要实现实时通信,你需要在客户端和服务器端分别实现WebSocket连接的建立和消息的处理逻辑。具体的实现方式会根据你选择的编程语言、框架和库而有所不同。

    二、使用WebSocket(参照若依后端SpringBoot,前端Vue.js)

    1、在pom.xml中添加websocket依赖

     org.springframework.boot spring-boot-starter-websocket 

    2、配置匿名访问

    .antMatchers("/websocket/**").permitAll()

    3、若依的websocket实现实时通信代码

    (1)SemaphoreUtils.java

    package com.ruoyi.framework.websocket;
    import java.util.concurrent.Semaphore;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * 信号量相关处理
     * 
     * @author ruoyi
     */
    public class SemaphoreUtils
    {
        /**
         * SemaphoreUtils 日志控制器
         */
        private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);
        /**
         * 获取信号量
         * 
         * @param semaphore
         * @return
         */
        public static boolean tryAcquire(Semaphore semaphore)
        {
            boolean flag = false;
            try
            {
                flag = semaphore.tryAcquire();
            }
            catch (Exception e)
            {
                LOGGER.error("获取信号量异常", e);
            }
            return flag;
        }
        /**
         * 释放信号量
         * 
         * @param semaphore
         */
        public static void release(Semaphore semaphore)
        {
            try
            {
                semaphore.release();
            }
            catch (Exception e)
            {
                LOGGER.error("释放信号量异常", e);
            }
        }
    }
    

    (2)WebSocketConfig.java

    package com.ruoyi.framework.websocket;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    /**
     * websocket 配置
     * 
     * @author ruoyi
     */
    @Configuration
    public class WebSocketConfig
    {
        @Bean
        public ServerEndpointExporter serverEndpointExporter()
        {
            return new ServerEndpointExporter();
        }
    }
    

    (3)WebSocketServer.java

    package com.ruoyi.framework.websocket;
    import java.util.concurrent.Semaphore;
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.ServerEndpoint;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    /**
     * websocket 消息处理
     * 
     * @author ruoyi
     */
    @Component
    @ServerEndpoint("/websocket/message")
    public class WebSocketServer
    {
        /**
         * WebSocketServer 日志控制器
         */
        private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
        /**
         * 默认最多允许同时在线人数100
         */
        public static int socketMaxOnlineCount = 100;
        private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
        /**
         * 连接建立成功调用的方法
         */
        @OnOpen
        public void onOpen(Session session) throws Exception
        {
            boolean semaphoreFlag = false;
            // 尝试获取信号量
            semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
            if (!semaphoreFlag)
            {
                // 未获取到信号量
                LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
                WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
                session.close();
            }
            else
            {
                // 添加用户
                WebSocketUsers.put(session.getId(), session);
                LOGGER.info("\n 建立连接 - {}", session);
                LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
                WebSocketUsers.sendMessageToUserByText(session, "连接成功");
            }
        }
        /**
         * 连接关闭时处理
         */
        @OnClose
        public void onClose(Session session)
        {
            LOGGER.info("\n 关闭连接 - {}", session);
            // 移除用户
            WebSocketUsers.remove(session.getId());
            // 获取到信号量则需释放
            SemaphoreUtils.release(socketSemaphore);
        }
        /**
         * 抛出异常时处理
         */
        @OnError
        public void onError(Session session, Throwable exception) throws Exception
        {
            if (session.isOpen())
            {
                // 关闭连接
                session.close();
            }
            String sessionId = session.getId();
            LOGGER.info("\n 连接异常 - {}", sessionId);
            LOGGER.info("\n 异常信息 - {}", exception);
            // 移出用户
            WebSocketUsers.remove(sessionId);
            // 获取到信号量则需释放
            SemaphoreUtils.release(socketSemaphore);
        }
        /**
         * 服务器接收到客户端消息时调用的方法
         */
        @OnMessage
        public void onMessage(String message, Session session)
        {
            String msg = message.replace("你", "我").replace("吗", "");
            WebSocketUsers.sendMessageToUserByText(session, msg);
        }
    }
    

    (4)WebSocketUsers.java

    package com.ruoyi.framework.websocket;
    import java.io.IOException;
    import java.util.Collection;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import javax.websocket.Session;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * websocket 客户端用户集
     * 
     * @author ruoyi
     */
    public class WebSocketUsers
    {
        /**
         * WebSocketUsers 日志控制器
         */
        private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);
        /**
         * 用户集
         */
        private static Map USERS = new ConcurrentHashMap();
        /**
         * 存储用户
         *
         * @param key 唯一键
         * @param session 用户信息
         */
        public static void put(String key, Session session)
        {
            USERS.put(key, session);
        }
        /**
         * 移除用户
         *
         * @param session 用户信息
         *
         * @return 移除结果
         */
        public static boolean remove(Session session)
        {
            String key = null;
            boolean flag = USERS.containsValue(session);
            if (flag)
            {
                Set> entries = USERS.entrySet();
                for (Map.Entry entry : entries)
                {
                    Session value = entry.getValue();
                    if (value.equals(session))
                    {
                        key = entry.getKey();
                        break;
                    }
                }
            }
            else
            {
                return true;
            }
            return remove(key);
        }
        /**
         * 移出用户
         *
         * @param key 键
         */
        public static boolean remove(String key)
        {
            LOGGER.info("\n 正在移出用户 - {}", key);
            Session remove = USERS.remove(key);
            if (remove != null)
            {
                boolean containsValue = USERS.containsValue(remove);
                LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");
                return containsValue;
            }
            else
            {
                return true;
            }
        }
        /**
         * 获取在线用户列表
         *
         * @return 返回用户集合
         */
        public static Map getUsers()
        {
            return USERS;
        }
        /**
         * 群发消息文本消息
         *
         * @param message 消息内容
         */
        public static void sendMessageToUsersByText(String message)
        {
            Collection values = USERS.values();
            for (Session value : values)
            {
                sendMessageToUserByText(value, message);
            }
        }
        /**
         * 发送文本消息
         *
         * @param userName 自己的用户名
         * @param message 消息内容
         */
        public static void sendMessageToUserByText(Session session, String message)
        {
            if (session != null)
            {
                try
                {
                    session.getBasicRemote().sendText(message);
                }
                catch (IOException e)
                {
                    LOGGER.error("\n[发送消息异常]", e);
                }
            }
            else
            {
                LOGGER.info("\n[你已离线]");
            }
        }
    }
    

    (5)webSocket.js

    /**
     * 参数说明:
     *  webSocketURL:String    webSocket服务地址    eg: ws://127.0.0.1:8088/websocket (后端接口若为restful风格可以带参数)  
     *  callback:为带一个参数的回调函数
     *  message:String 要传递的参数值(不是一个必要的参数)
     */
    export default{
        // 初始化webSocket
        webSocketInit(webSocketURL){      // ws://127.0.0.1:8088/websocket
            this.webSocket = new WebSocket(webSocketURL);
            this.webSocket.onopen = this.onOpenwellback;
            this.webSocket.onmessage = this.onMessageCallback;
            this.webSocket.onerror = this.onErrorCallback;
            this.webSocket.onclose = this.onCloseCallback;
        },
        // 自定义回调函数
        setOpenCallback(callback){ //  与服务端连接打开回调函数
            this.webSocket.onopen = callback;
        },
        setMessageCallback(callback){   //  与服务端发送消息回调函数
            this.webSocket.onmessage = callback;
        },
        setErrorCallback(callback){ //  与服务端连接异常回调函数
            this.webSocket.onerror = callback;
        },
        setCloseCallback(callback){ //  与服务端连接关闭回调函数
            this.webSocket.onclose = callback;
        },
        close(){    // 关闭连接
            this.webSocket.close();
        },
        sendMessage(message){   // 发送消息函数
            this.webSocket.send(message);
        },
    }
    

    (6)index.vue

      

    三、实现

    前后端同时打开:

    关闭后端:

     关闭前端:

    后端向前端发送信息(群发):

     WebSocketUsers.sendMessageToUsersByText(JSONObject.toJSONString(list));

     后端向前端发送信息(单发):

     WebSocketUsers.sendMessageToUserByText(value,JSONObject.toJSONString(list));

    参考:

    插件集成 | RuoYi