Loading...
墨滴
m

mingchiuli

2022/01/04  阅读:38  主题:默认主题

使用最新js,完成WebSocket在spring中的使用

WebSocket

WebSocket是一种全双工协议,一般用于通讯和游戏等方面。

对于http协议而言,客户端主动发送一个请求,服务器才能给客户端一个响应,如果客户端没有向服务器发送请求,那么服务器就不会给客户端响应。比如在某些购物网站上,你点一下屏幕数据才会变一下,你不点的话就不会有什么变化。

而在WebSocket协议里,服务器可以主动给客户端响应数据,客户端也可主动给服务器发请求,比如微信,你很可能什么都没干,就收到了一条消息,这种很可能就不是http协议了。之所以说是很可能,是因为有可能客户端使用http协议,但用定时轮询的方式向服务器发请求,从而收到了响应,但这样造成的客户端和服务器开销无疑是比较大的。使用WebSocket协议,可以有效降低开销,同时也可以更好的实时同步请求和响应。

ws实现:原生API

java本身提供了一些实现websocket协议的jar包,在javax.websocket里。

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author mingchiuli
 * @create 2021-12-21 11:11 AM
 */

@Component
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

在spring的上下文容器里配置一个bean,就可以了。

下面是一个demo,++内容不重要++,重点是用法:

import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.markerhub.entity.User;
import com.markerhub.service.UserService;
import com.markerhub.shiro.JwtToken;
import com.markerhub.util.JwtUtils;
import com.markerhub.util.SpringUtil;
import io.jsonwebtoken.Claims;
import lombok.SneakyThrows;
import org.apache.shiro.authc.AuthenticationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@ServerEndpoint(value = "/imserver/{username}/{token}")
@Component
public class WebSocketServer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 记录当前在线连接数
     */

    public static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();

    /**
     * 连接建立成功调用的方法
     */

    @OnOpen
    public synchronized void onOpen(Session session, @PathParam("token") String token, @PathParam("username") String username) {

        //先认证
        authentication(username, token);

        sessionMap.put(username, session);
        log.info("有新用户加入,username={}, 当前在线人数为:{}", username, sessionMap.size());

        JSONObject result = new JSONObject();
        JSONArray array = new JSONArray();
        result.set("users", array);
        UserService userService = SpringUtil.getBean(UserService.class);
        int num = 0;
        for (Object key : sessionMap.keySet()) {

            String avatar = userService.getOne(new QueryWrapper<User>().eq("username", key)).getAvatar();
            JSONObject jsonObject = new JSONObject();
            jsonObject.set("username", key);
            jsonObject.set("avatar", avatar);
            jsonObject.set("number", ++num);

            array.add(jsonObject);
        }
//        {"users": [{"username": "zhang"},{ "username": "admin"}]}
        sendAllMessage(JSONUtil.toJsonStr(result));  // 后台发送消息给所有的客户端
    }

    /**
     * 连接关闭调用的方法
     */

    @OnClose
    public void onClose(@PathParam("username") String username) {
        sessionMap.remove(username);
        log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, sessionMap.size());
    }

    /**
     * 收到客户端消息后调用的方法
     * 后台收到客户端发送过来的消息
     * onMessage 是一个消息的中转站
     * 接受 浏览器端 socket.send 发送过来的 json数据
     * @param message 客户端发送过来的消息
     */

    @SneakyThrows
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("username") String username) {
        log.info("服务端收到用户username={}的消息:{}", username, message);
        JSONObject obj = JSONUtil.parseObj(message);

        if (obj.containsKey("to") && obj.containsKey("from") && obj.containsKey("text")) {
            String toUsername = obj.getStr("to"); // to表示发送给哪个用户,比如 admin
            String text = obj.getStr("text"); // 发送的消息文本  hello
            // {"to": "admin", "text": "聊天文本"}
            Session toSession = sessionMap.get(toUsername); // 根据 to用户名来获取 session,再通过session发送消息文本
            if (toSession != null) {
                // 服务器端 再把消息组装一下,组装后的消息包含发送人和发送的文本内容
                // {"from": "zhang", "text": "hello"}
                JSONObject jsonObject = new JSONObject();
                jsonObject.set("from", username);  // from 是 zhang
                jsonObject.set("text", text);  // text 同上面的text
                this.sendMessage(jsonObject.toString(), toSession);
                log.info("发送给用户username={},消息:{}", toUsername, jsonObject.toString());
            } else {
                log.info("发送失败,未找到用户username={}的session", toUsername);
            }
        } else if (obj.containsKey("content") && obj.containsKey("from") || obj.containsKey("dead") && obj.get("from").equals("admin")) {
            for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
                if (!entry.getKey().equals(username)) {
                    entry.getValue().getBasicRemote().sendText(message);
                }
            }
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    /**
     * 服务端发送消息给客户端
     */

    private void sendMessage(String message, Session toSession) {
        try {
            log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败", e);
        }
    }

    /**
     * 服务端发送消息给所有客户端
     */

    private void sendAllMessage(String message) {
        try {
            for (Session session : sessionMap.values()) {
                log.info("服务端给客户端[{}]发送消息{}", session.getId(), message);
                session.getBasicRemote().sendText(message);
            }
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败", e);
        }
    }


    @SneakyThrows
    private void authentication(String username, String token) {

        JwtUtils jwtUtils = SpringUtil.getBean(JwtUtils.class);

        JwtToken jwtToken = new JwtToken(token);

        Claims claim = jwtUtils.getClaimByToken((String) jwtToken.getCredentials());

        if (claim == null || jwtUtils.isTokenExpired(claim.getExpiration())){
            throw new AuthenticationException("身份过期");
        }

        UserService userService = SpringUtil.getBean(UserService.class);

        String userId = claim.getSubject();
        User user = userService.getOne(new QueryWrapper<User>().eq("id", userId));

        if (!username.equals(user.getUsername())) {
            throw new AuthenticationException("连接拒绝");
        }

        if (sessionMap.size() == 2) {
            throw new SocketException("人数已满");
        }
    }
}

