首页 - 神途资讯 > 跟着源码一起学:手把手教你用WebSocket打造Web端IM聊天

跟着源码一起学:手把手教你用WebSocket打造Web端IM聊天

发布于:2024-03-23 作者:admin 阅读:143

代码目录内容是这样:

在本小节中,我们会使用 搭建一个 的示例。

提供如下消息的功能支持:

考虑到让示例更加易懂,我们先做成全局有且仅有一个大的聊天室,即建立上 的连接,都自动动进入该聊天室。

下面,开始遨游 这个鱼塘...

4.2、引入依赖

在 pom.xml 文件中,引入相关依赖。



    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.10.RELEASE
         
    
    4.0.0
    lab-25-01
    
        
        
            org.springframework.boot
            spring-boot-starter-websocket
        
        
        
            com.alibaba
            fastjson
            1.2.62
        
    

具体每个依赖的作用,自己认真看下注释。

4.3、int

在 cn...lab25.. 包路径下,创建 int 类,定义 服务的端点()。

代码如下:

// WebsocketServerEndpoint.java
@Controller
@ServerEndpoint("/")
public class WebsocketServerEndpoint {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        logger.info("[onOpen][session({}) 接入]", session);
    }
    @OnMessage
    public void onMessage(Session session, String message) {
        logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别
    }
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);
    }
    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.info("[onClose][session({}) 发生异常]", session, throwable);
    }
}

如代码所示:

这是最简版的 int 的代码。在下文,我们会慢慢把代码补全。

4.4、on

在 cn...lab24.. 包路径下,创建 int 配置类。

代码如下:

// WebSocketConfiguration.java
@Configuration
// @EnableWebSocket // 无需添加该注解,因为我们并不是使用 Spring WebSocket
public class WebSocketConfiguration {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

PS:在 #er() 方法中,创建 er Bean 。该 Bean 的作用,是扫描添加有 @ 注解的 Bean 。

4.5、

创建 .java 类,配置 @n 注解即可。

代码如下:

// Application.java
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

执行 启动该示例项目。

考虑到大家可能不会或者不愿意写前端代码,所以我们直接使用 在线测试工具,测试 连接。

如下图:

至此,最简单的一个 项目的骨架,我们已经搭建完成。下面,我们开始改造,把相应的逻辑补全。

4.6、消息

在 HTTP 协议中,是基于 / 请求响应的同步模型,进行交互。在 协议中,是基于 消息的异步模型,进行交互。这一点,是很大的不同的,等会看到具体的消息类,感受会更明显。

因为 协议,不像 HTTP 协议有 URI 可以区分不同的 API 请求操作,所以我们需要在 的 里,增加能够标识消息类型,这里我们采用 type 字段。

所以在这个示例中,我们采用的 采用 *** ON 格式编码。

格式如下:

{
    type: "", // 消息类型
    body: {} // 消息体
}

解释一下:

实际上:我们在该示例中,body 字段对应的 相关的接口和类,实在想不到名字了。所有的 们,我们都放在 cn...lab25.. 包路径下。

4.6.1

创建 接口,基础消息体,所有消息体都要实现该接口。

代码如下:

// Message.java
publicinterfaceMessage {
}

目前作为一个标记接口,未定义任何操作。

4.6.2 认证相关

创建 类,用户认证请求。

代码如下:

// AuthRequest.java
public class AuthRequest implements Message {
    public static final String TYPE = "AUTH_REQUEST";
    /**
     * 认证 Token
     */
    private String accessToken;
    // ... 省略 set/get 方法
}

解释一下:

对于第2)点,在 协议中,我们也需要认证当前连接,用户身份是什么。一般情况下,我们采用用户调用 HTTP 登录接口,登录成功后返回的访问令牌 。这里,我们先不拓展开讲,事后可以看看 《基于 Token 认证的 连接》 文章。

