"""StompConnection: a STOMP connection
"""
import queue
import logging
from webstompy.transporter import WebSocketTransporter
from webstompy.receiver import StompReceiver
from webstompy.logging import get_logger
import webstompy
class StompConnection(object):
    """A client-side STOMP connection built on top of a transporter.

    Constructing an instance creates the transporter for the given
    connector, creates a `StompReceiver` wired to that transporter and to
    two queues (received frames and registered listeners), and starts the
    receiver with its ``daemon`` flag set.

    Parameters
    ----------
    connector:
        The connector to communicate over, to be used in `transporter`.
    transporter: webstompy.transporter.transporter.BaseTransporter
        The transporter class (or factory) handling the actual
        communication with the connector; it is called with `connector` as
        its only argument (in most cases a
        `webstompy.transporter.transporter.WebSocketTransporter`).
    """

    def __init__(self, connector=None, transporter=WebSocketTransporter):
        """Create the transporter and queues, then start the receiver."""
        self.logger = get_logger(f"{__name__}.{self.__class__.__name__}")
        # TODO: Add logging
        # TODO: Check if transporter is valid, throw up if not
        self._connector = connector
        self._transporter = transporter(self._connector)
        # Frames consumed by `connect`, and listeners added through
        # `add_listener`; both queues are shared with the receiver.
        self._queue_frames = queue.Queue()
        self._queue_listener = queue.Queue()
        # Set by `connect` once a CONNECTED frame has been received. It is
        # initialised before the receiver starts so that all instance state
        # exists by the time the receiver is running.
        self._frame_connected = None
        self._receiver = StompReceiver(
            transporter=self._transporter,
            queue_frames=self._queue_frames,
            queue_listener=self._queue_listener,
        )
        self.logger.info(
            "New webstompy connection initializing. "
            "Starting receiver daemon."
        )
        self._receiver.daemon = True
        self._receiver.start()

    def _send_frame(self, frame):
        """Send a StompFrame to the server.

        Parameters
        ----------
        frame: webstompy.StompFrame
            Frame whose ``payload`` is sent to the server via the
            transporter.
        """
        # The debug log is skipped entirely when Python runs with -O.
        if __debug__:
            self.logger.debug(f"Sending frame to server: {frame}")
        self._transporter.send(frame.payload)

    def add_listener(self, listener):
        """Add a listener to be invoked in case of events.

        The listener is put on the listener queue shared with the receiver.

        Parameters
        ----------
        listener: webstompy.StompListener
            Listener to be derived from `webstompy.StompListener`.
        """
        self.logger.info(
            f"Adding listener {listener.__class__.__name__} to "
            "StompConnection."
        )
        self._queue_listener.put(listener)

    def connect(self, login=None, passcode=None, timeout=None, host=None):
        """Connect to the STOMP server.

        Sends a CONNECT frame advertising ``accept-version: 1.1`` and then
        blocks until the next frame is available on the frame queue.

        Parameters
        ----------
        login: str
            The user id used to authenticate against a secured STOMP server
            (None to login without user id).
        passcode: str
            The password used to authenticate against a secured STOMP
            server (None to login without password).
        timeout: float
            Seconds to wait for the confirmation of the connection from the
            server (None means: wait forever).
        host: str
            Value of the ``host`` header (name of the virtual host to
            connect to). STOMP 1.1 requires this header on CONNECT; when
            None the header is omitted, as in earlier versions of this
            method.

        Returns
        -------
        frame_connected: webstompy.StompFrame
            The CONNECTED frame upon success.

        Raises
        ------
        webstompy.exception.ConnectFailedException
            If the frame received in response is not a CONNECTED frame.
        queue.Empty
            If no frame arrives within `timeout` seconds.
        """
        header = [("accept-version", "1.1")]
        if host is not None:
            header.append(("host", host))
        if login is not None:
            header.append(("login", login))
        if passcode is not None:
            header.append(("passcode", passcode))
        self._send_frame(webstompy.StompFrame("CONNECT", header, None))

        frame_connected = self._queue_frames.get(timeout=timeout)
        # Received frames carry their command as bytes.
        if frame_connected.command != b"CONNECTED":
            raise webstompy.exception.ConnectFailedException(
                "Did not receive a valid CONNECTED response!"
            )
        self._frame_connected = frame_connected
        self.logger.info("Connection successfully initialized.")
        return frame_connected

    def send(
        self,
        destination,
        message="",
        content_type="text/plain",
        content_length=None,
    ):
        """Send a message to the STOMP server.

        Parameters
        ----------
        destination: str
            The destination to send this message to (required).
        message: str
            The message body to send.
        content_type: str
            Content type of the message body.
        content_length: int
            Content length of the message body (byte count). The
            ``content-length`` header is only added when this is not None.
        """
        header = [("destination", destination), ("content-type", content_type)]
        if content_length is not None:
            header.append(("content-length", content_length))
        frame = webstompy.StompFrame("SEND", header, message)
        self._send_frame(frame)

    def subscribe(self, destination, id):
        """Subscribe to messages under a given destination/topic.

        Parameters
        ----------
        destination: str
            The destination to subscribe to (required).
        id: str
            The id to subscribe under (must be unique for each
            subscription).
        """
        self.logger.info(f"Subscribing to destination {destination}.")
        header = [("destination", destination), ("id", id)]
        frame = webstompy.StompFrame("SUBSCRIBE", header, None)
        self._send_frame(frame)

    @property
    def alive(self):
        """Detect whether connection is still alive.

        Returns
        -------
        is_alive: bool
            Whether connection is still alive, as reported by the
            transporter's ``alive`` attribute.
        """
        return self._transporter.alive