用@ServerEndpoint这个注解去定义ws协议的接入点,@Component 注入容器。这里重点有这样四个注解,这四个注解标记方法的参数都可以直接把session写进去。向客户端推送消息的时候,需要用到session,通过session.getBasicRemote().sendText(message)的方式向指定的session对应的客户端推送消息,这样发送消息的时候,就只有对应的客户端可以收到消息了,因为在客户端和session已经对应了。

  1. @OnOpen: 连接建立成功调用的方法。当前端写ws协议代码建立连接成功以后,就会执行这个注解标记的方法。也就是说,如果业务场景是一个聊天室,那么每个进入的用户都会执行一次这个方法。所以一般来讲是用来构建会话的。这里会话全部用一个ConcurrentHashMap去装载了。相应的,用户加入聊天室初始化需要什么功能都可以在这个方法里实现。

  2. @OnClose: 连接关闭调用的方法。对于每一个用户,连接关闭的时候都会执行这个方法,所以这里可以从ConcurrentHashMap把相应的用户会话移除掉。

  3. @OnMessage: 收到客户端消息后调用的方法,客户端向服务器发送的消息都会进入到这个方法里。所以如果客户端会向服务器发送很多类型的消息,就要做好甄别了。

  4. @OnError: 顾名思义,发生错误的时候调用。一般也没什么用。

总体来讲,服务器相当于一个消息的中转站,一个客户端想要向另一个客户端发送消息,必须经过服务器进行转发。服务器转发以后,客户端需要对消息进行监听,监听就在前端了。

用这种方式搭建ws服务,可以看到bean是不能直接注入的,需要使用的时候以SpringUtil.getBean(JwtUtils.class)的方式动态获取。

原生API前端

