1. 需要实现目标

  1. 用户在登录后会在后台记录在线状态
  2. 用户在登录的情况下会收到系统推送的消息通知

    2. Maven依赖

    在pom.xml中添加如下依赖
    <!-- Spring Boot starter for WebSocket support; no <version> is given here,
         so it is presumably managed by the project's Spring Boot parent/BOM (verify in pom.xml). -->
    <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    3. WebSocketConfig

    import javax.websocket.server.HandshakeRequest;
    import javax.websocket.server.ServerEndpointConfig;
    /**
     * Handshake configurator for the WebSocket endpoint.
     *
     * <p>During the handshake it copies the request's {@link HttpSession} into the endpoint
     * config's user properties, keyed by {@code HttpSession.class.getName()}, so the endpoint
     * can look it up from its {@code EndpointConfig}. It also declares the
     * {@link ServerEndpointExporter} bean.
     *
     * Create by Alden He on 2019/5/31
     */
    @Configuration
    public class WebSocketConfig extends ServerEndpointConfig.Configurator {

        /**
         * Declares the Spring {@link ServerEndpointExporter} bean.
         *
         * @return a new exporter instance
         */
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }

        /**
         * Stores the handshake's HTTP session, when there is one, in the endpoint user properties.
         *
         * @param sec      endpoint configuration whose user properties receive the session
         * @param request  handshake request the HTTP session is read from
         * @param response handshake response (not used)
         */
        @Override
        public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response){
            HttpSession currentSession=(HttpSession)request.getHttpSession();
            if(currentSession==null){
                // No HTTP session on this handshake: nothing to hand over to the endpoint.
                return;
            }
            String propertyKey=HttpSession.class.getName();
            sec.getUserProperties().put(propertyKey, currentSession);
        }
    }

    4. WebSocketController

    package com.nevergetme.nevergetmeweb.controller;
    import com.nevergetme.nevergetmeweb.bean.SystemMessage;
    import com.nevergetme.nevergetmeweb.bean.User;
    import com.nevergetme.nevergetmeweb.config.WebSocketConfig;
    import com.nevergetme.nevergetmeweb.service.SystemMessageService;
    import com.nevergetme.nevergetmeweb.utility.ContentUtility;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.servlet.http.HttpSession;
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.concurrent.CopyOnWriteArraySet;
    /**
     * WebSocket endpoint that records which logged-in users are online and pushes
     * {@link SystemMessage}s to them.
     *
     * <p>An instance whose connection is accepted in {@link #onOpen} adds itself to a static set,
     * so {@link #sendMessageToUser} can find the connection that belongs to a receiver id.
     * Only one connection per user id is accepted; further connections of the same user are closed.
     *
     * Create by Alden He on 2019/5/31
     */
    @ServerEndpoint(value = "/websocket",configurator = WebSocketConfig.class)
    @Component
    public class WebSocketController {
    //    @Autowired
    //    private SystemMessageService systemMessageService;

        /** Number of accepted connections. Static, therefore guarded by the class lock, not by {@code this}. */
        private static int onlineCount=0;
        /** Endpoint instances whose connection was accepted in {@link #onOpen}. */
        private static CopyOnWriteArraySet<WebSocketController> webSocketControllers=new CopyOnWriteArraySet<>();
        /** WebSocket session of this connection; only set when the connection was accepted. */
        private Session session;
        /** HTTP session captured during the handshake; only set when the connection was accepted. */
        private HttpSession httpSession;
        /** True once {@link #onOpen} accepted this connection and counted it as online. */
        private boolean isSuccess=false;
        /** Id of the user owning this connection; only meaningful when {@link #isSuccess} is true. */
        private int userId;

        /**
         * Tells whether a connection for the given user is already registered.
         *
         * @param user user to look for
         * @return true if some registered endpoint has the same user id
         */
        private boolean hasAlreadyInArraySet(User user){
            for(WebSocketController w:webSocketControllers){
                if(w.userId==user.getId()){
                    return true;
                }
            }
            return false;
        }

        /**
         * Accepts the connection when the HTTP session belongs to a logged-in user who has no
         * registered connection yet; otherwise closes the WebSocket session immediately.
         *
         * @param session the new WebSocket session
         * @param config  endpoint config carrying the HTTP session stored during the handshake
         * @throws IOException declared for interface compatibility; a failure to close the
         *                     rejected session is caught and printed here
         * @author Alden He
         * @date 2019/6/1 18:31
         */
        @OnOpen
        public void onOpen(Session session, EndpointConfig config) throws IOException {
            HttpSession httpSession=(HttpSession)config.getUserProperties().get(HttpSession.class.getName());
            User user;
            if((user= ContentUtility.getCurrentUserBySession(httpSession))!=null&&!hasAlreadyInArraySet(user)){
                this.session=session;
                this.httpSession=httpSession;
                isSuccess=true;
                userId=user.getId();
                webSocketControllers.add(this);
                addOnlineCount();
                System.out.println("目前在线人数为:"+getOnlineCount());
    //            sendMessage(new SystemMessage("目前在线人数为:"+getOnlineCount()));
            }else{
                // Not logged in, or this user already has a registered connection: reject.
                try {
                    session.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Unregisters this endpoint and, if it had been counted, decrements the online counter.
         */
        @OnClose
        public void onClose(){
            webSocketControllers.remove(this);
            subOnlineCount();
            System.out.println("离线,目前在线人数为:"+getOnlineCount());
        }

        /**
         * Logs a text message received from the client.
         *
         * @param message text sent by the client
         * @param session session the message arrived on
         * @throws IOException declared for interface compatibility; not thrown by this body
         */
        @OnMessage
        public void onMessage(String message, Session session)throws IOException {
            System.out.println("来自客户端的消息:"+message);
        }

        /**
         * Sends the message content over this connection and marks the message as sent.
         *
         * @param systemMessage message to push; its send state is set to 1 after sending
         * @throws IOException if the text cannot be sent
         */
        public void sendMessage(SystemMessage systemMessage)throws IOException{
            this.session.getBasicRemote().sendText(systemMessage.getContent());
            systemMessage.setSendState(1);
    //        systemMessageService.addSystemMessage(systemMessage);
        }

        /**
         * Increments the shared online counter.
         *
         * <p>The counter is static, so the lock is taken on the class. A {@code synchronized}
         * instance method would lock on {@code this}, which differs per endpoint instance and
         * therefore does not protect the shared counter.
         */
        public void addOnlineCount(){
            synchronized (WebSocketController.class) {
                WebSocketController.onlineCount++;
            }
        }

        /**
         * Reads the shared online counter under the class lock.
         *
         * @return current number of accepted connections
         */
        public int getOnlineCount(){
            synchronized (WebSocketController.class) {
                return WebSocketController.onlineCount;
            }
        }

        /**
         * Decrements the shared online counter, but only for a connection that was counted
         * in {@link #onOpen}; rejected connections never incremented it.
         */
        public void subOnlineCount(){
            synchronized (WebSocketController.class) {
                if(isSuccess) {
                    WebSocketController.onlineCount--;
                }
            }
        }

        /**
         * Pushes a message to the registered connection of its receiver, if there is one.
         *
         * @param systemMessage message whose receiver id selects the target connection
         * @return true if the message was sent; false if the receiver has no registered
         *         connection or sending failed
         */
        public static boolean sendMessageToUser(SystemMessage systemMessage){
            System.out.println("Begin to Send Info:"+systemMessage.getContent());
            for(WebSocketController w:webSocketControllers){
                if(systemMessage.getReceiver()==w.userId){
                    try {
                        if(w.isSuccess){
                            w.sendMessage(systemMessage);
                            return true;
                            //systemMessage.setSendState(1);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // Only one connection per user is registered, so stop after the first match.
                    break;
                }
            }
            return false;
        }
    }

    5. RequestListener

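    WebSocket握手时要通过HandshakeRequest.getHttpSession()取得HttpSession;如果此时HttpSession还没有创建,取到的可能是null。因此需要一个请求监听器,在每次请求开始时调用getSession(),确保HttpSession已经存在,这样websocket才能够使用它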
    package com.nevergetme.nevergetmeweb.listener;
    import org.springframework.stereotype.Component;
    import javax.servlet.ServletRequestEvent;
    import javax.servlet.ServletRequestListener;
    import javax.servlet.annotation.WebListener;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpSession;
    /**
     * Request listener that calls {@code getSession()} at the start of every request,
     * so that an HTTP session exists by the time the WebSocket handshake asks for it.
     *
     * Create by Alden He on 2019/5/31
     */
    @WebListener
    public class RequestListener implements ServletRequestListener {

        public RequestListener() {
        }

        /**
         * Touches the HTTP session of the incoming request.
         *
         * @param sre event carrying the request that is being initialized
         */
        @Override
        public void requestInitialized(ServletRequestEvent sre){
            HttpServletRequest httpRequest=(HttpServletRequest) sre.getServletRequest();
            // The return value is not needed; the call is made for its effect on the session.
            httpRequest.getSession();
        }

        /**
         * Nothing to clean up when the request ends.
         *
         * @param arg0 event carrying the request that is being destroyed (not used)
         */
        @Override
        public void requestDestroyed(ServletRequestEvent arg0)  {
        }
    }

    6. SystemMessage Bean

    package com.nevergetme.nevergetmeweb.bean;
    import lombok.Data;
    import java.io.Serializable;
    /**
     * A system notification addressed to one user.
     * Accessors, {@code equals}/{@code hashCode} and {@code toString} come from Lombok's {@code @Data}.
     *
     * Create by Alden He on 2019/6/1
     */
    @Data
    public class SystemMessage implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Primary key. */
        private int id;
        /** Id of the user the message is addressed to. */
        private int receiver;
        /** Text of the notification. */
        private String content;
        /** 0 until the message has been pushed, 1 afterwards. */
        private int sendState;
        /** 0 until the message has been read, 1 afterwards. */
        private int readState;
        /** Creation time as text. */
        private String createTime;
        /** Message category; the meaning of the values is not defined here. */
        private int type;
        /** Numeric link target; NOTE(review): presumably the id of a related resource - confirm. */
        private int url;
        /** Id of the user (or system) that produced the message. */
        private int sender;

        /** Creates an empty message with all fields at their defaults. */
        public SystemMessage(){}

        /**
         * Creates a message that only carries text.
         *
         * @param content text of the notification
         */
        public SystemMessage(String content){
            this.content=content;
        }

        /**
         * Creates an unsent, unread message.
         *
         * @param receiver id of the addressed user
         * @param sender   id of the originator
         * @param content  text of the notification
         * @param type     message category
         * @param url      numeric link target
         */
        public SystemMessage(int receiver,int sender,String content,int type,int url){
            this(content);
            this.receiver=receiver;
            this.sender=sender;
            this.type=type;
            this.url=url;
            // A new message starts out neither sent nor read.
            this.sendState=0;
            this.readState=0;
        }
    }

    7. SystemMessageService

    package com.nevergetme.nevergetmeweb.service;
    import com.nevergetme.nevergetmeweb.bean.SystemMessage;
    import java.util.List;
    /**
     * Operations for storing system messages and tracking their send/read state.
     *
     * Create by Alden He on 2019/6/1
     */
    public interface SystemMessageService {

        /**
         * Stores a new system message.
         *
         * @param message message to store
         */
        void addSystemMessage(SystemMessage message);

        /**
         * Returns the messages addressed to a user.
         *
         * @param receiver id of the addressed user
         * @return the user's messages
         */
        List<SystemMessage> readSystemMessage(int receiver);

        /**
         * Marks a message as sent.
         *
         * @param id id of the message
         */
        void updateSystemMessageSendState(int id);

        /**
         * Marks a message as read.
         *
         * @param id id of the message
         */
        void updateSystemMessageReadState(int id);
    }

    8. SystemMessageServiceImpl

    package com.nevergetme.nevergetmeweb.service.impl;
    import com.nevergetme.nevergetmeweb.bean.SystemMessage;
    import com.nevergetme.nevergetmeweb.mapper.SystemMessageMapper;
    import com.nevergetme.nevergetmeweb.service.SystemMessageService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import java.util.List;
    /**
     * {@link SystemMessageService} implementation that forwards every call to
     * {@link SystemMessageMapper}.
     *
     * Create by Alden He on 2019/6/1
     */
    @Service
    public class SystemMessageServiceImpl implements SystemMessageService {

        /** Mapper that performs the actual persistence work. */
        @Autowired
        private SystemMessageMapper messageMapper;

        /** Stores the message through the mapper. */
        @Override
        public void addSystemMessage(final SystemMessage message) {
            messageMapper.addSystemMessage(message);
        }

        /** Loads the messages of the given receiver through the mapper. */
        @Override
        public List<SystemMessage> readSystemMessage(final int receiver) {
            final List<SystemMessage> messages = messageMapper.readSystemMessage(receiver);
            return messages;
        }

        /** Marks the message with the given id as sent. */
        @Override
        public void updateSystemMessageSendState(final int id) {
            messageMapper.updateSystemMessageSendState(id);
        }

        /** Marks the message with the given id as read. */
        @Override
        public void updateSystemMessageReadState(final int id) {
            messageMapper.updateSystemMessageReadState(id);
        }
    }

    9. Mapper

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <!-- SQL statements for the SystemMessageMapper interface; each statement id matches a mapper method name. -->
    <mapper namespace="com.nevergetme.nevergetmeweb.mapper.SystemMessageMapper">
     <!-- Inserts a new message. id and createTime are not listed, so they are left to the table's defaults. -->
     <insert id="addSystemMessage" parameterType="com.nevergetme.nevergetmeweb.bean.SystemMessage">
         insert into SystemMessage(receiver,content,sendState,readState,type,url,sender)
         values(#{receiver},#{content},#{sendState},#{readState},#{type},#{url},#{sender})
     </insert>
     <!-- Marks one message as read. -->
     <update id="updateSystemMessageReadState" parameterType="java.lang.Integer">
         update SystemMessage set readState=1 where id=#{id}
     </update>
     <!-- Marks one message as sent. -->
     <update id="updateSystemMessageSendState" parameterType="java.lang.Integer">
         update SystemMessage set sendState=1 where id=#{id}
     </update>
     <!-- Returns all messages of one receiver, highest id first. -->
     <select id="readSystemMessage" parameterType="java.lang.Integer" resultType="com.nevergetme.nevergetmeweb.bean.SystemMessage">
         select * from SystemMessage where receiver=#{receiver} order by id desc
     </select>
    </mapper>

    10. JS

    因为服务器是https,因此需要通过wss连接websocket
    // WebSocket endpoint URL. The wss:// scheme is used for the production host;
    // the commented-out ws:// line is the plain-text alternative for a local server.
    // var wsUri = "ws://localhost:8088/websocket";
    var wsUri = "wss://www.nevergetme.com/websocket";
    /**
     * Writes a diagnostic message to the console.
     * @param {*} message - value to log
     */
    function debug(message) {
        const text = message;
        console.log(text);
    }
    /**
     * Sends a text message through the given WebSocket and logs what was sent.
     *
     * When no socket is supplied the message is dropped and that fact is logged.
     * The previous fallback branch called send() on the missing socket, which always
     * threw a TypeError. No reconnect is attempted here.
     *
     * @param {WebSocket} websocket - connection to send through; may be null or undefined
     * @param {string} msg - text payload
     */
    function sendMessage(websocket,msg) {
        if (websocket == null) {
            console.log("string not sent (no websocket) :", '"' + msg + '"');
            return;
        }
        websocket.send(msg);
        console.log("string sent :", '"' + msg + '"');
    }
    /**
     * Opens a WebSocket to wsUri and wires up its event handlers.
     *
     * - open/close: logged through debug()
     * - message: logged, then changeMessageIcon() is called to signal a new notification
     * - error: logged through debug()
     *
     * @returns {WebSocket|undefined} the new socket, or undefined if construction threw
     */
    function initWebSocket() {
        let websocket;
        try {
            // Fall back to the prefixed constructor where it is the one provided.
            if (typeof MozWebSocket == 'function')
                WebSocket = MozWebSocket;
            // The previous "close if already open" check was removed: websocket is a fresh
            // local at this point, so that check could never be true.
            websocket = new WebSocket(wsUri);
            websocket.onopen = function (evt) {
                // sendMessage("1");
                debug("CONNECTED");
            };
            websocket.onclose = function (evt) {
                debug("DISCONNECTED");
            };
            websocket.onmessage = function (evt) {
                console.log("Message received :", evt);
                changeMessageIcon();
                //$("#HeaderMessageButton").text('消息'+addMessageNotify());
            };
            websocket.onerror = function (evt) {
                // Error events carry no data payload; fall back to the event type
                // instead of logging "undefined".
                debug('ERROR: ' + (evt.data !== undefined ? evt.data : evt.type));
            };
        } catch (exception) {
            debug('ERROR: ' + exception);
        }
        return websocket;
    }
    /**
     * Closes the given WebSocket; does nothing when no socket is passed.
     * @param {WebSocket} websocket - connection to close; may be null or undefined
     */
    function stopWebSocket(websocket) {
        if (!websocket) {
            return;
        }
        websocket.close();
    }
    /**
     * Logs the readyState of the given WebSocket together with its symbolic name.
     * @param {WebSocket} websocket - connection to inspect; may be null or undefined
     */
    function checkSocket(websocket) {
        if (websocket == null) {
            debug("WebSocket is null");
            return;
        }
        // Map lookup keeps the strict matching of the numeric codes: only the numbers 0-3 are named.
        const stateNames = new Map([
            [0, "CONNECTING"],
            [1, "OPEN"],
            [2, "CLOSING"],
            [3, "CLOSED"]
        ]);
        const code = websocket.readyState;
        const stateStr = stateNames.has(code) ? stateNames.get(code) : "UNKNOW";
        debug("WebSocket state = " + websocket.readyState + " ( " + stateStr + " )");
    }

    11. MySQL的系统通知表

    -- System notification table; one row per message addressed to a user.
    CREATE TABLE `SystemMessage` (
    -- Auto-generated primary key.
    `id` int(11) NOT NULL AUTO_INCREMENT,
    -- Id of the user the message is addressed to.
    `receiver` int(11) NOT NULL,
    `content` varchar(255) NOT NULL,
    -- Set to 1 once the message has been pushed.
    `sendState` tinyint(1) NOT NULL,
    -- Set to 1 once the message has been read.
    `readState` tinyint(1) NOT NULL,
    -- Filled in by the database when the row is inserted.
    `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `type` int(2) NOT NULL,
    `url` int(11) NOT NULL DEFAULT '0',
    `sender` int(11) NOT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8

    12. Nginx.conf

    以下设置是添加在http块中的(map必须直接写在http块内,location则要放在对应的server块内),这个搞了我好长时间
    # Derive the value for the Connection header from the client's Upgrade header:
    # "upgrade" when an Upgrade header was sent, "close" when it is empty.
    map $http_upgrade $connection_upgrade {
     default upgrade;
     ''      close;
    }
    # Proxy the /websocket path to the backend application and pass the
    # Upgrade/Connection headers through so the connection can be upgraded.
    location /websocket {
     proxy_pass http://www.nevergetme.com:8080;
     # Talk HTTP/1.1 to the backend.
     proxy_http_version 1.1;
     # Forward the client's Upgrade header.
     proxy_set_header Upgrade $http_upgrade;
     # Value computed by the map on $http_upgrade.
     proxy_set_header Connection $connection_upgrade;
    }

    13. 补充Websocket协议

    一个典型的websocket协议格式如下图
    Request
    图片说明
    Response
    图片说明
    websocket的优点
    比平常的http头多的部分是
    Upgrade: websocket
    Connection: Upgrade
  1. 支持双向通信
  2. 控制开销较小,连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。