Source code for soco.events_asyncio

"""Classes to handle Sonos UPnP Events and Subscriptions using asyncio.

The `Subscription` class from this module will be used in
:py:mod:`soco.services` if `config.EVENTS_MODULE` is set
to point to this module.

Example:

    Run this code, and change your volume, tracks etc::

        import logging

        logging.basicConfig()
        import soco
        import asyncio
        from pprint import pprint

        from soco import events_asyncio

        soco.config.EVENTS_MODULE = events_asyncio


        def print_event(event):
            try:
                pprint(event.variables)
            except Exception as e:
                print("There was an error in print_event:", e)


        def _get_device():
            device = soco.discover().pop().group.coordinator
            print(device.player_name)
            return device


        async def main():
            # pick a device at random and use it to get
            # the group coordinator
            loop = asyncio.get_event_loop()
            device = await loop.run_in_executor(None, _get_device)
            sub = await device.renderingControl.subscribe()
            sub2 = await device.avTransport.subscribe()
            sub.callback = print_event
            sub2.callback = print_event

            async def before_shutdown():
                await sub.unsubscribe()
                await sub2.unsubscribe()
                await events_asyncio.event_listener.async_stop()

            await asyncio.sleep(100)
            await before_shutdown()


        if __name__ == "__main__":
            asyncio.run(main())

"""

import logging
import socket
import time
import asyncio

from aiohttp import ClientSession, web

# Event is imported for compatibility with events.py
# pylint: disable=unused-import
from .events_base import Event  # noqa: F401

from .events_base import (  # noqa: E402
    get_listen_ip,
    parse_event_xml,
    EventNotifyHandlerBase,
    EventListenerBase,
    SubscriptionBase,
    SubscriptionsMap,
)

from .exceptions import SoCoException  # noqa: E402

log = logging.getLogger(__name__)  # pylint: disable=C0103