前端全都是Vue。++内容不重要++,重点是用法。因为后端使用的是原生API,那前端也用原生,贯彻到底了。:(

<script>

let socket;

export default {
  name"Cooperate",
  data() {
    return {
      user: {},
      users: [],
      chatUser'',
      text"",
      messages: [],
      contentBlank1'',
      contentBlank2'',
      content1'',
      content2'',
      created'',

      ruleForm: {
        id'',
        title'',
        description'',
        content''
      },

      rules: {
        title: [
          {requiredtruemessage'请输入标题'trigger'blur'},
          {min3max25message'长度在 3 到 25 个字符'trigger'blur'}
        ],
        description: [
          {requiredtruemessage'请输入摘要'trigger'blur'}
        ],
        content: [
          {requiredtruemessage'请输入内容'trigger'blur'}
        ]
      },
    }
  },

  created() {
    this.init()
  },
  watch: {
    content1: {
      handlerfunction ({
        this.sync()
      },
      deeptrue
    },
    content2: {
      handlerfunction ({
        this.sync()
      },
      deeptrue
    },

  },

  methods: {

    sync() {
      if (typeof (WebSocket) == "undefined") {
        console.log("您的浏览器不支持WebSocket");
      } else {
        console.log("您的浏览器支持WebSocket");
        // 组装待发送的消息 json
        // {"from": "zhang", "to": "admin", "text": "聊天文本"}

        let num = this.user.number
        let target = 'content' + num
        let message = {fromthis.user.username, contentthis[target]}
        socket.send(JSON.stringify(message));  // 将组装好的json发送给服务端,由服务端进行转发
        // 构建消息内容,本人消息
      }
    },





    send() {
      if (!this.chatUser) {
        this.$message({type'warning'message"请选择聊天对象"})
        return;
      }
      if (!this.text) {
        this.$message({type'warning'message"请输入内容"})
      } else {
        if (typeof (WebSocket) == "undefined") {
          console.log("您的浏览器不支持WebSocket");
        } else {
          console.log("您的浏览器支持WebSocket");
          // 组装待发送的消息 json
          // {"from": "zhang", "to": "admin", "text": "聊天文本"}
          let message = {fromthis.user.username, tothis.chatUser, textthis.text}
          socket.send(JSON.stringify(message));  // 将组装好的json发送给服务端,由服务端进行转发
          this.text = ''
          // 构建消息内容,本人消息
        }
      }
    },


    init() {
      let _this = this;
      let blogId = this.$route.params.blogId
      this.$axios.get('/blogAuthorized/' + blogId,{
        headers: {
          "Authorization": sessionStorage.getItem("myToken")
        }
      }).then(res => {
        const blog = res.data.data
        _this.ruleForm.id = blog.id
        _this.ruleForm.title = blog.title
        _this.ruleForm.description = blog.description
        _this.content1 = blog.content
        _this.created = blog.created
      })
      // this.user = sessionStorage.getItem("myUserInfo") ? JSON.parse(sessionStorage.getItem("myUserInfo")) : {}
      let username = JSON.parse(sessionStorage.getItem("myUserInfo")).username;
      console.log(username)

      if (typeof (WebSocket) == "undefined") {
        console.log("您的浏览器不支持WebSocket");
      } else {
        console.log("您的浏览器支持WebSocket");
        let token = sessionStorage.getItem("myToken")
        let socketUrl = "ws://localhost:8081/imserver/" + username + '/' + token;
        // let socketUrl = "ws://119.91.233.182:8081/imserver/" + username + '/' + token;
        if (socket != null) {
          socket.close();
          socket = null;
        }
        // 开启一个websocket服务

        socket = new WebSocket(socketUrl);
        //打开事件
        socket.onopen = function ({
          console.log("websocket已打开");
        };
        //  浏览器端收消息,获得从服务端发送过来的文本消息
        socket.onmessage = function (msg{
          console.log("收到数据====" + msg.data)
          let data = JSON.parse(msg.data)
          console.log(data)
          // 对收到的json数据进行解析, 类似这样的: {"users": [{"username": "zhang"},{ "username": "admin"}]}
          if (data.users) {  // 获取在线人员信息
            // 获取当前连接的所有用户信息,并且排除自身,自己不会出现在自己的聊天列表里
            for (let i = 0, j = 0; i < data.users.length; i++) {

              if (data.users[i].username !== username) {

                _this.$set(_this.users, j , data.users[i])

                j++
              } else {
                _this.user = data.users[i]
              }
            }

            console.log(_this.user)
            // _this.users = data.users.filter(user => user.username !== username)  // 获取当前连接的所有用户信息,并且排除自身,自己不会出现在自己的聊天列表里
          } else if (data.content !== undefined) {
            console.log(data.content)
            console.log(_this.ruleForm)

            let target = 'contentBlank'
            for (let i = 0; i < _this.users.length; i++) {
              if (_this.users[i].username === data.from) {
                target += _this.users[i].number
                break
              }
            }

            // console.log(data)
            // console.log(target)
            // console.log(data.content)
            // console.log(_this[target])
            // _this[target] = data.content.content1

            _this[target] = data.content

          } else if (data.dead === 'dead') {
            socket.close()
            _this.$router.push('/blogs/1')
          } else {
            console.log(data.text)
            _this.$message({
              showClosetrue,
              message: data.from + '对你说: ' + data.text,
              type'success',
              duration20 * 1000,
              onClose:() => {
              }
            });
          }
        };
        //关闭事件
        socket.onclose = function ({
          console.log("websocket已关闭");
        };
        //发生了错误事件
        socket.onerror = function ({
          console.log("websocket发生了错误");
        }
      }
    }
  }
}

</script>

除了一些乱七八糟的东西,重点其实在以下几点:

  • 首先,用socket = new WebSocket(socketUrl)的方式开启ws的服务。

  • socket.onopen: 执行服务器的@onopen标记的方法,其他没什么用,就是在控制台输出一句话。

  • socket.onmessage: 浏览器端收消息,获得从服务端发送过来的文本消息。服务端发送过来所有消息,都会进入到这个方法。也就是说这个方法的方法体内,才是监听的核心内容。客户端会用这种方式监听服务器转发过来的消息,把消息的内容获取以后进行相应操作。这里,不同消息可以通过消息内容的json格式进行区分。

  • socket.onclose和socket.onerror: 调用的时候也会调用后端相应注解的方法。

ws实现:spring的API

Spring比较强大的地方就在于对很多操作都进行了很好的封装,比如数据库操作有JDBCTemplate,缓存操作有RedisTemplate,ES有ElasticSearchRestTemplate,消息队列有amqpTemplate等等,所以对于WebSocket消息的对应操作,Spring也提供了SimpMessageTemplate进行操作,可以让消息的转发更加灵活:

import com.markerhub.shiro.JwtToken;
import com.markerhub.util.JwtUtils;
import io.jsonwebtoken.Claims;
import lombok.NonNull;
import org.apache.shiro.authc.AuthenticationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @author mingchiuli
 * @create 2021-12-21 11:11 AM
 */

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{

    JwtUtils jwtUtils;

    @Autowired
    public void setJwtUtils(JwtUtils jwtUtils) {
        this.jwtUtils = jwtUtils;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        //客户端订阅消息的前缀
        registry.enableSimpleBroker("/topic""/queue""/user");
        //用户级别订阅消息的前缀(默认已经配了)
//        registry.setUserDestinationPrefix("/user");

    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(@NonNull Message<?> message, @NonNull MessageChannel channel) {
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

                if (accessor != null) {
                    if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                        String token = accessor.getFirstNativeHeader("Authorization");
                        //验证token是否有效
                        JwtToken jwtToken = new JwtToken(token);
                        Claims claim = jwtUtils.getClaimByToken((String) jwtToken.getCredentials());
                        if (claim == null || jwtUtils.isTokenExpired(claim.getExpiration())){
                            throw new AuthenticationException("token验证失败");
                        }
                    }
                    return message;
                }
                return null;
            }
        });
    }
}

这时需要做这个配置类了。@EnableWebSocketMessageBroker代表使用Stomp协议进行文本信息的传输。Stomp其实就是对WebSocket进行了更高层次的封装,专门用来传输文本信息,因为WebSocket比较底层,所以一般用这个。这时候也可以用@MessageMapping这个注解了。

  • registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS()分别是设置了ws的端点,允许跨域和使用SockJS。SockJS就解决有些低级浏览器不支持ws协议问题的,它会降级为ajax轮询等方式。

  • registry.setApplicationDestinationPrefixes("/app")表示客户端向服务器发消息的时候要加上前缀"/app",一般会配合注解@MessageMapping去用。

  • registry.enableSimpleBroker("/topic", "/queue", "/user")表示浏览器订阅消息的前缀,只有注册的前缀才能收到服务器的消息,不注册是收不到消息的。

  • 重写的configureClientInboundChannel方法主要目的是利用jwt去做权限校验。之前那种websocket的实现方式,权限校验的代码侵入了业务的代码,不如这样去写。这样写的话只要前端带上jwt对应的请求头,那第一次建立连接的时候就会走ChannelInterceptor这个拦截器,也就实现了握手的时候进行一次认证。

然后是业务层的代码:

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.markerhub.common.lang.Const;
import com.markerhub.entity.Content;
import com.markerhub.entity.Message;
import com.markerhub.entity.User;
import com.markerhub.service.UserService;
import com.markerhub.shiro.JwtToken;
import com.markerhub.util.JwtUtils;
import com.markerhub.util.MyUtils;
import io.jsonwebtoken.Claims;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.util.LinkedMultiValueMap;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author mingchiuli
 * @create 2021-12-27 7:03 PM
 */

@Controller
@Slf4j
public class WebSocketController {

    UserService userService;

    @Autowired
    public void setUserService(UserService userService) {
        this.userService = userService;
    }

    RedisTemplate<String, Object> redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    SimpMessagingTemplate simpMessagingTemplate;

    @Autowired
    public void setSimpMessagingTemplate(SimpMessagingTemplate simpMessagingTemplate) {
        this.simpMessagingTemplate = simpMessagingTemplate;
    }

    JwtUtils jwtUtils;

    @Autowired
    public void setJwtUtils(JwtUtils jwtUtils) {
        this.jwtUtils = jwtUtils;
    }

    @MessageMapping("/init/{blogId}")
    public synchronized void init(@Headers Map<String, Object> headers, @DestinationVariable Long blogId) {
        LinkedMultiValueMap<String, String> map = (LinkedMultiValueMap<String, String>) headers.get("nativeHeaders");
        List<String> authorization = map.get("Authorization");
        if (authorization != null) {
            String token = authorization.get(0);

            JwtToken jwtToken = new JwtToken(token);
            Claims claim = jwtUtils.getClaimByToken((String) jwtToken.getCredentials());
            String userId = claim.getSubject();

            User user = userService.getBaseMapper().selectOne(new QueryWrapper<User>().eq("id", userId).select("id""username""avatar"));

            user.setNumber(redisTemplate.opsForHash().size(Const.WS_PREFIX + blogId));


            if (!redisTemplate.opsForHash().hasKey(Const.WS_PREFIX + blogId, userId)) {
                redisTemplate.opsForHash().put(Const.WS_PREFIX + blogId, userId, user);
            }


            Map<Object, Object> entries = redisTemplate.opsForHash().entries(Const.WS_PREFIX + blogId);

            ArrayList<User> users = new ArrayList<>();

            for (Map.Entry<Object, Object> entry : entries.entrySet()) {

                User value = MyUtils.jsonToObj(entry.getValue(), User.class);
                users.add(value);
            }


            simpMessagingTemplate.convertAndSend("/topic/user", users);

            log.info("{}号用户加入编辑室", userId);

        }
    }

    @MessageMapping("/destroy/{blogId}")
    public void destroy(@Headers Map<String, Object> headers, @DestinationVariable Long blogId) {
        LinkedMultiValueMap<String, String> map = (LinkedMultiValueMap<String, String>) headers.get("nativeHeaders");
        List<String> authorization = map.get("Authorization");
        if (authorization != null) {
            String token = authorization.get(0);

            JwtToken jwtToken = new JwtToken(token);
            Claims claim = jwtUtils.getClaimByToken((String) jwtToken.getCredentials());
            String userId = claim.getSubject();

            redisTemplate.opsForHash().delete(Const.WS_PREFIX + blogId, userId);

            log.info("{}号用户退出编辑室", userId);

        }
    }

    @MessageMapping("/chat/{from}/{to}")
    public void chat(String msg, @DestinationVariable String from, @DestinationVariable Long to) {

        Message message = new Message();

        message.setMessage(msg);
        message.setFrom(from);
        message.setTo(to);

        simpMessagingTemplate.convertAndSendToUser(to.toString(), "/queue/chat", message);


    }


    @MessageMapping("/sync/{from}")
    public void syncContent(@DestinationVariable Long from, String content) {
        Content msg = new Content();
        msg.setContent(content);
        msg.setFrom(from);
        simpMessagingTemplate.convertAndSend("/topic/content", msg);
    }


    @MessageMapping("/taskOver/{from}")
    public void taskOver(@DestinationVariable Long from) {
        simpMessagingTemplate.convertAndSend("/topic/over", from);
    }

}

我这里选用了redis作为用户会话的保存方式,同时把会话信息也做了简化。此时,ws的操作就和普通的http接口无异了。首先类上需要加上@Controller注解,然后,这里可以随便注入bean。@MessageMapping这个注解类似于@GetMapping,只是注意客户端往服务器发送消息的时候注意带上/app这个前缀。可以看到,服务器发送消息主要用到了simpMessagingTemplate.convertAndSend和simpMessagingTemplate.convertAndSendToUser这两个方法:

  • convertAndSend: 一般是用于广播式的推送

  • convertAndSendToUser: 一般是用于点对点的推送

但是观察源码可以看到,其实convertAndSendToUser最终还是调用了convertAndSend,所以这些方法本质上就没有区别,前端订阅的时候把订阅地址写清楚就好,随便订阅。convertAndSendToUser的地址spring会把/user/这个前缀给自动加上,也就是订阅地址类似于/user/1/queue/chat这种。切记把/user也注册了,否则会监听不到这个前缀的消息的。

Spring的API前端

因为用到了stomp和sockjs,所以我就找了找相应的js库。sockjs还好,但是在stomp出了问题:

Screenshot 20211230 at 11.23.09 PM.png
Screenshot 20211230 at 11.23.09 PM.png

网上的文章基本都是用的第一个js库,但是可以看到已经是9年前的东西了,最好不要用。所以用第二个吧。从https://stomp-js.github.io/guide/stompjs/using-stompjs-v5.html可以看到它的官方指南。网上的文章都是用的第一个库,有点坑爹啊,但其实第二个才是最新版本,作者不知为何迁移了。

let stompClient

import SockJS from 'sockjs-client';
import { Client, Message } from '@stomp/stompjs';

export default {
  name"Test",
  data() {
    return {
      user: {},
      users: [],
      chatUser'',
      text"",
      blogId0,
      username"",
      messages: [],
      contentBlank0'',
      contentBlank1'',
      content0'',
      content1'',
      created'',

      ruleForm: {
        id'',
        title'',
        description'',
        content''
      },

      rules: {
        title: [
          {requiredtruemessage'请输入标题'trigger'blur'},
          {min3max25message'长度在 3 到 25 个字符'trigger'blur'}
        ],
        description: [
          {requiredtruemessage'请输入摘要'trigger'blur'}
        ],
        content: [
          {requiredtruemessage'请输入内容'trigger'blur'}
        ]
      },
    }
  },

  created() {
    this.initWebSocket();
  },

  watch: {
    content0: {
      handlerfunction ({
        this.sync()
      },
      deeptrue
    },
    content1: {
      handlerfunction ({
        this.sync()
      },
      deeptrue
    },

  },

  methods: {

    sync() {
      let num = this.user.number
      let target = 'content' + num
      stompClient.publish({
        destination'/app/sync/' + this.user.id,
        bodythis[target] === '' ? ' ' : this[target]
      })
    },

   

    send() {
      const _this = this
      if (!this.chatUser) {
        this.$message({type'warning'message"请选择聊天对象"})
        return;
      }
      if (!this.text) {
        this.$message({type'warning'message"请输入内容"})
        return;
      }

      stompClient.publish({
        destination'/app/chat/' + this.user.username + '/' + this.chatUser,
        body: _this.text
      })


    },

    initWebSocket() {
      const _this = this;

      let blogId = this.$route.params.blogId
      _this.$axios.get('/blogAuthorized/' + blogId,{
        headers: {
          "Authorization": sessionStorage.getItem("myToken")
        }
      }).then(res => {
        const blog = res.data.data
        console.log(blog)
        _this.ruleForm.id = blog.id
        _this.ruleForm.title = blog.title
        _this.ruleForm.description = blog.description
        _this.content0 = blog.content
        _this.created = blog.created

        _this.connectWebSocket();
        // websocket断开重连, 每5s检查一次


      })

      _this.webSocketTimer = setInterval(() => {
        if (!stompClient.connected) {
          console.log("websocket重连中 ...");
          _this.connectWebSocket();
        }
      }, 10000);

    },

    connectWebSocket() {
      const _this = this;

      // const socket = new SockJS("http://127.0.0.1:8081/ws");
      // const webStompOptions = {
      //   debug: false
      // }
      // _this.stompClient = Stomp.client(socket);
       stompClient = new Client({
        connectHeaders: {"Authorization": sessionStorage.getItem("myToken")},
        debugfunction (str{
          //debug日志,调试时候开启
          console.log(str);
        },
        reconnectDelay10000,//重连时间
        heartbeatIncoming4000,
        heartbeatOutgoing4000,
      });

      stompClient.webSocketFactory = function ({
        //因为服务端监听的是/ws路径下面的请求,所以跟服务端保持一致
        return new SockJS("http://127.0.0.1:8081" + '/ws'null, {
          timeout10000
        });
      };


      stompClient.onConnect = (frame) => {



        stompClient.publish({
          destination'/app/init/' + _this.$route.params.blogId,
          headers: { Authorization: sessionStorage.getItem("myToken") },
        })

        stompClient.subscribe('/topic/user', (res) => {

          let username = JSON.parse(sessionStorage.getItem("myUserInfo")).username;
          _this.username = username
          console.log(res.body)
          let users = JSON.parse(res.body);
          console.log(users)

          for (let i = 0; i < users.length; i++) {
            let user = users[i]

            let num = user.number

            _this.$set(_this.users, num, user)

            _this.blogId = _this.$route.params.blogId

            if (username === user.username) {
              _this.user = user
              console.log(_this.user)
            }
          }
        });


        stompClient.subscribe('/topic/content', (res) => {

          let msg = JSON.parse(res.body)

          console.log(msg)

          let from = msg.from



          let content = msg.content

          let target = 'contentBlank'

          console.log(_this.users)


          for (let i = 0; i < _this.users.length; i++) {
            if (_this.users[i].id === from) {
              target += _this.users[i].number
              console.log(target)
              break
            }
          }

          console.log(content)
          _this[target] = content


        });

        //subscribe用不了_this.user.id,原因不明
        stompClient.subscribe('/user/' + JSON.parse(sessionStorage.getItem("myUserInfo")).id + '/queue/chat', (res) => {

          console.log(_this.user.id)

          console.log(res.body)

          let obj = JSON.parse(res.body)

          _this.$message({
            showClosetrue,
            message: obj.from + '对你说: ' + obj.message,
            type'success',
            duration20 * 1000,
            onClose:() => {
            }
          });

          // let result = JSON.parse(res.body);
          // console.log(result)
        });


        stompClient.subscribe('/topic/over', (res) => {
          let from = res.body

          console.log(from)

          if (_this.user.id !== from) {
            _this.disconnectWebSocket()
            _this.$router.push('/blogs/1')
          }
        })

      }

      stompClient.onStompError = function (frame{
        // Will be invoked in case of error encountered at Broker
        // Bad login/passcode typically will cause an error
        // Complaint brokers will set `message` header with a brief message. Body may contain details.
        // Compliant brokers will terminate the connection after any error
        console.log('Broker reported error: ' + frame.headers['message']);
        console.log('Additional details: ' + frame.body);
      };


      stompClient.activate();
    },

    disconnectWebSocket() {
      if (stompClient !== null) {

        stompClient.publish({
          destination'/app/destroy/' + this.blogId,
          headers: { Authorization: sessionStorage.getItem("myToken") },
        })

        stompClient.deactivate()
      }
    },

  },

  destroyedfunction ({
    clearInterval(this.webSocketTimer);
    this.disconnectWebSocket();
  },
}
</script>

可以看到和9年前的版本相比,写法有了很大的区别,主要体现在下面几点:

  • stompClient = new Client去做客户端的对象

  • stompClient.webSocketFactory去指定sockjs

  • stompClient.onConnect,所有的订阅监听都写在这里面

  • stompClient.subscribe的url拼接并不能使用data的数据,但是可以用sessionStorage里的数据

  • stompClient.subscribe去订阅,stompClient.publish去发送

m

mingchiuli

2022/01/04  阅读:38  主题:默认主题

作者介绍

m
mingchiuli