项目背景
根据项目要求,在各系统之间实现异步通知服务,从客户端到服务端,从服务端到客户端实现双向通知。以客户端主动,由客户端向服务端发起http请求,来实现任务的发起,服务端执行异步任务,由于非同步,所以客户端并不知道这个任务什么时候执行完毕,所以需要服务端在执行完任务的时候,主动向客户端发送一个请求,来告知客户端我已经完成任务了,来完成服务端客户端的双向通信。
websocket选择
一般来说,客户端与服务端的交互都是主动的,客户端与服务端对应的模式就是:客户端请求-服务端响应。而服务端主动向客户端推送消息,客户端不像服务端只有一个地址,要解决这一个问题,我考虑到以下两种方式。
Ajax轮询
所谓的Ajax轮询,其实就是定时的通过Ajax查询服务端,客户端按规定时间定时像服务端发送ajax请求,服务器接到请求后马上返回响应信息并关闭连接。
这种技术方式实现起来非常简单,但是这种方式会有非常严重的问题,就是需要不断的向服务器发送消息询问,这种方式会对服务器造成极大的性能浪费。不是实时通信,不顾及应用的状态改变而盲目检查更新,导致服务器资源的浪费,且会加重网络负载,拖累服务器。
websocket服务
WebSocket是HTML5开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。在WebSocket API中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。详情看
这里选择使用websocket
开发环境
Python + tornado + vue + websocket
前端(客户端)建立websocket
由客户端主动与服务端建立websocket连接,并保持连接通道,通道保持就可以实现双向通信,发消息,建立连接时,根据用户行为(登录等)初始化链接。代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| <template> <div> </div> </template> <script> export default { name : 'websocket', data(){ return { websock: null, reconnectTime: 0, reconnectData:null, lockReconnect:false, timeout:1000, timeoutObj:null, serverTimeoutObj:null, } }, created(){ this.initWebSocket() }, destroyed(){ this.lockReconnect = true this.websock.close() clearTimeout(this.reconnectData) clearTimeout(this.timeoutObj) clearTimeout(this.serverTimeoutObj) }, methods: { initWebSocket(){ const wsuri = ' ' # 链接地址 this.websock = new WebSocket(wsuri + '?token=' + this.$store.getters.token) this.websock.onmessage = this.websocketOnMessage this.websock.onopen = this.websocketOnOpen this.websock.onerror = this.websocketOnError this.websock.onclose = this.websocketClose }, websocketOnOpen(){ this.heartBeat() this.reconnectTime = 0 console.log('websocket服务连接成功') }, websocketOnError(){ this.reconnect() }, websocketSend(Data){ this.websock.send(JSON.stringify(Data)) }, websocketClose(e){ this.reconnect() }, websocketOnMessage(e){ this.heartBeat() }, heartBeat(){ this.timeoutObj && clearTimeout(this.timeoutObj) this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj) this.timeoutObj = setTimeout(()=>{ this.websocketSend('heart_beat') this.serverTimeoutObj = setTimeout(()=> { this.websock.close() }, 5000) }, this.timeout) }, reconnect(){ if(this.lockReconnect){ return } this.lockReconnect = true console.log('websocket服务异常关闭,正在重连中。。。') if (this.reconnectTime >= 5){ console.log('websocket服务重连失败, 请刷新浏览器') return } this.reconnectData && clearTimeout(this.reconnectData) this.reconnectData = setTimeout(()=>{ this.initWebSocket() this.reconnectTime++ this.lockReconnect = false },5000) }, } } </script>
|
用户行为操作后,或者前端项目启动后,加载这个组件,然后就会初始化websocket,与服务端进行连接,连接成功之后会触发心跳机制维持连接,可以通过websocket的事件进行收发消息
服务端启动websocket服务
websocket连接层
前端通过连接地址,指定websocket服务端启动的ip端口,继承websocket基类,创立连接
1 2 3 4 5 6 7 8 9
| from base.websocket import BaseWebSocket from lib.routes import route from services.connection import Connection
@route(r'/websocket') class AthenaWebsocket(BaseWebSocket): """websocket连接层,创建连接""" connection = Connection('athena')
|
websocket基类
建立连接成功后,可对该连接进行处理,将该用户访问的连接保存在本地,维护连接通道
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| from datetime import datetime
import ujson from tornado.websocket import WebSocketHandler
class BaseWebSocket(WebSocketHandler): """ websocket基类""" user = None connection = None # 维护的连接
async def open(self, *args, **kwargs): """ 建立websocket时,触发的方法,可对用户进行注册,保存本地连接实例 """ self.connection.register(self)
def on_message(self, info): """处理由web页面传递的消息 """ try: message = ujson.loads(info) # 心跳处理 if message == 'heart_beat': self.write_message('ok') except Exception as e: pass
def on_close(self): """websocket 关闭, 移除socket连接实例 """ if self.user: self.connection.unregister(self)
def check_origin(self, origin): """ 默认允许跨域请求 :param origin: """ return True
|
保存连接
用户通过前端建立连接后,将连接实例保存在本地(无法保存在redis,mysql等库中,为连接对象),之后通过连接实例进行收发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class Connection(object): """连接状态"""
def __init__(self, system): """ 连接初始化,若需重写连接状态的部分逻辑,替换对应的代理"
""" # 保存的websocket连接实例 self.websockets = defaultdict(list) self.message_delegate = MessageDelegate(system, self.websockets) # 发送消息代理类 self.redis_delegate = RedisListenDelegate(system, self) # redis 监听类
def register(self, item): """ 用户建立webscoket 1. 添加新连接, :param item: websocket instance """ if item in self.websockets[item.user]: return self.websockets[item.user].append(item)
def unregister(self, item): """ 用户退出websocket 1. 删除连接, 2.删除用户在线状态 :param item: websocket instance """ self.websockets[item.user].remove(item)
|
发送消息
服务端只要调用发消息的方法,从本地根据连接的用户属性,获取当前用户在建立连接时保存的连接,然后根据连接对象实现主动向客户端发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| class MessageDelegate(object): """ 消息代理"""
def __init__(self, system, websockets): """ init :param websockets: websoctets """ self.websockets = websockets # 和connection宿主维护同一个连接记录池
def send_message(self, message): """ 向指定用户发送请求,若用户在线直接发送 :param message: """ msg = ujson.loads(message) receiver = msg.get('receiver') if receiver: self.unicast(receiver, msg)
def unicast(self, receiver, message): """ 指定用户发送消息 :param receiver: 用户 :param message: 消息 # :return: """ websockets = self.websockets.get(receiver, []) if not websockets: return for websocket in websockets: self.write_message(websocket, message)
def write_message(self, socket, message): """ 向客户端发消息 :param socket: 需要发送信息的websocketHandler连接 :param message: 发送的信息 """ socket.write_message(message)
|