"""Classes to handle Sonos UPnP Events and Subscriptions using asyncio.
The `Subscription` class from this module will be used in
:py:mod:`soco.services` if `config.EVENTS_MODULE` is set
to point to this module.
Example:
Run this code, and change your volume, tracks etc::
import logging
logging.basicConfig()
import soco
import asyncio
from pprint import pprint
from soco import events_asyncio
soco.config.EVENTS_MODULE = events_asyncio
def print_event(event):
try:
pprint(event.variables)
except Exception as e:
print("There was an error in print_event:", e)
def _get_device():
device = soco.discover().pop().group.coordinator
print(device.player_name)
return device
async def main():
# pick a device at random and use it to get
# the group coordinator
loop = asyncio.get_event_loop()
device = await loop.run_in_executor(None, _get_device)
sub = await device.renderingControl.subscribe()
sub2 = await device.avTransport.subscribe()
sub.callback = print_event
sub2.callback = print_event
async def before_shutdown():
await sub.unsubscribe()
await sub2.unsubscribe()
await events_asyncio.event_listener.async_stop()
await asyncio.sleep(100)
await before_shutdown()
if __name__ == "__main__":
asyncio.run(main())
"""
import logging
import socket
import time
import asyncio
from aiohttp import ClientSession, web
# Event is imported for compatibility with events.py
# pylint: disable=unused-import
from .events_base import Event # noqa: F401
from .events_base import ( # noqa: E402
get_listen_ip,
parse_event_xml,
EventNotifyHandlerBase,
EventListenerBase,
SubscriptionBase,
SubscriptionsMap,
)
from .exceptions import SoCoException # noqa: E402
log = logging.getLogger(__name__) # pylint: disable=C0103
[docs]class EventNotifyHandler(EventNotifyHandlerBase):
"""Handles HTTP ``NOTIFY`` Verbs sent to the listener server.
Inherits from `soco.events_base.EventNotifyHandlerBase`.
"""
def __init__(self):
super().__init__()
# The SubscriptionsMapAio instance created when this module is
# imported. This is referenced by
# soco.events_base.EventNotifyHandlerBase.
self.subscriptions_map = subscriptions_map
[docs] async def notify(self, request):
"""Serve a ``NOTIFY`` request by calling `handle_notification`
with the headers and content.
"""
content = await request.text()
seq = request.headers["seq"] # Event sequence number
sid = request.headers["sid"] # Event Subscription Identifier
# find the relevant service from the sid
# pylint: disable=no-member
subscription = self.subscriptions_map.get_subscription(sid)
# It might have been removed by another thread
if subscription:
timestamp = time.time()
service = subscription.service
self.log_event(seq, service.service_id, timestamp)
log.debug("Event content: %s", content)
if "x-sonos-http" in content:
# parse_event_xml will generate I/O if
# x-sonos-http is in the content
variables = await asyncio.get_event_loop().run_in_executor(
None, parse_event_xml, content
)
else:
variables = parse_event_xml(content)
# Build the Event object
event = Event(sid, seq, service, timestamp, variables)
# pass the event details on to the service so it can update
# its cache.
# pylint: disable=protected-access
service._update_cache_on_event(event)
# Pass the event on for handling
subscription.send_event(event)
else:
log.debug("No service registered for %s", sid)
return web.Response(text="OK", status=200)
# pylint: disable=no-self-use, missing-docstring
def log_event(self, seq, service_id, timestamp):
log.info("Event %s received for %s service at %s", seq, service_id, timestamp)
[docs]class EventListener(EventListenerBase): # pylint: disable=too-many-instance-attributes
"""The Event Listener.
Runs an http server which is an endpoint for ``NOTIFY``
requests from Sonos devices. Inherits from
`soco.events_base.EventListenerBase`.
"""
def __init__(self):
super().__init__()
self.sock = None
self.ip_address = None
self.port = None
self.runner = None
self.site = None
self.session = None
self.start_lock = None
[docs] def start(self, any_zone):
"""A stub since the first subscribe calls async_start."""
return
[docs] def listen(self, ip_address):
"""A stub since since async_listen is used."""
return
[docs] async def async_start(self, any_zone):
"""Start the event listener listening on the local machine under the lock.
Args:
any_zone (SoCo): Any Sonos device on the network. It does not
matter which device. It is used only to find a local IP
address reachable by the Sonos net.
"""
if not self.session:
self.session = ClientSession()
if not self.start_lock:
self.start_lock = asyncio.Lock()
async with self.start_lock:
if self.is_running:
return
# Use configured IP address if there is one, else detect
# automatically.
ip_address = get_listen_ip(any_zone.ip_address)
if not ip_address:
log.exception("Could not start Event Listener: check network.")
# Otherwise, no point trying to start server
return
port = await self.async_listen(ip_address)
if not port:
return
self.address = (ip_address, port)
self.is_running = True
log.debug("Event Listener started")
[docs] async def async_listen(self, ip_address):
"""Start the event listener listening on the local machine at
port 1400 (default). If this port is unavailable, the
listener will attempt to listen on the next available port,
within a range of 100.
Make sure that your firewall allows connections to this port.
This method is called by `soco.events_base.EventListenerBase.start`
Handling of requests is delegated to an instance of the
`EventNotifyHandler` class.
Args:
ip_address (str): The local network interface on which the server
should start listening.
Returns:
int: The port on which the server is listening.
Note:
The port on which the event listener listens is configurable.
See `config.EVENT_LISTENER_PORT`
"""
for port_number in range(
self.requested_port_number, self.requested_port_number + 100
):
try:
if port_number > self.requested_port_number:
log.warning("Trying next port (%d)", port_number)
# pylint: disable=no-member
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((ip_address, port_number))
sock.listen(200)
self.sock = sock
self.port = port_number
break
# pylint: disable=invalid-name
except socket.error as e:
log.warning(e)
continue
if not self.port:
return None
self.ip_address = ip_address
await self._async_start()
return self.port
async def _async_start(self):
"""Start the site."""
handler = EventNotifyHandler()
app = web.Application()
app.add_routes([web.route("notify", "", handler.notify)])
self.runner = web.AppRunner(app)
await self.runner.setup()
self.site = web.SockSite(self.runner, self.sock)
await self.site.start()
log.info("Event listener running on %s", (self.ip_address, self.port))
[docs] async def async_stop(self):
"""Stop the listener."""
await self.site.stop()
await self.runner.cleanup()
await self.session.close()
self.sock.close()
self.sock = None
self.port = None
self.ip_address = None
# pylint: disable=unused-argument
[docs] def stop_listening(self, address):
"""Stop the listener."""
asyncio.ensure_future(self.async_stop())
[docs]class Subscription(SubscriptionBase):
"""A class representing the subscription to a UPnP event.
Inherits from `soco.events_base.SubscriptionBase`.
"""
def __init__(self, service, callback=None):
"""
Args:
service (Service): The SoCo `Service` to which the subscription
should be made.
event_queue (:class:`~queue.Queue`): A queue on which received
events will be put. If not specified, a queue will be
created and used.
"""
super().__init__(service, None)
#: :py:obj:`function`: callback function to be called whenever an
#: `Event` is received. If it is set and is callable, the callback
#: function will be called with the `Event` as the only parameter and
#: the Subscription's event queue won't be used.
self.callback = callback
# The SubscriptionsMapAio instance created when this module is
# imported. This is referenced by soco.events_base.SubscriptionBase.
self.subscriptions_map = subscriptions_map
# The EventListener instance created when this module is imported.
# This is referenced by soco.events_base.SubscriptionBase.
self.event_listener = event_listener
# Used to keep track of the auto_renew loop
self._auto_renew_task = None
# pylint: disable=arguments-differ
[docs] def subscribe(self, requested_timeout=None, auto_renew=False, strict=True):
"""Subscribe to the service.
If requested_timeout is provided, a subscription valid for that number
of seconds will be requested, but not guaranteed. Check
`timeout` on return to find out what period of validity is
actually allocated.
This method calls `events_base.SubscriptionBase.subscribe`.
Note:
SoCo will try to unsubscribe any subscriptions which are still
subscribed on program termination, but it is good practice for
you to clean up by making sure that you call :meth:`unsubscribe`
yourself.
Args:
requested_timeout(int, optional): The timeout to be requested.
auto_renew (bool, optional): If `True`, renew the subscription
automatically shortly before timeout. Default `False`.
strict (bool, optional): If True and an Exception occurs during
execution, the Exception will be raised or, if False, the
Exception will be logged and the Subscription instance will be
returned. Default `True`.
Returns:
`Subscription`: The Subscription instance.
"""
self.subscriptions_map.subscribing()
future = asyncio.Future()
subscribe = super().subscribe
async def _async_wrap_subscribe():
try:
if not self.event_listener.is_running:
await self.event_listener.async_start(self.service.soco)
await subscribe(requested_timeout, auto_renew)
future.set_result(self)
except SoCoException as ex:
future.set_exception(ex)
except Exception as exc: # pylint: disable=broad-except
self._cancel_subscription(exc)
if strict:
future.set_exception(exc)
else:
self._log_exception(exc)
future.set_result(self)
finally:
self.subscriptions_map.finished_subscribing()
asyncio.ensure_future(_async_wrap_subscribe())
return future
def _log_exception(self, exc):
"""Log an exception during subscription."""
msg = (
"An Exception occurred: {}.".format(exc)
+ " Subscription to {},".format(
self.service.base_url + self.service.event_subscription_url
)
+ " sid: {} has been cancelled".format(self.sid)
)
log.exception(msg)
[docs] async def renew(
self, requested_timeout=None, is_autorenew=False, strict=True
): # pylint: disable=invalid-overridden-method
"""renew(requested_timeout=None)
Renew the event subscription.
You should not try to renew a subscription which has been
unsubscribed, or once it has expired.
This method calls `events_base.SubscriptionBase.renew`.
Args:
requested_timeout (int, optional): The period for which a renewal
request should be made. If None (the default), use the timeout
requested on subscription.
is_autorenew (bool, optional): Whether this is an autorenewal.
Default `False`.
strict (bool, optional): If True and an Exception occurs during
execution, the Exception will be raised or, if False, the
Exception will be logged and the Subscription instance will be
returned. Default `True`.
Returns:
`Subscription`: The Subscription instance.
"""
try:
return await self._wrap(super().renew, requested_timeout, is_autorenew)
except Exception as exc: # pylint: disable=broad-except
msg = (
"An Exception occurred. Subscription to"
+ " {}, sid: {} has been cancelled".format(
self.service.base_url + self.service.event_subscription_url,
self.sid,
)
)
log.exception(msg)
self._cancel_subscription(msg)
if self.auto_renew_fail is not None:
if hasattr(self.auto_renew_fail, "__call__"):
# pylint: disable=not-callable
self.auto_renew_fail(exc)
if strict:
raise
self._log_exception(exc)
return self
[docs] async def unsubscribe(
self, strict=True
): # pylint: disable=invalid-overridden-method
"""unsubscribe()
Unsubscribe from the service's events.
Once unsubscribed, a Subscription instance should not be reused
This method calls `events_base.SubscriptionBase.unsubscribe`.
Args:
strict (bool, optional): If True and an Exception occurs during
execution, the Exception will be raised or, if False, the
Exception will be logged and the Subscription instance will be
returned. Default `True`.
Returns:
`Subscription`: The Subscription instance.
"""
try:
return await self._wrap(super().unsubscribe)
except Exception as exc: # pylint: disable=broad-except
if strict:
raise
self._log_exception(exc)
return self
def _auto_renew_start(self, interval):
"""Starts the auto_renew loop."""
self._auto_renew_task = asyncio.get_event_loop().call_later(
interval, self._auto_renew_run, interval
)
def _auto_renew_run(self, interval):
asyncio.ensure_future(self.renew(is_autorenew=True))
self._auto_renew_start(interval)
def _auto_renew_cancel(self):
"""Cancels the auto_renew loop"""
if self._auto_renew_task:
self._auto_renew_task.cancel()
self._auto_renew_task = None
# pylint: disable=no-self-use, too-many-branches, too-many-arguments
def _request(self, method, url, headers, success):
"""Sends an HTTP request.
Args:
method (str): 'SUBSCRIBE' or 'UNSUBSCRIBE'.
url (str): The full endpoint to which the request is being sent.
headers (dict): A dict of headers, each key and each value being
of type `str`.
success (function): A function to be called if the
request succeeds. The function will be called with a dict
of response headers as its only parameter.
"""
async def _async_make_request():
response = await self.event_listener.session.request(
method, url, headers=headers
)
if response.ok:
success(response.headers)
return _async_make_request()
async def _wrap(self, method, *args):
"""Wrap a call into an awaitable."""
future = asyncio.Future()
def _wrap_action():
try:
future.set_result(method(*args))
except Exception as ex: # pylint: disable=broad-except
future.set_exception(ex)
asyncio.get_event_loop().call_soon(_wrap_action)
return await future
[docs]class nullcontext: # pylint: disable=invalid-name
"""Context manager that does no additional processing.
Backport from python 3.7+ for older pythons.
"""
def __init__(self, enter_result=None):
self.enter_result = enter_result
def __enter__(self):
return self.enter_result
def __exit__(self, *excinfo):
pass
[docs]class SubscriptionsMapAio(SubscriptionsMap):
"""Maintains a mapping of sids to `soco.events_asyncio.Subscription`
instances. Registers each subscription to be unsubscribed at exit.
Inherits from `soco.events_base.SubscriptionsMap`.
"""
def __init__(self):
super().__init__()
# A counter of calls to Subscription.subscribe
# that have started but not completed. This is
# to prevent the event listener from being stopped prematurely
self._pending = 0
self.subscriptions_lock = nullcontext()
[docs] def register(self, subscription):
"""Register a subscription by updating local mapping of sid to
subscription and registering it to be unsubscribed at exit.
Args:
subscription(`soco.events_asyncio.Subscription`): the subscription
to be registered.
"""
# Add the subscription to the local dict of subscriptions so it
# can be looked up by sid
self.subscriptions[subscription.sid] = subscription
[docs] def subscribing(self):
"""Called when the `Subscription.subscribe` method
commences execution.
"""
# Increment the counter
self._pending += 1
[docs] def finished_subscribing(self):
"""Called when the `Subscription.subscribe` method
completes execution.
"""
# Decrement the counter
self._pending -= 1
@property
def count(self):
"""
`int`: The number of active or pending subscriptions.
"""
return len(self.subscriptions) + self._pending
subscriptions_map = SubscriptionsMapAio() # pylint: disable=C0103
event_listener = EventListener() # pylint: disable=C0103