SpringCloud分布式集成WebSocket实现点对点聊天
文章案例代码地址:https://gitea.aiprose.com/nelson/im
案例是用一个新项目来写的,下载后修改配置文件中的redis地址。
如果只是一个简单的点对点聊天,用websocket其实很好做,其中有几个坑,大家可能会遇到
1.如果你的网关用的zuul,那就有点麻烦了,gateway默认是支持websocket的ws://协议转发的
2.websocket聊天中用的是websession,很多人可能第一反应就是利用redis序列化,这样不同微服务节点就可以都给目标用户发送消息了。但是websession是个接口,不能像httpsession一样序列化。
本文的方案是使用gateway网关,websession共享是通过redis的消息订阅实现来实现的。
一、websocket
增加websocket依赖,项目用的是gradle构建的。
implementation 'org.springframework.boot:spring-boot-starter-websocket'
配置ServerEndpoint
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
我们把websession放到一个ConcurrentHashMap中
@Component
public class WebSocketPool {
public static final String HASHKEY = "imonline";
public static final String CHANNEL = "channel:ceisim";
private static Map<String, Session> sessions = null;
@PostConstruct
public void initMap(){
if(sessions == null){
sessions = new ConcurrentHashMap<>();
}
}
public void open(Session session){
if(sessions == null){
sessions = new ConcurrentHashMap<>();
}
sessions.put(session.getId(),session);
}
public void close(String key){
sessions.remove(key);
}
public Session get(String key){
return sessions.get(key);
}
public Boolean has(String key){
return sessions.containsKey(key);
}
}
收发消息,这里有个坑,就是spring管理的对象默认都是单利的,ServerEndpoint是多利,所以会导致@ServerEndpoint中无法使用@Autowired自动注入,需要吧要要注入的对象申明成static,就是类的属性,然后用才能注入。
@OnOpen 开启会话
@OnMessage 收到消息
@OnClose 关闭会话
@Slf4j
@Component
@ServerEndpoint("/ws/imserver/{user}")
public class WebSocket {
private static WebSocketPool socketPool;
private static StringRedisTemplate redisTemplate;
private static RedisUtil redisUtil;
@Autowired
public void setSocketPool(WebSocketPool socketPool) {
WebSocket.socketPool = socketPool;
}
@Autowired
public void setRedisTemplate(StringRedisTemplate redisTemplate) {
WebSocket.redisTemplate = redisTemplate;
}
@Autowired
public void setRedisUtil(RedisUtil redisUtil) {
WebSocket.redisUtil = redisUtil;
}
@OnOpen
public void open(@PathParam("user") String user, Session session) {
socketPool.open(session);
redisUtil.setHash(WebSocketPool.HASHKEY, user, session.getId());
log.info(user + "连接成功");
MsgVo msgVo = new MsgVo();
msgVo.setMsgType(1);
Set<String> onlineUsers = redisUtil.hashKeys(WebSocketPool.HASHKEY);
System.out.println(onlineUsers);
String onlineUserStr = JSON.toJSONString(onlineUsers);
msgVo.setMessage(onlineUserStr);
sendMessage(msgVo);
}
@OnMessage
public void message(@PathParam("user") String user, String message) {
long timeMillis = System.currentTimeMillis();
log.info("收到消息{}", message);
MsgVo msgVo = JSONObject.parseObject(message, MsgVo.class);
if (StringUtils.isBlank(msgVo.getReceiveUserId())) {
return;
}
if (StringUtils.isBlank(msgVo.getMessage())) {
return;
}
if (StringUtils.isBlank(msgVo.getUuid())) {
return;
}
MsgVo backMsg = new MsgVo();
backMsg.setSendDate(timeMillis);
try {
// TODO 保存聊天记录到数据库中
// 返回消息发送状态
backMsg.setMsgType(3);
backMsg.setReceiveUserId(user);
backMsg.setUuid(msgVo.getUuid());
backMsg.setMsgStatus(0);
sendMessage(backMsg);
} catch (Exception e) {
e.printStackTrace();
backMsg.setMsgStatus(1);
sendMessage(backMsg);
}
try {
// 发送消息给目标用户
msgVo.setMsgType(2);
msgVo.setSendDate(timeMillis);
sendMessage(msgVo);
}catch (Exception e){
e.printStackTrace();
}
}
@OnClose
public void close(@PathParam("user") String user) {
if (redisUtil.hasKeyHash(WebSocketPool.HASHKEY, user)) {
String key = redisUtil.getHashIndex(WebSocketPool.HASHKEY, user).toString();
socketPool.close(key);
redisUtil.deleteHash(WebSocketPool.HASHKEY, user);
}
log.info(user + "关闭会话");
}
@OnError
public void close(Session session, Throwable error) {
socketPool.close(session.getId());
log.error(error.getMessage());
}
private void sendMessage(MsgVo msgVo) {
redisTemplate.convertAndSend(WebSocketPool.CHANNEL, JSONObject.toJSONString(msgVo));
}
}
消息实体
@Data
public class MsgVo implements Serializable {
/**发送用户ID*/
private String sendUserId;
/**目标用户ID*/
private String receiveUserId;
/**消息内容*/
private String message;
/**
* 1在线用户列表 2点对点聊天消息 3返回消息发送状态
*/
private Integer msgType;
/** 0 成功 1失败 */
private Integer msgStatus = 0;
/** 单条消息标识,返回消息发送状态 */
private String uuid;
/**发送时间*/
private Long sendDate;
private String appId;
}
二、redis消息发布与订阅
发送消息是通过websession来实现的,但是有个问题就是websession不能共享,不能序列化到redis中,可以通过mq消息队列来实现广播给各个节点,这里采用的是redis的消息发布与订阅,所有的微服务节点订阅同一个CHANNEL,一旦这个CHANNEL有新的消息,所有的节点都会检查是否有目标的session,然后获取到websession给目标用户发送消息。
RedisTemplate如果用springdata自带的话,生成的key前面会有前缀。
@Slf4j
@Configuration
public class RedisConfig {
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 监听msgToAll
container.addMessageListener(listenerAdapter, new PatternTopic(WebSocketPool.CHANNEL));
log.info("Subscribed Redis channel: " + WebSocketPool.CHANNEL);
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisReceiver redisReceiver){
return new MessageListenerAdapter(redisReceiver,"receiveMessage");
}
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
消息订阅,并给目标用户发送消息
@Component
public class RedisReceiver {
@Autowired
private WebSocketPool socketPool;
@Autowired
private RedisUtil redisUtil;
public void receiveMessage(String message){
MsgVo msgVo = JSON.parseObject(message, MsgVo.class);
try {
// 点对点发送消息
if (StringUtils.isNotBlank(msgVo.getReceiveUserId())) { // p2p
if(redisUtil.hasKeyHash(WebSocketPool.HASHKEY,msgVo.getReceiveUserId())){
String sessionid = redisUtil.getHashIndex(WebSocketPool.HASHKEY, msgVo.getReceiveUserId()).toString();
if(socketPool.has(sessionid)){
socketPool.get(sessionid).getAsyncRemote().sendText(message);
}
}
} else { // 群发
List<Object> sessions = redisUtil.getHash(WebSocketPool.HASHKEY);
for (Object item : sessions) {
//如果某个session不存在,分布式环境下可能session在其他的节点,继续给其他人广播
try {
if(socketPool.has(item.toString())){
socketPool.get(item.toString()).getAsyncRemote().sendText(message);
}
}catch (Exception e1){
e1.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
这里给大家提供一个好用redis工具类
@Component
public class RedisUtil {
@Autowired
private RedisTemplate redisTemplate;
@Resource(name="redisTemplate")
private ValueOperations<String, Object> valueOperations;
@Resource(name="redisTemplate")
private HashOperations<String, String, Object> hashOperations;
@Resource(name="redisTemplate")
private ListOperations<Object, Object> listOperations;
/**
* @param key
* @param value
* @param expireTime
* @Title: set
* @Description: set cache.
*/
public void set(String key, int value, Date expireTime) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.set(value);
counter.expireAt(expireTime);
}
/**
* @param key
* @param value
* @param timeout
* @param unit
* @Title: set
* @Description: set cache.
*/
public void set(String key, int value, long timeout, TimeUnit unit) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.set(value);
counter.expire(timeout, unit);
}
/**
* @param key
* @return
* @Title: generate
* @Description: Atomically increments by one the current value.
*/
public long generate(String key) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
return counter.incrementAndGet();
}
/**
* @param key
* @return
* @Title: generate
* @Description: Atomically increments by one the current value.
*/
public long generate(String key, Date expireTime) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.expireAt(expireTime);
return counter.incrementAndGet();
}
/**
* @param key
* @param increment
* @return
* @Title: generate
* @Description: Atomically adds the given value to the current value.
*/
public long generate(String key, int increment) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
return counter.addAndGet(increment);
}
/**
* @param key
* @param increment
* @param expireTime
* @return
* @Title: generate
* @Description: Atomically adds the given value to the current value.
*/
public long generate(String key, int increment, Date expireTime) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.expireAt(expireTime);
return counter.addAndGet(increment);
}
/**
* 保存对象
* @param key
* @param value
*/
public void setObject(String key,Object value) {
valueOperations.set(key, value);
}
/**
* 获取对象
* @param key
* @return
*/
public Object getObect(String key) {
return valueOperations.get(key);
}
/**
* 删除对象
* @param key
*/
public void deleteObject(String key) {
redisTemplate.delete(key);
}
/**
* 插入HaspMap
* @param key
* @param hashKey
* @param value
*/
public void setHash(String key, String hashKey, Object value) {
hashOperations.put(key, hashKey, value);
}
/**
* 获取知道下标HaspMap
* @param key
* @param hashKey
* @return
*/
public Object getHashIndex(String key, String hashKey) {
return hashOperations.get(key, hashKey);
}
/**
* 获取HaspMap
* @param key
* @return
*/
public List<Object> getHash(String key) {
return hashOperations.values(key);
}
/**
* 判断是否存在某个key
* @param key
* @param hashKey
* @return
*/
public Boolean hasKeyHash(String key, String hashKey) {
return hashOperations.hasKey(key, hashKey);
}
/**
* 获取Map
* @param key
* @return
*/
public Map<String, Object> entriesHasp(String key){
return hashOperations.entries(key);
}
/**
* 获取Map
* @param key
* @return
*/
public Set<String> hashKeys(String key){
return hashOperations.keys(key);
}
/**
* 大小
* @param key
* @return
*/
public long sizeHash(String key) {
return hashOperations.size(key);
}
/**
* 删除HaspMap
* @param key
* @param value
*/
public void deleteHash(String key, Object value) {
hashOperations.delete(key, value);
}
/**
* 数组里面添加元素
* @param key
* @param value
*/
public void setLeftList(String key,Object value)
{
listOperations.leftPush(key, value);
}
/**
* 全部添加
* @param key
* @param list
*/
public void setleftAllList(String key, List list) {
listOperations.leftPushAll(key, list);
}
/**
* 对指定下标的数组元素进行替换
* @param key
* @param index
* @param value
*/
public void setList(String key,long index,Object value) {
listOperations.set(key, index, value);
}
/**
* 数组大小
* @param key
*/
public long sizeList(String key) {
return listOperations.size(key);
}
/**
* 获取指定下标元素
* @param key
* @param index
* @return
*/
public Object getListIndex(String key,long index) {
return listOperations.index(key, index);
}
/**
* 获取list 指定开始-结束
* @param key
* @param start 开始
* @param end 结束
* @return
*/
public Object getList(String key,long start, long end) {
return listOperations.range(key, start, end);
}
}
三、gateway网关配置
我们的websocket地址是@ServerEndpoint("/ws/imserver/{user}")
,但是前端登录后拿到的是token,所以前端带着token去请求,new WebSocket("ws://192.168.0.70:8801/ws/imserver?token="+token)
,在gateway校验token后,修改请求地址,改成websocket的地址。
配置转发规则
一般的http转发规则是lb://social-service
,websocket的转发规则就是lb:ws://social-service
,多了个ws。
- id: ws_im_route
uri: lb:ws://social-service
predicates:
- Path=/ws/imserver/**
- Header=Upgrade
token校验处理
token校验通过后,获取到登录用户ID,修改请求路径。
@Component
@Slf4j
public class AccessFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
// 处理websocket token start
URI requestUrl = exchange.getRequest().getURI();
if(requestUrl.getPath().contains("/ws/imserver")){
List<String> token = request.getQueryParams().get("token");
if(token != null && token.size() >0){
String userid = getUserIdByToken(token.get(0));
if(StringUtils.isNotBlank(userid)){
// token校验通过后,修改请求路径
String newPath =request.getPath()+"/"+userid;
ServerHttpRequest newRequest = request.mutate().path(newPath).build();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI());
return chain.filter(exchange.mutate().request(newRequest).build());
}
}
response.setStatusCode(HttpStatus.UNAUTHORIZED);
response.getHeaders().add("Content-Type", "application/json; charset=utf-8");
String message = "{\"status\":99,\"message\":\"请登录后尝试\"}";
DataBuffer buffer = response.bufferFactory().wrap(message.getBytes());
return response.writeWith(Mono.just(buffer));
}
// 处理websocket token stop
// 处理其余的HTTP请求
}
四、前端
前端这里使用原生的websocket,代码提供了几个案例,模拟三个人聊天,在后面的截图里。
// const token= ""
// var ws = new WebSocket("ws://192.168.0.70:8801/ws/imserver?token="+token)
var ws = new WebSocket("ws://192.168.0.70:8999/ws/imserver/zhangsan")
ws.open = function (msg) {
console.log("open:" + msg)
}
ws.onmessage = function (msg) {
console.log("message:" + msg)
}
function sendHandler(userid) {
var param = {}
param.sendUserId = 'zhangsan'
param.receiveUserId = userid
param.message = '来自 zhangsan 消息'
param.msgType = 2
param.uuid = new Date() * 1
ws.send(JSON.stringify(param))
}
文章案例代码地址:https://gitea.aiprose.com/nelson/im
发表评论