SpringCloud分布式集成WebSocket实现点对点聊天

内容纲要

file

文章案例代码地址:https://gitea.aiprose.com/nelson/im

案例是用一个新项目来写的,下载后修改配置文件中的redis地址。

如果只是一个简单的点对点聊天,用websocket其实很好做,其中有几个坑,大家可能会遇到

1.如果你的网关用的zuul,那就有点麻烦了,gateway默认是支持websocket的ws://协议转发的
2.websocket聊天中用的是websession,很多人可能第一反应就是利用redis序列化,这样不同微服务节点就可以都给目标用户发送消息了。但是websession是个接口,不能像httpsession一样序列化。

本文的方案是使用gateway网关,websession的共享则通过redis的消息发布/订阅来实现。

一、websocket

增加websocket依赖,项目用的是gradle构建的。

// Gradle dependency declaration for the Spring Boot WebSocket starter.
implementation 'org.springframework.boot:spring-boot-starter-websocket'

配置ServerEndpoint

/**
 * WebSocket infrastructure configuration.
 *
 * <p>Its only job is to publish a {@link ServerEndpointExporter} bean.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a freshly constructed {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

我们把websession放到一个ConcurrentHashMap中

/**
 * Registry of the WebSocket sessions held by this JVM, keyed by session id.
 *
 * <p>The map is static, so every instance of this class shares the same
 * registry. It is created eagerly at class-initialisation time, which removes
 * the unsynchronised lazy initialisation (two threads could each create a map
 * and one registration would be lost) and guarantees that lookups never hit a
 * {@code null} map.
 */
@Component
public class WebSocketPool {
    /** Key of the Redis hash in which the endpoint stores user -> session id. */
    public static final String HASHKEY = "imonline";
    /** Name of the Redis pub/sub channel used to distribute chat messages. */
    public static final String CHANNEL = "channel:ceisim";

    private static final Map<String, Session> sessions = new ConcurrentHashMap<>();

    /**
     * Retained for backward compatibility only; the registry is now created
     * eagerly, so there is nothing left to initialise here.
     */
    @PostConstruct
    public void initMap(){
        // intentionally empty
    }

    /**
     * Registers a session under its own id.
     *
     * @param session the session to register; ignored when {@code null}
     */
    public void open(Session session){
        if (session == null) {
            return;
        }
        sessions.put(session.getId(), session);
    }

    /**
     * Removes a session from the registry.
     *
     * @param key the session id; {@code null} is ignored (ConcurrentHashMap rejects null keys)
     */
    public void close(String key){
        if (key != null) {
            sessions.remove(key);
        }
    }

    /**
     * Looks up a session.
     *
     * @param key the session id
     * @return the registered session, or {@code null} when none is registered under that id
     */
    public Session get(String key){
        return key == null ? null : sessions.get(key);
    }

    /**
     * Tells whether a session id is registered in this JVM.
     *
     * @param key the session id
     * @return {@code true} when a session is registered under that id
     */
    public Boolean has(String key){
        return key != null && sessions.containsKey(key);
    }
}

收发消息。这里有个坑:Spring管理的对象默认都是单例的,而@ServerEndpoint标注的端点是多例的(每个连接都会创建一个新的实例),所以在@ServerEndpoint类中无法直接用@Autowired给实例字段注入。需要把要注入的对象声明成static(也就是类属性),再通过带@Autowired的setter方法赋值,才能注入成功。
@OnOpen 开启会话
@OnMessage 收到消息
@OnClose 关闭会话

@Slf4j
@Component
@ServerEndpoint("/ws/imserver/{user}")
public class WebSocket {

    private static WebSocketPool socketPool;

    private static StringRedisTemplate redisTemplate;

    private static RedisUtil redisUtil;

    @Autowired
    public void setSocketPool(WebSocketPool socketPool) {
        WebSocket.socketPool = socketPool;
    }

    @Autowired
    public void setRedisTemplate(StringRedisTemplate redisTemplate) {
        WebSocket.redisTemplate = redisTemplate;
    }

    @Autowired
    public void setRedisUtil(RedisUtil redisUtil) {
        WebSocket.redisUtil = redisUtil;
    }

    @OnOpen
    public void open(@PathParam("user") String user, Session session) {
        socketPool.open(session);
        redisUtil.setHash(WebSocketPool.HASHKEY, user, session.getId());
        log.info(user + "连接成功");
        MsgVo msgVo = new MsgVo();
        msgVo.setMsgType(1);
        Set<String> onlineUsers = redisUtil.hashKeys(WebSocketPool.HASHKEY);
        System.out.println(onlineUsers);
        String onlineUserStr = JSON.toJSONString(onlineUsers);
        msgVo.setMessage(onlineUserStr);
        sendMessage(msgVo);
    }

