FastAPI 学习之路(四十九)WebSockets(五)修复接口测试中的问题
其实代码没有问题,但是我们忽略了一点,就是我们在正常的开发中,肯定是遇到这样的情况,我们频繁的有客户端链接,断开链接,我们需要统一的管理起来,那么我们应该如何去管理呢,其实这个时候,我们要去声明一个类去管理我们的这些链接。我们应该如何优化呢。
定义一个链接管理类,处理我们所有的链接。
class ConnectionManager: def __init__(self): # 存放**的链接 self.active_connections: List[Dict[str, WebSocket]] = [] async def connect(self, user: str, ws: WebSocket): # 链接 await ws.accept() self.active_connections.append({"user": user, "ws": ws}) def disconnect(self, user: str, ws: WebSocket): # 关闭时 移除ws对象 self.active_connections.remove({"user": user, "ws": ws})