您好,欢迎来到爱go旅游网。
搜索
您的当前位置:首页SpringBoot整合WebSocket的客户端和服务端的实现

SpringBoot整合WebSocket的客户端和服务端的实现

来源:爱go旅游网
SpringBoot整合WebSocket的客户端和服务端的实现

本⽂是项⽬中使⽤了websocket进⾏⼀些数据的推送,对⽐项⽬做了⼀个demo,ws的相关问题不做细数,仅做⼀下记录。

此demo针对ws的搭建主要逻辑背景是⼀个服务端B:通讯层 产⽣消息推送出去,另外⼀个项⽬A充当客户端和服务端,A的客户端:是接收通讯层去⽆差别接收这些消息,A的服务端:根据地址ip去订阅。⽤户通过订阅A的ws,同时记录下⾃⼰的信息,项⽬B推送的消息,项⽬A接收到之后通过当初订阅的逻辑和⼀些权限过滤条件对项⽬B产⽣的消息进⾏过滤再推送到⽤户客户端上。

⼀、项⽬中服务端的创建

⾸先引⼊maven仓库

org.springframework.boot

spring-boot-starter-websocket

websocket的服务端搭建

同时注意springboot要开启ws服务启动类加上@EnableScheduling简要解读demo

/webSocket/{id}:链接的id是业务上的⼀个id,这边之前做过类似拍卖的,相当于⼀个服务端或者业务上的⼀个标识,是客户端指明链接到哪⼀个拍卖间的标识

@ServerEndpoint:作为服务端的注解。

package com.ghh.myproject.websocket;

import cn.hutool.core.lang.UUID;import com.alibaba.fastjson.JSON;import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;import javax.websocket.*;