虽然说, 协议是基于 模型,进行交互。但是,这并不意味着它的操作,不需要响应结果。例如说,用户认证请求,是需要用户认证响应的。所以,我们创建 类,作为用户认证响应。

代码如下:

// AuthResponse.java
public class AuthResponse implements Message {
    public static final String TYPE = "AUTH_RESPONSE";
    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    // ... 省略 set/get 方法
}

解释一下:

对于第1)点,实际上,我们在每个 实现类上,都增加了 TYPE 静态属性,作为消息类型。下面,我们就不重复赘述了。

在本示例中,用户成功认证之后,会广播用户加入群聊的通知 ,使用 t 。

代码如下:

// UserJoinNoticeRequest.java
public class UserJoinNoticeRequest implements Message {
    public static final String TYPE = "USER_JOIN_NOTICE_REQUEST";
    /**
     * 昵称
     */
    private String nickname;
    // ... 省略 set/get 方法
}

实际上,我们可以在需要使用到 / 模型的地方,将 进行拓展:

这样,在使用到同步模型的业务场景下, 实现类使用 / 作为后缀。例如说,用户认证请求、删除一个好友请求等等。

而在使用到异步模型能的业务场景下, 实现类还是继续 作为后缀。例如说,发送一条消息,用户操作完后,无需阻塞等待结果

4.6.3 发送消息相关

创建 类,发送给指定人的私聊消息的 。

代码如下:

// SendToOneRequest.java
public class SendToOneRequest implements Message {
    public static final String TYPE = "SEND_TO_ONE_REQUEST";
    /**
     * 发送给的用户
     */
    private String toUser;
    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    // ... 省略 set/get 方法
}

每个字段,自己看注释噢。

创建 类,发送给所有人的群聊消息的 。

代码如下:

// .java

class {

final TYPE = "";

/**

* 消息编号

*/

msgId;

/**

* 内容

*/

;

// ... 省略 set/get 方法

每个字段,自己看注释噢。

在服务端接收到发送消息的请求,需要异步响应发送是否成功。所以,创建 类,发送消息响应结果的 。

代码如下:

// SendResponse.java
public class SendResponse implements Message {
    public static final String TYPE = "SEND_RESPONSE";
    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    // ... 省略 set/get 方法
}

重点看 msgId 字段:即消息编号。客户端在发送消息,通过使用 UUID 算法,生成全局唯一消息编号(唯一ID的生成技术见:《从新手到专家:如何设计一套亿级消息量的分布式IM系统》的“5、唯一ID的技术方案”章节)。这样,服务端通过 消息响应,通过 msgId 做映射。

在服务端接收到发送消息的请求,需要转发消息给对应的人。所以,创建 类,发送消息给一个用户的 。

代码如下:

// SendResponse.java
public class SendToUserRequest implements Message {
    public static final String TYPE = "SEND_TO_USER_REQUEST";
    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    // ... 省略 set/get 方法
}

相比 来说,少一个 字段。因为,我们可以通过 连接,已经知道发送给谁了。

4.7、消息处理器

每个客户端发起的 消息类型,我们会声明对应的 消息处理器。这个就类似在 中,每个 API 接口对应一个 的 方法。

所有的 们,我们都放在 cn...lab25.. 包路径下。

4.7.1

创建 接口,消息处理器接口。

代码如下:

// MessageHandler.java
public interface MessageHandler {
    /**
     * 执行处理消息
     *
     * @param session 会话
     * @param message 消息
     */
    void execute(Session session, T message);
    /**
     * @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段
     */
    String getType();
}

解释一下:

4.7.2

创建 类,处理 消息。

代码如下:

// AuthMessageHandler.java
@Component
public class AuthMessageHandler implements MessageHandler {
    @Override
    public void execute(Session session, AuthRequest message) {
        // 如果未传递 accessToken
        if(StringUtils.isEmpty(message.getAccessToken())) {
            WebSocketUtil.send(session, AuthResponse.TYPE,
                    new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入"));
            return;
        }
        // 添加到 WebSocketUtil 中
        WebSocketUtil.addSession(session, message.getAccessToken()); // 考虑到代码简化,我们先直接使用 accessToken 作为 User
        // 判断是否认证成功。这里,假装直接成功
        WebSocketUtil.send(session, AuthResponse.TYPE,newAuthResponse().setCode(0));
        // 通知所有人,某个人加入了。这个是可选逻辑,仅仅是为了演示
        WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE,
                newUserJoinNoticeRequest().setNickname(message.getAccessToken())); // 考虑到代码简化,我们先直接使用 accessToken 作为 User
    }
    @Override
    public String getType() {
        return AuthRequest.TYPE;
    }
}

代码比较简单,跟着代码读读即可。

关于 类,我们在「5.8、」一节中再来详细看看。

4.7.3

创建 类,处理 消息。

代码如下:

// SendToOneRequest.java
@Component
public class SendToOneHandler implements MessageHandler {
    @Override
    public void execute(Session session, SendToOneRequest message) {
        // 这里,假装直接成功
        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
 
        // 创建转发的消息
        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        // 广播发送
        WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest);
    }
    @Override
    public String getType() {
        return SendToOneRequest.TYPE;
    }
}