    @OnMessage
    public void message(@PathParam("user") String user, String message) {
        long timeMillis = System.currentTimeMillis();
        log.info("收到消息{}", message);
        MsgVo msgVo = JSONObject.parseObject(message, MsgVo.class);
        if (StringUtils.isBlank(msgVo.getReceiveUserId())) {
            return;
        }
        if (StringUtils.isBlank(msgVo.getMessage())) {
            return;
        }
        if (StringUtils.isBlank(msgVo.getUuid())) {
            return;
        }
        MsgVo backMsg = new MsgVo();
        backMsg.setSendDate(timeMillis);
        try {
            // TODO 保存聊天记录到数据库中

            // 返回消息发送状态
            backMsg.setMsgType(3);
            backMsg.setReceiveUserId(user);
            backMsg.setUuid(msgVo.getUuid());
            backMsg.setMsgStatus(0);
            sendMessage(backMsg);
        } catch (Exception e) {
            e.printStackTrace();
            backMsg.setMsgStatus(1);
            sendMessage(backMsg);
        }

        try {
            // 发送消息给目标用户
            msgVo.setMsgType(2);
            msgVo.setSendDate(timeMillis);
            sendMessage(msgVo);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @OnClose
    public void close(@PathParam("user") String user) {
        if (redisUtil.hasKeyHash(WebSocketPool.HASHKEY, user)) {
            String key = redisUtil.getHashIndex(WebSocketPool.HASHKEY, user).toString();
            socketPool.close(key);
            redisUtil.deleteHash(WebSocketPool.HASHKEY, user);
        }
        log.info(user + "关闭会话");
    }

    @OnError
    public void close(Session session, Throwable error) {
        socketPool.close(session.getId());
        log.error(error.getMessage());
    }

    private void sendMessage(MsgVo msgVo) {
        redisTemplate.convertAndSend(WebSocketPool.CHANNEL, JSONObject.toJSONString(msgVo));
    }
}

消息实体

/**
 * Message payload exchanged between clients and the chat endpoint.
 */
@Data
public class MsgVo implements Serializable {
    /** ID of the sending user. */
    private String sendUserId;
    /** ID of the target (receiving) user. */
    private String receiveUserId;
    /** Message content. */
    private String message;
    /**
     * Message type: 1 = online user list, 2 = point-to-point chat message,
     * 3 = delivery-status reply.
     */
    private Integer msgType;

    /** Delivery status: 0 = success, 1 = failure. */
    private Integer msgStatus = 0;

    /** Identifier of a single message; echoed back in the delivery-status reply. */
    private String uuid;

    /** Send time. */
    private Long sendDate;

    // NOTE(review): not referenced by any of the visible code - purpose to be confirmed.
    private String appId;
}

二、redis消息发布与订阅

发送消息是通过websession来实现的,但是有个问题就是websession不能共享,不能序列化到redis中,可以通过mq消息队列来实现广播给各个节点,这里采用的是redis的消息发布与订阅,所有的微服务节点订阅同一个CHANNEL,一旦这个CHANNEL有新的消息,所有的节点都会检查是否有目标的session,然后获取到websession给目标用户发送消息。

如果直接使用Spring Data自带的RedisTemplate(默认采用JDK序列化),写入Redis的key前面会多出一段序列化前缀(看起来像乱码),所以这里自定义了key和value的序列化方式。

/**
 * Redis wiring: the pub/sub listener container, the adapter that forwards
 * received messages to {@code RedisReceiver}, and a {@code RedisTemplate}
 * with String keys and JSON values.
 */
@Slf4j
@Configuration
public class RedisConfig {

    /**
     * Creates the listener container and subscribes the given adapter to
     * {@link WebSocketPool#CHANNEL}. Further listeners for other topics could
     * be registered on the same container.
     *
     * @param connectionFactory factory the container obtains its connection from
     * @param listenerAdapter   adapter that receives messages published on the channel
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        final PatternTopic chatTopic = new PatternTopic(WebSocketPool.CHANNEL);

        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(listenerAdapter, chatTopic);

        log.info("Subscribed Redis channel: " + WebSocketPool.CHANNEL);
        return listenerContainer;
    }

    /**
     * Wraps the receiver so that incoming messages are handed to its
     * {@code receiveMessage} method.
     *
     * @param redisReceiver the bean that handles received messages
     * @return the adapter delegating to {@code receiveMessage}
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisReceiver redisReceiver) {
        final String handlerMethod = "receiveMessage";
        return new MessageListenerAdapter(redisReceiver, handlerMethod);
    }

    /**
     * Builds the template used for regular Redis access: keys and hash keys
     * are serialised as plain strings, values and hash values as JSON.
     *
     * @param redisConnectionFactory factory the template obtains its connections from
     * @return the initialised template
     */
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // JSON mapper: all members visible, type information embedded for non-final types
        final ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        // Jackson-based serializer that replaces the template's default one
        final Jackson2JsonRedisSerializer<Object> jsonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jsonSerializer.setObjectMapper(mapper);

        final RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        // plain values
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(jsonSerializer);

        // hash entries
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jsonSerializer);

        template.afterPropertiesSet();
        return template;
    }
}