import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;@ServerEndpoint(\"/webSocket/{id}\")@Component

public class WebSocket {

private Logger log = LoggerFactory.getLogger(WebSocket.class);

private static int onlineCount = 0;

/** 创建⼀个map存放 产⽣的ws链接推送 */

private static Map clients = new ConcurrentHashMap<>(); /** 创建⼀个map存放 当前接⼊的客户端 */

private static Map idMap = new ConcurrentHashMap<>();

private Session session; /** 链接进⼊的⼀个场景id */ private String id;

/** 每⼀个链接的⼀个唯⼀标识 */ private String userNo;

/**

* @Description: 第三⽅⽂接⼊当前项⽬websocket后的记录信息 * @DateTime: 2021/7/5 10:02 * @Author: GHH

* @Params: [id, session] * @Return void */

@OnOpen

public void onOpen(@PathParam(\"id\") String id, Session session) throws IOException { log.info(\"已连接到id:{}竞拍场,当前竞拍场⼈数:{}\ this.id = id;

this.session = session;

// ⽣成⼀个随机序列号来存储⼀个id下的所有⽤户 this.userNo = UUID.fastUUID().toString();

addOnlineCount();

//根据随机序列号存储⼀个socket连接 clients.put(userNo, this); idMap.put(userNo, id);

}

/**

* @Description: 关闭连接 * @DateTime: 2021/7/5 10:02 * @Author: GHH * @Params: [] * @Return void */

@OnClose

public void onClose() throws IOException { clients.remove(userNo); idMap.remove(userNo); subOnlineCount(); }

/**

* @Description: 客户端发送消息调⽤此⽅法 * @DateTime: 2021/6/16 15:35 * @Author: GHH

* @Params: [message] * @Return void */

@OnMessage

public void onMessage(String message) throws IOException {// JSONObject jsonTo = JSONObject.parseObject(message);// String mes = (String) jsonTo.get(\"message\");// if (!(\"All\").equals(jsonTo.get(\"To\"))) {

// sendMessageTo(mes, jsonTo.get(\"To\").toString());// } else {

// sendMessageAll(message);// }

log.info(\"onMessage⽅法成功\"); }

@OnError

public void onError(Session session, Throwable error) { log.error(\"{}\ }

public static void sendMessageTo(String message, String userNo) throws IOException { // session.getBasicRemote().sendText(message); //session.getAsyncRemote().sendText(message); WebSocket webSocket = clients.get(userNo);

if (webSocket != null && webSocket.session.isOpen()) {

webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message)); } }

/**

* @Description: 推送到指定的id值的记录 * @DateTime: 2021/6/15 17:11 * @Author: GHH

* @Params: [message, id] * @Return void */

public static void sendMessageToById(String message, String id) { // session.getBasicRemote().sendText(message); //session.getAsyncRemote().sendText(message); //根据id获取所有的userNo链接的⽤户

List userNos = getUserNosById(id);

for (WebSocket item : clients.values()) {

//遍历链接的value值,如果当前传⼊的id中链接的⽤户包含value值,则推送。 if (userNos.contains(item.userNo)) {

item.session.getAsyncRemote().sendText(message); } } }

/**

* @Description: 推送所有开启的信息 * @DateTime: 2021/6/15 17:13

* @Author: GHH

* @Params: [message] * @Return void */

public static void sendMessageAll(String message){ for (WebSocket item : clients.values()) {

item.session.getAsyncRemote().sendText(message); } }

public static synchronized int getOnlineCount() { return onlineCount; }

public static synchronized void addOnlineCount() { WebSocket.onlineCount++; }

public static synchronized void subOnlineCount() { WebSocket.onlineCount--; }

public static synchronized Map getClients() { return clients; }

/**

* @Description: 根据相应场景的⼀些逻辑处理 * @DateTime: 2021/7/5 10:03 * @Author: GHH * @Params: [id]

* @Return java.util.List */

public static List getUserNosById(String id) { ArrayList userNos = new ArrayList<>(); for (Map.Entry entry : idMap.entrySet()) { if (entry.getValue().equals(id)) { userNos.add(entry.getKey()); } }

return userNos; }}

 demo中模拟的是定时器推送,第⼀个参数是消息内容,第⼆个是推送到哪⼀个拍卖间或者其他业务上的内容。⽅法的具体内容上⼀段代码有详细解释,有通过id,或者发送给全部ws链接的客户端

WebSocket.sendMessageToById(\"\"+count,2+\"\");

@Scheduled(cron = \"*/5 * * * * ?\") public void job1(){

log.info(\"测试⽣成次数:{}\

redisTemplate.opsForValue().set(\"测试\"+count, \"\"+count++); if (count%2==0){

WebSocket.sendMessageToById(\"\"+count,2+\"\"); }else {

WebSocket.sendMessageToById(\"\"+count,1+\"\"); }

log.info(\"websocket发送\"+count); }

⼆、java充当客户端链接ws。

上述是java作为ws服务端推送当前业务信息的⼀个demo。我们项⽬⽬前做的是⼀个通讯层的概念,只能够推送数据内容,却⽆法根据⽤户权限去推送不同的数据。

ws客户端的搭建,⾸先链接ws服务端。⾸先是我们另外⼀个服务的ws配置信息,我这边demo是模拟链接上⾯的ws服务 1、ws客户端的配置

package com.ghh.websocketRecive.wsMessage;import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import javax.websocket.ContainerProvider;import javax.websocket.Session;

import javax.websocket.WebSocketContainer;import java.net.URI;

/**

* @author ghh

* @date 2019-08-16 16:02 */

@Component@Slf4j

public class WSClient {

public static Session session;

public static void startWS() { try {

if (WSClient.session != null) { WSClient.session.close(); }

WebSocketContainer container = ContainerProvider.getWebSocketContainer(); //设置消息⼤⼩最⼤为10M

container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024); container.setDefaultMaxTextMessageBufferSize(10*1024*1024); // 客户端,开启服务端websocket。

String uri = \"ws://192.168.0.108:8082/webSocket/1\";

Session session = container.connectToServer(WSHandler.class, URI.create(uri)); WSClient.session = session; } catch (Exception ex) {

log.info(ex.getMessage()); } }}

2、配置信息需要在项⽬启动的时候去启⽤和链接ws服务

package com.ghh.websocketRecive;

import com.ghh.websocketRecive.wsMessage.WSClient;import lombok.extern.slf4j.Slf4j;

import org.mybatis.spring.annotation.MapperScan;import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import javax.annotation.PostConstruct;

@Slf4j

@EnableScheduling@SpringBootApplication

@MapperScan(\"com.ghh.websocketRecive.dao\")public class WebsocketReciveApplication { public static void main(String[] args) {

SpringApplication.run(WebsocketReciveApplication.class, args); }

@PostConstruct public void init(){

log.info(\"初始化应⽤程序\");     // 初始化ws,链接服务端 WSClient.startWS(); }}

3、接收服务端推送的消息进⾏权限过滤demo

@ClientEndpoint:作为ws的客户端注解,@OnMessage接收服务端推送的消息。package com.ghh.websocketRecive.wsMessage;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.ghh.websocketRecive.entity.Student;

import com.ghh.websocketRecive.service.UserService;import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.websocket.*;import java.util.Objects;

import java.util.Set;

import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;

@ClientEndpoint@Slf4j

@Component

public class WSHandler { @Autowired

RedisTemplate redisTemplate;

private static RedisTemplate redisTemplateService; @PostConstruct public void init() {

redisTemplateService=redisTemplate; }

@OnOpen

public void onOpen(Session session) { WSClient.session = session; }

@OnMessage

public void processMessage(String message) {

log.info(\"websocketRecive接收推送消息\"+message); int permission = Integer.parseInt(message)%5; //查询所有订阅的客户端的ip。

Set keys = redisTemplateService.keys(\"ip:*\"); for (String key : keys) {

// 根据登录后存储的客户端ip,获取权限地址

String s = redisTemplateService.opsForValue().get(key); String[] split = s.split(\ for (String s1 : split) {

//向含有推送过来的数据权限地址的客户端推送告警数据。 if (s1.equals(permission+\"\")){

WebSocket.sendMessageToByIp(message,key.split(\":\")[1]); } } } }

@OnError

public void processError(Throwable t) { WSClient.session = null; try {

Thread.sleep(5000); startWS();

} catch (InterruptedException e) {

log.error(\"---websocket processError InterruptedException---\ }

log.error(\"---websocket processError error---\ }

@OnClose

public void processClose(Session session, CloseReason closeReason) { log.error(session.getId() + closeReason.toString()); }

public void send(String sessionId, String message) { try {

log.info(\"send Msg:\" + message);

if (Objects.nonNull(WSClient.session)) {

WSClient.session.getBasicRemote().sendText(message); } else {

log.info(\"---websocket error----\"); }

} catch (Exception e) {

log.error(\"---websocket send error---\ } }}

4、ws客户端推送消息,推送消息和上⾯服务端类似。这边是根据ip

package com.ghh.websocketRecive.wsMessage;import cn.hutool.core.lang.UUID;import com.alibaba.fastjson.JSON;

import com.ghh.websocketRecive.service.UserService;import lombok.Builder;import lombok.Data;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.websocket.*;

import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;@ServerEndpoint(\"/webSocket/{ip}\")@Component

public class WebSocket {

private Logger log = LoggerFactory.getLogger(WebSocket.class); private static int onlineCount = 0;

private static Map clients = new ConcurrentHashMap<>(); private Session session;

/** 当前连接服务端的客户端ip */ private String ip;

@Autowired

RedisTemplate redisTemplate;

private static RedisTemplate redisTemplateService; @PostConstruct public void init() {

redisTemplateService = redisTemplate; }

@OnOpen

public void onOpen(@PathParam(\"ip\") String ip, Session session) throws IOException { log.info(\"ip:{}客户端已连接:,当前客户端数量:{}\ this.ip = ip;

this.session = session;

// 接⼊⼀个websocket则⽣成⼀个随机序列号 addOnlineCount();

//根据随机序列号存储⼀个socket连接 clients.put(ip, this); }

@OnClose

public void onClose() throws IOException { clients.remove(ip); onlineCount--; subOnlineCount(); }

/**

* @Description: 客户端发送消息调⽤此⽅法 * @DateTime: 2021/6/16 15:35 * @Author: GHH

* @Params: [message] * @Return void */

@OnMessage

public void onMessage(String message) throws IOException { log.info(\"客户端发送消onMessage⽅法成功\"); }

@OnError

public void onError(Session session, Throwable error) { log.error(\"{}\ }

public static void sendMessageTo(String message, String userNo) throws IOException { WebSocket webSocket = clients.get(userNo);

if (webSocket != null && webSocket.session.isOpen()) {

webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message)); } }

/**

* @Description: 推送到指定的ip值的记录

* @DateTime: 2021/6/15 17:11 * @Author: GHH

* @Params: [message, id] * @Return void */

public static void sendMessageToByIp(String message, String ip) { for (WebSocket item : clients.values()) {

//遍历链接的value值,如果当前传⼊的ip中链接的⽤户包含value值,则推送。 if (item.ip.equals(ip)) {

item.session.getAsyncRemote().sendText(message); } } }

/**

* @Description: 推送所有开启的信息 * @DateTime: 2021/6/15 17:13 * @Author: GHH

* @Params: [message] * @Return void */

public static void sendMessageAll(String message){ for (WebSocket item : clients.values()) {

item.session.getAsyncRemote().sendText(message); } }

public static synchronized int getOnlineCount() { return onlineCount; }

public static synchronized void addOnlineCount() { WebSocket.onlineCount++; }

public static synchronized void subOnlineCount() { WebSocket.onlineCount--; }

public static synchronized Map getClients() { return clients; }}

概述:

如有疑惑部分,欢迎⼤家积极探讨

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- igat.cn 版权所有 赣ICP备2024042791号-1

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务