代码比较简单,跟着代码读读即可。

4.7.4

创建 类,处理 消息。

代码如下:

// SendToAllRequest.java
@Component
public class SendToAllHandler implements MessageHandler {
    @Override
    public void execute(Session session, SendToAllRequest message) {
        // 这里,假装直接成功
        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);
        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
        // 创建转发的消息
        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        // 广播发送
        WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest);
    }
 
    @Override
    public String getType() {
        return SendToAllRequest.TYPE;
    }
}

代码比较简单,跟着代码读读即可。

4.8、

代码在 cn...lab25..util 包路径下。

创建 工具类,主要提供两方面的功能:

整体代码比较简单,自己瞅瞅哟。

代码在目录中的如下位置:

4.9、完善 int

在本小节,我们会修改 int 的代码,完善其功能。

4.9.1 初始化 ***

实现 接口,在 #() 方法中,扫描所有 Bean ,添加到 *** 中。

代码如下:

// WebsocketServerEndpoint.java
/**
 * 消息类型与 MessageHandler 的映射
 *
 * 注意,这里设置成静态变量。虽然说 WebsocketServerEndpoint 是单例,但是 Spring Boot 还是会为每个 WebSocket 创建一个 WebsocketServerEndpoint Bean 。
 */
private static final Map HANDLERS = newHashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Override
public void afterPropertiesSet() throws Exception {
    // 通过 ApplicationContext 获得所有 MessageHandler Bean
    applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean.forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
    logger.info("[afterPropertiesSet][消息处理器数量:{}]", HANDLERS.size());
}

通过这样的方式,可以避免手动配置 与消息类型的映射。

4.9.2

重新实现 #( , ) 方法,实现连接时,使用 参数进行用户认证。

代码如下:

// WebsocketServerEndpoint.java
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
    logger.info("[onOpen][session({}) 接入]", session);
    // <1> 解析 accessToken
    List accessTokenValues = session.getRequestParameterMap().get("accessToken");
    String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null;
    // <2> 创建 AuthRequest 消息类型
    AuthRequest authRequest = newAuthRequest().setAccessToken(accessToken);
    // <3> 获得消息处理器
    MessageHandler messageHandler = HANDLERS.get(AuthRequest.TYPE);
    if(messageHandler == null) {
        logger.error("[onOpen][认证消息类型,不存在消息处理器]");
        return;
    }
    messageHandler.execute(session, authRequest);
}

如代码所示:

打开三个浏览器创建,分别设置服务地址如下:

然后,逐个点击「开启连接」按钮,进行 连接。

最终效果如下图:

如上图所示:

4.9.3

重新实现 #( , ) 方法,实现不同的消息,转发给不同的 消息处理器。

代码如下:

