Source code for webstompy.transporter.transporter

"""All transporter objects (WebSocket et al.) to transport the STOMP messages.
"""

from abc import ABC, abstractmethod

import time
import threading
import websocket
from websocket._exceptions import WebSocketConnectionClosedException
import webstompy

OPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)


[docs]class BaseTransporter(ABC): """Abstract transporter class to adapt to various connection types """
[docs] @abstractmethod def receive(self): """Receive a message from the transporter connection (blocking) """ pass
[docs] @abstractmethod def send(self, frame): """Send a message to the transporter connection """ pass
[docs] @abstractmethod def alive(self): """Check transporter connection status """ pass
[docs]class WebSocketPinger(threading.Thread): """Helper class for WebSocketTransporter to to regular WebSocket pings as connection keepalive. """ def __init__(self, socket): threading.Thread.__init__(self) self.socket = socket
[docs] def run(self): while True: time.sleep(10) self.socket.ping()
[docs]class WebSocketTransporter(BaseTransporter): """webstompty transporter class through WebSockets """ def __init__(self, socket): # TODO: Check if socket is valid, throw up if not self.socket = socket self._alive = True self._pinger = WebSocketPinger(self.socket) self._pinger.daemon = True self._pinger.start()
[docs] def receive(self): """Receive a message from the transporter connection (blocking) """ try: frame = self.socket.recv_frame() except WebSocketConnectionClosedException: self._alive = False raise webstompy.exception.ConnectionClosedException( "Connection closed unexpectedly!" ) except websocket.WebSocketException: self.socket = None raise return None if not frame: raise websocket.WebSocketException("Not a valid frame %s" % frame) elif frame.opcode in OPCODE_DATA: return frame.data elif frame.opcode == websocket.ABNF.OPCODE_PONG: return frame.data elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: self.socket.send_close() self.socket = None self._alive = False return None elif frame.opcode == websocket.ABNF.OPCODE_PING: self.socket.pong(frame.data) return frame.data
[docs] def send(self, frame): """Send a message to the transporter connection """ self.socket.send(frame)
@property def alive(self): """Check transporter connection status """ return self._alive