[docs]class EventNotifyHandler(EventNotifyHandlerBase): """Handles HTTP ``NOTIFY`` Verbs sent to the listener server. Inherits from `soco.events_base.EventNotifyHandlerBase`. """ def __init__(self): super().__init__() # The SubscriptionsMapAio instance created when this module is # imported. This is referenced by # soco.events_base.EventNotifyHandlerBase. self.subscriptions_map = subscriptions_map
[docs] async def notify(self, request): """Serve a ``NOTIFY`` request by calling `handle_notification` with the headers and content. """ content = await request.text() seq = request.headers["seq"] # Event sequence number sid = request.headers["sid"] # Event Subscription Identifier # find the relevant service from the sid # pylint: disable=no-member subscription = self.subscriptions_map.get_subscription(sid) # It might have been removed by another thread if subscription: timestamp = time.time() service = subscription.service self.log_event(seq, service.service_id, timestamp) log.debug("Event content: %s", content) if "x-sonos-http" in content: # parse_event_xml will generate I/O if # x-sonos-http is in the content variables = await asyncio.get_event_loop().run_in_executor( None, parse_event_xml, content ) else: variables = parse_event_xml(content) # Build the Event object event = Event(sid, seq, service, timestamp, variables) # pass the event details on to the service so it can update # its cache. # pylint: disable=protected-access service._update_cache_on_event(event) # Pass the event on for handling subscription.send_event(event) else: log.debug("No service registered for %s", sid) return web.Response(text="OK", status=200)
# pylint: disable=no-self-use, missing-docstring def log_event(self, seq, service_id, timestamp): log.info("Event %s received for %s service at %s", seq, service_id, timestamp)
[docs]class EventListener(EventListenerBase): # pylint: disable=too-many-instance-attributes """The Event Listener. Runs an http server which is an endpoint for ``NOTIFY`` requests from Sonos devices. Inherits from `soco.events_base.EventListenerBase`. """ def __init__(self): super().__init__() self.sock = None self.ip_address = None self.port = None self.runner = None self.site = None self.session = None self.start_lock = None
[docs] def start(self, any_zone): """A stub since the first subscribe calls async_start.""" return
[docs] def listen(self, ip_address): """A stub since since async_listen is used.""" return
[docs] async def async_start(self, any_zone): """Start the event listener listening on the local machine under the lock. Args: any_zone (SoCo): Any Sonos device on the network. It does not matter which device. It is used only to find a local IP address reachable by the Sonos net. """ if not self.session: self.session = ClientSession() if not self.start_lock: self.start_lock = asyncio.Lock() async with self.start_lock: if self.is_running: return # Use configured IP address if there is one, else detect # automatically. ip_address = get_listen_ip(any_zone.ip_address) if not ip_address: log.exception("Could not start Event Listener: check network.") # Otherwise, no point trying to start server return port = await self.async_listen(ip_address) if not port: return self.address = (ip_address, port) self.is_running = True log.debug("Event Listener started")
[docs] async def async_listen(self, ip_address): """Start the event listener listening on the local machine at port 1400 (default). If this port is unavailable, the listener will attempt to listen on the next available port, within a range of 100. Make sure that your firewall allows connections to this port. This method is called by `soco.events_base.EventListenerBase.start` Handling of requests is delegated to an instance of the `EventNotifyHandler` class. Args: ip_address (str): The local network interface on which the server should start listening. Returns: int: The port on which the server is listening. Note: The port on which the event listener listens is configurable. See `config.EVENT_LISTENER_PORT` """ for port_number in range( self.requested_port_number, self.requested_port_number + 100 ): try: if port_number > self.requested_port_number: log.warning("Trying next port (%d)", port_number) # pylint: disable=no-member sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind((ip_address, port_number)) sock.listen(200) self.sock = sock self.port = port_number break # pylint: disable=invalid-name except socket.error as e: log.warning(e) continue if not self.port: return None self.ip_address = ip_address await self._async_start() return self.port
async def _async_start(self): """Start the site.""" handler = EventNotifyHandler() app = web.Application() app.add_routes([web.route("notify", "", handler.notify)]) self.runner = web.AppRunner(app) await self.runner.setup() self.site = web.SockSite(self.runner, self.sock) await self.site.start() log.info("Event listener running on %s", (self.ip_address, self.port))
[docs] async def async_stop(self): """Stop the listener.""" await self.site.stop() await self.runner.cleanup() await self.session.close() self.sock.close() self.sock = None self.port = None self.ip_address = None
# pylint: disable=unused-argument
[docs] def stop_listening(self, address): """Stop the listener.""" asyncio.ensure_future(self.async_stop())
[docs]class Subscription(SubscriptionBase): """A class representing the subscription to a UPnP event. Inherits from `soco.events_base.SubscriptionBase`. """ def __init__(self, service, callback=None): """ Args: service (Service): The SoCo `Service` to which the subscription should be made. event_queue (:class:`~queue.Queue`): A queue on which received events will be put. If not specified, a queue will be created and used. """ super().__init__(service, None) #: :py:obj:`function`: callback function to be called whenever an #: `Event` is received. If it is set and is callable, the callback #: function will be called with the `Event` as the only parameter and #: the Subscription's event queue won't be used. self.callback = callback # The SubscriptionsMapAio instance created when this module is # imported. This is referenced by soco.events_base.SubscriptionBase. self.subscriptions_map = subscriptions_map # The EventListener instance created when this module is imported. # This is referenced by soco.events_base.SubscriptionBase. self.event_listener = event_listener # Used to keep track of the auto_renew loop self._auto_renew_task = None # pylint: disable=arguments-differ
[docs] def subscribe(self, requested_timeout=None, auto_renew=False, strict=True): """Subscribe to the service. If requested_timeout is provided, a subscription valid for that number of seconds will be requested, but not guaranteed. Check `timeout` on return to find out what period of validity is actually allocated. This method calls `events_base.SubscriptionBase.subscribe`. Note: SoCo will try to unsubscribe any subscriptions which are still subscribed on program termination, but it is good practice for you to clean up by making sure that you call :meth:`unsubscribe` yourself. Args: requested_timeout(int, optional): The timeout to be requested. auto_renew (bool, optional): If `True`, renew the subscription automatically shortly before timeout. Default `False`. strict (bool, optional): If True and an Exception occurs during execution, the Exception will be raised or, if False, the Exception will be logged and the Subscription instance will be returned. Default `True`. Returns: `Subscription`: The Subscription instance. """ self.subscriptions_map.subscribing() future = asyncio.Future() subscribe = super().subscribe async def _async_wrap_subscribe(): try: if not self.event_listener.is_running: await self.event_listener.async_start(self.service.soco) await subscribe(requested_timeout, auto_renew) future.set_result(self) except SoCoException as ex: future.set_exception(ex) except Exception as exc: # pylint: disable=broad-except self._cancel_subscription(exc) if strict: future.set_exception(exc) else: self._log_exception(exc) future.set_result(self) finally: self.subscriptions_map.finished_subscribing() asyncio.ensure_future(_async_wrap_subscribe()) return future
def _log_exception(self, exc): """Log an exception during subscription.""" msg = ( "An Exception occurred: {}.".format(exc) + " Subscription to {},".format( self.service.base_url + self.service.event_subscription_url ) + " sid: {} has been cancelled".format(self.sid) ) log.exception(msg)
[docs] async def renew( self, requested_timeout=None, is_autorenew=False, strict=True ): # pylint: disable=invalid-overridden-method """renew(requested_timeout=None) Renew the event subscription. You should not try to renew a subscription which has been unsubscribed, or once it has expired. This method calls `events_base.SubscriptionBase.renew`. Args: requested_timeout (int, optional): The period for which a renewal request should be made. If None (the default), use the timeout requested on subscription. is_autorenew (bool, optional): Whether this is an autorenewal. Default `False`. strict (bool, optional): If True and an Exception occurs during execution, the Exception will be raised or, if False, the Exception will be logged and the Subscription instance will be returned. Default `True`. Returns: `Subscription`: The Subscription instance. """ try: return await self._wrap(super().renew, requested_timeout, is_autorenew) except Exception as exc: # pylint: disable=broad-except msg = ( "An Exception occurred. Subscription to" + " {}, sid: {} has been cancelled".format( self.service.base_url + self.service.event_subscription_url, self.sid, ) ) log.exception(msg) self._cancel_subscription(msg) if self.auto_renew_fail is not None: if hasattr(self.auto_renew_fail, "__call__"): # pylint: disable=not-callable self.auto_renew_fail(exc) if strict: raise self._log_exception(exc) return self
[docs] async def unsubscribe( self, strict=True ): # pylint: disable=invalid-overridden-method """unsubscribe() Unsubscribe from the service's events. Once unsubscribed, a Subscription instance should not be reused This method calls `events_base.SubscriptionBase.unsubscribe`. Args: strict (bool, optional): If True and an Exception occurs during execution, the Exception will be raised or, if False, the Exception will be logged and the Subscription instance will be returned. Default `True`. Returns: `Subscription`: The Subscription instance. """ try: return await self._wrap(super().unsubscribe) except Exception as exc: # pylint: disable=broad-except if strict: raise self._log_exception(exc) return self
def _auto_renew_start(self, interval): """Starts the auto_renew loop.""" self._auto_renew_task = asyncio.get_event_loop().call_later( interval, self._auto_renew_run, interval ) def _auto_renew_run(self, interval): asyncio.ensure_future(self.renew(is_autorenew=True)) self._auto_renew_start(interval) def _auto_renew_cancel(self): """Cancels the auto_renew loop""" if self._auto_renew_task: self._auto_renew_task.cancel() self._auto_renew_task = None # pylint: disable=no-self-use, too-many-branches, too-many-arguments def _request(self, method, url, headers, success): """Sends an HTTP request. Args: method (str): 'SUBSCRIBE' or 'UNSUBSCRIBE'. url (str): The full endpoint to which the request is being sent. headers (dict): A dict of headers, each key and each value being of type `str`. success (function): A function to be called if the request succeeds. The function will be called with a dict of response headers as its only parameter. """ async def _async_make_request(): response = await self.event_listener.session.request( method, url, headers=headers ) if response.ok: success(response.headers) return _async_make_request() async def _wrap(self, method, *args): """Wrap a call into an awaitable.""" future = asyncio.Future() def _wrap_action(): try: future.set_result(method(*args)) except Exception as ex: # pylint: disable=broad-except future.set_exception(ex) asyncio.get_event_loop().call_soon(_wrap_action) return await future
[docs]class nullcontext: # pylint: disable=invalid-name """Context manager that does no additional processing. Backport from python 3.7+ for older pythons. """ def __init__(self, enter_result=None): self.enter_result = enter_result def __enter__(self): return self.enter_result def __exit__(self, *excinfo): pass
[docs]class SubscriptionsMapAio(SubscriptionsMap): """Maintains a mapping of sids to `soco.events_asyncio.Subscription` instances. Registers each subscription to be unsubscribed at exit. Inherits from `soco.events_base.SubscriptionsMap`. """ def __init__(self): super().__init__() # A counter of calls to Subscription.subscribe # that have started but not completed. This is # to prevent the event listener from being stopped prematurely self._pending = 0 self.subscriptions_lock = nullcontext()
[docs] def register(self, subscription): """Register a subscription by updating local mapping of sid to subscription and registering it to be unsubscribed at exit. Args: subscription(`soco.events_asyncio.Subscription`): the subscription to be registered. """ # Add the subscription to the local dict of subscriptions so it # can be looked up by sid self.subscriptions[subscription.sid] = subscription
[docs] def subscribing(self): """Called when the `Subscription.subscribe` method commences execution. """ # Increment the counter self._pending += 1
[docs] def finished_subscribing(self): """Called when the `Subscription.subscribe` method completes execution. """ # Decrement the counter self._pending -= 1
@property def count(self): """ `int`: The number of active or pending subscriptions. """ return len(self.subscriptions) + self._pending
subscriptions_map = SubscriptionsMapAio() # pylint: disable=C0103 event_listener = EventListener() # pylint: disable=C0103