// WebsocketServerEndpoint.java
@OnMessage
public void onMessage(Session session, String message) {
    logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别
    try{
        // <1> 获得消息类型
         *** ONObject jsonMessage =  *** ON.parseObject(message);
        String messageType = jsonMessage.getString("type");
        // <2> 获得消息处理器
        MessageHandler messageHandler = HANDLERS.get(messageType);
        if(messageHandler == null) {
            logger.error("[onMessage][消息类型({}) 不存在消息处理器]", messageType);
            return;
        }
        // <3> 解析消息
        Class messageClass = this.getMessageClass(messageHandler);
        // <4> 处理消息
        Message messageObj =  *** ON.parseObject(jsonMessage.getString("body"), messageClass);
        messageHandler.execute(session, messageObj);
    } catch(Throwable throwable) {
        logger.info("[onMessage][session({}) message({}) 发生异常]", session, throwable);
    }
}

代码中:

代码如下:

// WebsocketServerEndpoint.java
private Class getMessageClass(MessageHandler handler) {
    // 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
    Class targetClass = AopProxyUtils.ultimateTargetClass(handler);
    // 获得接口的 Type 数组
    Type[] interfaces = targetClass.getGenericInterfaces();
    Class superclass = targetClass.getSuperclass();
    while((Objects.isNull(interfaces) || 0== interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准
        interfaces = superclass.getGenericInterfaces();
        superclass = targetClass.getSuperclass();
    }
    if(Objects.nonNull(interfaces)) {
        // 遍历 interfaces 数组
        for(Type type : interfaces) {
            // 要求 type 是泛型参数
            if(type instanceof ParameterizedType) {
                ParameterizedType parameterizedType = (ParameterizedType) type;
                // 要求是 MessageHandler 接口
                if(Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
                    Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                    // 取首个元素
                    if(Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
                        return(Class) actualTypeArguments[0];
                    } else{
                        thrownewIllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
                    }
                }
            }
        }
    }
    throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}

这是参考 - 项目的 #() 方法,进行略微修改。

如果大家对 Java 的泛型机制没有做过一点了解,可能略微有点硬核。可以先暂时跳过,知道意图即可。

处,调用 #(, ) 方法,执行处理请求。

另外:这里增加了 try-catch 代码,避免整个执行的过程中,发生异常。如果在 事件的处理中,发生异常,该消息对应的 会话会被自动关闭。显然,这个不符合我们的要求。例如说,在 处理消息的过程中,发生一些异常是无法避免的。

继续基于上述创建的三个浏览器,我们先点击「清空消息」按钮,清空下消息,打扫下上次测试展示出来的接收得到的 。当然, 的连接,不需要去断开。

在之一个浏览器中,分别发送两种聊天消息。

一条 私聊消息:

{
    type: "SEND_TO_ONE_REQUEST",
    body: {
        toUser: "番茄",
        msgId: "eaef4a3c-35dd-46ee-b548-f9c4eb6396fe",
        content: "我是一条单聊消息"
    }
}

一条 群聊消息:

{
    type: "SEND_TO_ALL_REQUEST",
    body: {
        msgId: "838e97e1-6ae9-40f9-99c3-f7127ed64747",
        content: "我是一条群聊消息"
    }
}

最终结果如下图:

如上图所示:

4.9.4

重新实现 #( , ) 方法,实现移除关闭的 。

代码如下:

// WebsocketServerEndpoint.java
@OnClose
public void onClose(Session session, CloseReason closeReason) {
    logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);
    WebSocketUtil.removeSession(session);
}

4.9.5

#( , ) 方法,保持不变。

代码如下:

// WebsocketServerEndpoint.java
@OnError
public void onError(Session session, Throwable throwable) {
    logger.info("[onClose][session({}) 发生异常]", session, throwable);
}

五、 实战入门5.0、基础介绍

示例代码下载:

二维码

扫一扫关注我们

版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,请告知我们,本站将立刻删除涉嫌侵权内容。

相关文章