消息订阅,并给目标用户发送消息

@Component
public class RedisReceiver {

    @Autowired
    private WebSocketPool socketPool;

    @Autowired
    private RedisUtil redisUtil;

    public void receiveMessage(String message){
        MsgVo msgVo = JSON.parseObject(message, MsgVo.class);
        try {
            // 点对点发送消息
            if (StringUtils.isNotBlank(msgVo.getReceiveUserId())) { // p2p
                if(redisUtil.hasKeyHash(WebSocketPool.HASHKEY,msgVo.getReceiveUserId())){
                    String sessionid = redisUtil.getHashIndex(WebSocketPool.HASHKEY, msgVo.getReceiveUserId()).toString();
                    if(socketPool.has(sessionid)){
                        socketPool.get(sessionid).getAsyncRemote().sendText(message);
                    }
                }
            } else { // 群发
                List<Object> sessions = redisUtil.getHash(WebSocketPool.HASHKEY);
                for (Object item : sessions) {
                    //如果某个session不存在,分布式环境下可能session在其他的节点,继续给其他人广播
                    try {
                        if(socketPool.has(item.toString())){
                            socketPool.get(item.toString()).getAsyncRemote().sendText(message);
                        }
                    }catch (Exception e1){
                        e1.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

这里给大家提供一个好用的redis工具类

/**
 * Convenience wrapper around the {@code redisTemplate} bean: atomic counters,
 * plain values, hashes and lists.
 */
@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    @Resource(name="redisTemplate")
    private ValueOperations<String, Object> valueOperations;
    @Resource(name="redisTemplate")
    private HashOperations<String, String, Object> hashOperations;
    @Resource(name="redisTemplate")
    private ListOperations<Object, Object> listOperations;

    /** Creates the atomic counter bound to {@code key} on the template's connection factory. */
    private RedisAtomicLong counterFor(String key) {
        return new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
    }

    /**
     * Sets a counter to the given value and lets it expire at a fixed instant.
     *
     * @param key        counter key
     * @param value      value to store
     * @param expireTime instant at which the key expires
     */
    public void set(String key, int value, Date expireTime) {
        RedisAtomicLong counter = counterFor(key);
        counter.set(value);
        counter.expireAt(expireTime);
    }

    /**
     * Sets a counter to the given value and lets it expire after a timeout.
     *
     * @param key     counter key
     * @param value   value to store
     * @param timeout time to live
     * @param unit    unit of {@code timeout}
     */
    public void set(String key, int value, long timeout, TimeUnit unit) {
        RedisAtomicLong counter = counterFor(key);
        counter.set(value);
        counter.expire(timeout, unit);
    }

    /**
     * Atomically increments the counter by one.
     *
     * @param key counter key
     * @return the updated value
     */
    public long generate(String key) {
        return counterFor(key).incrementAndGet();
    }

    /**
     * Sets the expiry of the counter, then atomically increments it by one.
     *
     * @param key        counter key
     * @param expireTime instant at which the key expires
     * @return the updated value
     */
    public long generate(String key, Date expireTime) {
        RedisAtomicLong counter = counterFor(key);
        counter.expireAt(expireTime);
        return counter.incrementAndGet();
    }

    /**
     * Atomically adds the given amount to the counter.
     *
     * @param key       counter key
     * @param increment amount to add
     * @return the updated value
     */
    public long generate(String key, int increment) {
        return counterFor(key).addAndGet(increment);
    }

    /**
     * Sets the expiry of the counter, then atomically adds the given amount.
     *
     * @param key        counter key
     * @param increment  amount to add
     * @param expireTime instant at which the key expires
     * @return the updated value
     */
    public long generate(String key, int increment, Date expireTime) {
        RedisAtomicLong counter = counterFor(key);
        counter.expireAt(expireTime);
        return counter.addAndGet(increment);
    }

    /**
     * Stores an object under a key.
     *
     * @param key   the key
     * @param value the object to store
     */
    public void setObject(String key,Object value) {
        valueOperations.set(key, value);
    }

    /**
     * Reads the object stored under a key.
     *
     * @param key the key
     * @return the stored object, as returned by the value operations
     */
    public Object getObject(String key) {
        return valueOperations.get(key);
    }

    /**
     * Misspelled original name of {@link #getObject(String)}, kept so that
     * existing callers keep compiling. Prefer {@link #getObject(String)}.
     *
     * @param key the key
     * @return the stored object, as returned by the value operations
     */
    public Object getObect(String key) {
        return getObject(key);
    }

    /**
     * Deletes a key.
     *
     * @param key the key to delete
     */
    public void deleteObject(String key) {
        redisTemplate.delete(key);
    }

    /**
     * Puts one entry into a hash.
     *
     * @param key     key of the hash
     * @param hashKey field inside the hash
     * @param value   value to store in the field
     */
    public void setHash(String key, String hashKey, Object value) {
        hashOperations.put(key, hashKey, value);
    }

    /**
     * Reads one entry of a hash.
     *
     * @param key     key of the hash
     * @param hashKey field inside the hash
     * @return the field's value, as returned by the hash operations
     */
    public Object getHashIndex(String key, String hashKey) {
        return hashOperations.get(key, hashKey);
    }

    /**
     * Reads all values of a hash.
     *
     * @param key key of the hash
     * @return the values of the hash
     */
    public List<Object> getHash(String key) {
        return hashOperations.values(key);
    }

    /**
     * Tells whether a hash contains a field.
     *
     * @param key     key of the hash
     * @param hashKey field inside the hash
     * @return the result reported by the hash operations
     */
    public Boolean hasKeyHash(String key, String hashKey) {
        return hashOperations.hasKey(key, hashKey);
    }

    /**
     * Reads a whole hash as a map.
     *
     * @param key key of the hash
     * @return the field -> value entries of the hash
     */
    public Map<String, Object> entriesHasp(String key){
        return hashOperations.entries(key);
    }

    /**
     * Reads the field names of a hash.
     *
     * @param key key of the hash
     * @return the fields of the hash
     */
    public Set<String> hashKeys(String key){
        return hashOperations.keys(key);
    }

    /**
     * Number of entries in a hash.
     *
     * @param key key of the hash
     * @return the size; {@code 0} when the operations report no value
     */
    public long sizeHash(String key) {
        // NOTE(review): the boxed result is presumably null only in pipeline/transaction use - guard against unboxing NPE.
        Long size = hashOperations.size(key);
        return size == null ? 0L : size;
    }

    /**
     * Deletes one entry from a hash.
     *
     * @param key   key of the hash
     * @param value the hash field to delete (despite its name, this is the field, not the stored value)
     */
    public void deleteHash(String key, Object value) {
        hashOperations.delete(key, value);
    }

    /**
     * Pushes one element onto the head of a list.
     *
     * @param key   key of the list
     * @param value element to push
     */
    public void setLeftList(String key,Object value)
    {
        listOperations.leftPush(key, value);
    }

    /**
     * Pushes all elements of a collection onto the head of a list.
     *
     * @param key  key of the list
     * @param list elements to push
     */
    public void setleftAllList(String key, List list) {
        // The raw List is deliberate: it keeps this call bound to the
        // Collection overload. A List<?> would bind to the varargs overload
        // and push the list itself as a single element.
        listOperations.leftPushAll(key, list);
    }

    /**
     * Replaces the element at an index of a list.
     *
     * @param key   key of the list
     * @param index position to replace
     * @param value new element
     */
    public void setList(String key,long index,Object value) {
        listOperations.set(key, index, value);
    }

    /**
     * Length of a list.
     *
     * @param key key of the list
     * @return the length; {@code 0} when the operations report no value
     */
    public long sizeList(String key) {
        // NOTE(review): same unboxing guard as sizeHash.
        Long size = listOperations.size(key);
        return size == null ? 0L : size;
    }

    /**
     * Reads the element at an index of a list.
     *
     * @param key   key of the list
     * @param index position to read
     * @return the element, as returned by the list operations
     */
    public Object getListIndex(String key,long index) {
        return listOperations.index(key, index);
    }

    /**
     * Reads a range of a list.
     *
     * @param key   key of the list
     * @param start start index
     * @param end   end index
     * @return the elements in the range, as returned by the list operations
     */
    public Object getList(String key,long start, long end) {
        return listOperations.range(key, start, end);
    }

}

三、gateway网关配置

我们的websocket地址是@ServerEndpoint("/ws/imserver/{user}"),但是前端登录后拿到的是token,所以前端带着token去请求,new WebSocket("ws://192.168.0.70:8801/ws/imserver?token="+token),在gateway校验token后,修改请求地址,改成websocket的地址。

配置转发规则

一般的http转发规则是lb://social-service,websocket的转发规则就是lb:ws://social-service,多了个ws。

      # Route for the chat WebSocket endpoint.
      - id: ws_im_route
        # lb:ws:// = load-balanced forwarding to social-service using the ws protocol
        uri: lb:ws://social-service
        predicates:
          # match the chat endpoint path and everything below it
          - Path=/ws/imserver/**
          # header predicate on "Upgrade" (no value pattern configured)
          - Header=Upgrade

token校验处理

token校验通过后,获取到登录用户ID,修改请求路径。

@Component
@Slf4j
public class AccessFilter implements GlobalFilter, Ordered {
    /**
     * Authenticates WebSocket handshake requests for the chat endpoint.
     *
     * <p>For paths containing {@code /ws/imserver} the {@code token} query
     * parameter is resolved to a user id; on success the user id is appended
     * to the request path and the rewritten request is passed down the chain.
     * Otherwise a 401 response with a JSON body is written.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the downstream chain or of the 401 response
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();

        // WebSocket token handling: start
        URI requestUrl = request.getURI();
        if (requestUrl.getPath().contains("/ws/imserver")) {
            String token = request.getQueryParams().getFirst("token");
            if (StringUtils.isNotBlank(token)) {
                String userid = getUserIdByToken(token);
                if (StringUtils.isNotBlank(userid)) {
                    // Token accepted: append the user id to the path, avoiding a
                    // double slash when the incoming path already ends with one.
                    String basePath = request.getPath().value();
                    String newPath = (basePath.endsWith("/") ? basePath : basePath + "/") + userid;
                    ServerHttpRequest newRequest = request.mutate().path(newPath).build();
                    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI());
                    return chain.filter(exchange.mutate().request(newRequest).build());
                }
            }
            response.setStatusCode(HttpStatus.UNAUTHORIZED);
            response.getHeaders().add("Content-Type", "application/json; charset=utf-8");
            String message = "{\"status\":99,\"message\":\"请登录后尝试\"}";
            // Encode explicitly as UTF-8 so the bytes match the declared charset
            // regardless of the platform default.
            byte[] body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            DataBuffer buffer = response.bufferFactory().wrap(body);
            return response.writeWith(Mono.just(buffer));
        }
        // WebSocket token handling: end
        // Handling of the remaining HTTP requests (not shown in this excerpt)
    }

四、前端

前端这里使用原生的websocket,代码提供了几个案例,模拟三个人聊天,在后面的截图里。

    // Variant that goes through the gateway and authenticates with a token:
    // const token= ""
    // var ws = new WebSocket("ws://192.168.0.70:8801/ws/imserver?token="+token)
    var ws = new WebSocket("ws://192.168.0.70:8999/ws/imserver/zhangsan")

    // The handler property is "onopen"; assigning to "open" never fires.
    ws.onopen = function (msg) {
        console.log("open:" + msg.type)
    }

    // The payload is carried in the event's data property.
    ws.onmessage = function (msg) {
        console.log("message:" + msg.data)
    }

    /**
     * Sends a point-to-point chat message from zhangsan to the given user.
     * @param {string} userid id of the receiving user
     */
    function sendHandler(userid) {
        // send() throws while the socket is still connecting, so bail out early
        if (ws.readyState !== WebSocket.OPEN) {
            console.warn("websocket is not open, readyState=" + ws.readyState)
            return
        }
        var param = {
            sendUserId: 'zhangsan',
            receiveUserId: userid,
            message: '来自 zhangsan 消息',
            msgType: 2,
            // millisecond timestamp used as the per-message identifier
            uuid: Date.now()
        }
        ws.send(JSON.stringify(param))
    }

文章案例代码地址:https://gitea.aiprose.com/nelson/im

file

标签

发表评论