# -*- coding: utf-8 -*-
# pylint: disable=not-context-manager

# NOTE: The pylint not-content-manager warning is disabled pending the fix of
# a bug in pylint:

# Disable while we have Python 2.x compatability
# pylint: disable=useless-object-inheritance

"""Classes to handle Sonos UPnP Events and Subscriptions."""

from __future__ import unicode_literals

import atexit
import logging
import socket
import threading
import time
import weakref

import requests

from . import config
from .compat import (
    Queue, BaseHTTPRequestHandler, URLError, socketserver, urlopen
from .data_structures_entry import from_didl_string
from .exceptions import SoCoException, SoCoFault, EventParseException
from .utils import camel_to_underscore
from .xml import XML

log = logging.getLogger(__name__)  # pylint: disable=C0103

# pylint: disable=too-many-branches
[docs]def parse_event_xml(xml_event): """Parse the body of a UPnP event. Args: xml_event (bytes): bytes containing the body of the event encoded with utf-8. Returns: dict: A dict with keys representing the evented variables. The relevant value will usually be a string representation of the variable's value, but may on occasion be: * a dict (eg when the volume changes, the value will itself be a dict containing the volume for each channel: :code:`{'Volume': {'LF': '100', 'RF': '100', 'Master': '36'}}`) * an instance of a `DidlObject` subclass (eg if it represents track metadata). * a `SoCoFault` (if a variable contains illegal metadata) Example: Run this code, and change your volume, tracks etc:: from __future__ import print_function try: from queue import Empty except: # Py2.7 from Queue import Empty import soco from pprint import pprint from import event_listener # pick a device at random device = print (device.player_name) sub = device.renderingControl.subscribe() sub2 = device.avTransport.subscribe() while True: try: event = pprint (event.variables) except Empty: pass try: event = pprint (event.variables) except Empty: pass except KeyboardInterrupt: sub.unsubscribe() sub2.unsubscribe() event_listener.stop() break """ result = {} tree = XML.fromstring(xml_event) # property values are just under the propertyset, which # uses this namespace properties = tree.findall( '{urn:schemas-upnp-org:event-1-0}property') for prop in properties: # pylint: disable=too-many-nested-blocks for variable in prop: # Special handling for a LastChange event specially. For details on # LastChange events, see # # and if variable.tag == "LastChange": last_change_tree = XML.fromstring( variable.text.encode('utf-8')) # We assume there is only one InstanceID tag. This is true for # Sonos, as far as we know. # InstanceID can be in one of two namespaces, depending on # whether we are looking at an avTransport event, a # renderingControl event, or a Queue event # (there, it is named QueueID) instance = last_change_tree.find( "{urn:schemas-upnp-org:metadata-1-0/AVT/}InstanceID") if instance is None: instance = last_change_tree.find( "{urn:schemas-upnp-org:metadata-1-0/RCS/}InstanceID") if instance is None: instance = last_change_tree.find( "{urn:schemas-sonos-com:metadata-1-0/Queue/}QueueID") # Look at each variable within the LastChange event for last_change_var in instance: tag = last_change_var.tag # Remove any namespaces from the tags if tag.startswith('{'): tag = tag.split('}', 1)[1] # Un-camel case it tag = camel_to_underscore(tag) # Now extract the relevant value for the variable. # The UPnP specs suggest that the value of any variable # evented via a LastChange Event will be in the 'val' # attribute, but audio related variables may also have a # 'channel' attribute. In addition, it seems that Sonos # sometimes uses a text value instead: see # value = last_change_var.get('val') if value is None: value = last_change_var.text # If DIDL metadata is returned, convert it to a music # library data structure if value.startswith('<DIDL-Lite'): # Wrap any parsing exception in a SoCoFault, so the # user can handle it try: value = from_didl_string(value)[0] except SoCoException as original_exception: log.warning("Event contains illegal metadata" "for '%s'.\n" "Error message: '%s'\n" "The result will be a SoCoFault.", tag, str(original_exception)) event_parse_exception = EventParseException( tag, value, original_exception ) value = SoCoFault(event_parse_exception) channel = last_change_var.get('channel') if channel is not None: if result.get(tag) is None: result[tag] = {} result[tag][channel] = value else: result[tag] = value else: result[camel_to_underscore(variable.tag)] = variable.text return result
[docs]class Event(object): """A read-only object representing a received event. The values of the evented variables can be accessed via the ``variables`` dict, or as attributes on the instance itself. You should treat all attributes as read-only. Args: sid (str): the subscription id. seq (str): the event sequence number for that subscription. timestamp (str): the time that the event was received (from Python's `time.time` function). service (str): the service which is subscribed to the event. variables (dict, optional): contains the ``{names: values}`` of the evented variables. Defaults to `None`. The values may be `SoCoFault` objects if the metadata could not be parsed. Raises: AttributeError: Not all attributes are returned with each event. An `AttributeError` will be raised if you attempt to access as an attribute a variable which was not returned in the event. Example: >>> print event.variables['transport_state'] 'STOPPED' >>> print event.transport_state 'STOPPED' """ # pylint: disable=too-few-public-methods, too-many-arguments def __init__(self, sid, seq, service, timestamp, variables=None): # Initialisation has to be done like this, because __setattr__ is # overridden, and will not allow direct setting of attributes self.__dict__['sid'] = sid self.__dict__['seq'] = seq self.__dict__['timestamp'] = timestamp self.__dict__['service'] = service self.__dict__['variables'] = variables if variables is not None else {} def __getattr__(self, name): if name in self.variables: return self.variables[name] else: raise AttributeError('No such attribute: %s' % name) def __setattr__(self, name, value): """Disable (most) attempts to set attributes. This is not completely foolproof. It just acts as a warning! See `object.__setattr__`. """ raise TypeError('Event object does not support attribute assignment')
[docs]class EventServer(socketserver.ThreadingMixIn, socketserver.TCPServer): """A TCP server which handles each new request in a new thread.""" allow_reuse_address = True
[docs]class EventNotifyHandler(BaseHTTPRequestHandler): """Handles HTTP ``NOTIFY`` Verbs sent to the listener server."""
[docs] def do_NOTIFY(self): # pylint: disable=invalid-name """Serve a ``NOTIFY`` request. A ``NOTIFY`` request will be sent by a Sonos device when a state variable changes. See the `UPnP Spec ยง4.3 [pdf] < -DeviceArchitecture-v1.1.pdf>`_ for details. """ timestamp = time.time() headers = requests.structures.CaseInsensitiveDict(self.headers) seq = headers['seq'] # Event sequence number sid = headers['sid'] # Event Subscription Identifier content_length = int(headers['content-length']) content = # Find the relevant service and queue from the sid with _subscriptions_lock: subscription = _subscriptions.get(sid) # It might have been removed by another thread if subscription: service = subscription.service "Event %s received for %s service on thread %s at %s", seq, service.service_id, threading.current_thread(), timestamp) log.debug("Event content: %s", content) variables = parse_event_xml(content) # Build the Event object event = Event(sid, seq, service, timestamp, variables) # pass the event details on to the service so it can update its # cache. # pylint: disable=protected-access service._update_cache_on_event(event) # Put the event on the queue else:"No service registered for %s", sid) self.send_response(200) self.end_headers()
[docs] def log_message(self, fmt, *args): # pylint: disable=arguments-differ # Divert standard webserver logging to the debug log log.debug(fmt, *args)
[docs]class EventServerThread(threading.Thread): """The thread in which the event listener server will run.""" def __init__(self, address): """ Args: address (tuple): The (ip, port) address on which the server should listen. """ super(EventServerThread, self).__init__() #: `threading.Event`: Used to signal that the server should stop. self.stop_flag = threading.Event() #: `tuple`: The (ip, port) address on which the server is #: configured to listen. self.address = address
[docs] def run(self): """Start the server on the local IP at port 1400 (default). Handling of requests is delegated to an instance of the `EventNotifyHandler` class. """ listener = EventServer(self.address, EventNotifyHandler)"Event listener running on %s", listener.server_address) # Listen for events until told to stop while not self.stop_flag.is_set(): listener.handle_request()
[docs]class EventListener(object): """The Event Listener. Runs an http server in a thread which is an endpoint for ``NOTIFY`` requests from Sonos devices. """ def __init__(self): super(EventListener, self).__init__() #: `bool`: Indicates whether the server is currently running self.is_running = False self._start_lock = threading.Lock() self._listener_thread = None #: `tuple`: The address (ip, port) on which the server is #: configured to listen. # Empty for the moment. (It is set in `start`) self.address = ()
[docs] def start(self, any_zone): """Start the event listener listening on the local machine at port 1400 (default) Make sure that your firewall allows connections to this port Args: any_zone (SoCo): Any Sonos device on the network. It does not matter which device. It is used only to find a local IP address reachable by the Sonos net. Note: The port on which the event listener listens is configurable. See `config.EVENT_LISTENER_PORT` """ # Find our local network IP address which is accessible to the # Sonos net, see with self._start_lock: if not self.is_running: # Use configured IP address if there is one, else detect # automatically. if config.EVENT_LISTENER_IP: ip_address = config.EVENT_LISTENER_IP else: temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) temp_sock.connect((any_zone.ip_address, config.EVENT_LISTENER_PORT)) ip_address = temp_sock.getsockname()[0] temp_sock.close() # Start the event listener server in a separate thread. self.address = (ip_address, config.EVENT_LISTENER_PORT) self._listener_thread = EventServerThread(self.address) self._listener_thread.daemon = True self._listener_thread.start() self.is_running = True"Event listener started")
[docs] def stop(self): """Stop the event listener.""" # Signal the thread to stop before handling the next request self._listener_thread.stop_flag.set() # Send a dummy request in case the http server is currently listening try: urlopen( 'http://%s:%s/' % (self.address[0], self.address[1])) except URLError: # If the server is already shut down, we receive a socket error, # which we ignore. pass # wait for the thread to finish self._listener_thread.join() self.is_running = False"Event listener stopped")
[docs]class Subscription(object): """A class representing the subscription to a UPnP event.""" # pylint: disable=too-many-instance-attributes def __init__(self, service, event_queue=None): """ Args: service (Service): The SoCo `Service` to which the subscription should be made. event_queue (:class:`~queue.Queue`): A queue on which received events will be put. If not specified, a queue will be created and used. """ super(Subscription, self).__init__() self.service = service #: `str`: A unique ID for this subscription self.sid = None #: `int`: The amount of time in seconds until the subscription expires. self.timeout = None #: `bool`: An indication of whether the subscription is subscribed. self.is_subscribed = False #: :class:`~queue.Queue`: The queue on which events are placed. = Queue() if event_queue is None else event_queue #: `int`: The period (seconds) for which the subscription is requested self.requested_timeout = None # A flag to make sure that an unsubscribed instance is not # resubscribed self._has_been_unsubscribed = False # The time when the subscription was made self._timestamp = None # Used to keep track of the auto_renew thread self._auto_renew_thread = None self._auto_renew_thread_flag = threading.Event()
[docs] def subscribe(self, requested_timeout=None, auto_renew=False): """Subscribe to the service. If requested_timeout is provided, a subscription valid for that number of seconds will be requested, but not guaranteed. Check `timeout` on return to find out what period of validity is actually allocated. Note: SoCo will try to unsubscribe any subscriptions which are still subscribed on program termination, but it is good practice for you to clean up by making sure that you call :meth:`unsubscribe` yourself. Args: requested_timeout(int, optional): The timeout to be requested. auto_renew (bool, optional): If `True`, renew the subscription automatically shortly before timeout. Default `False`. """ class AutoRenewThread(threading.Thread): """Used by the auto_renew code to renew a subscription from within a thread. """ def __init__(self, interval, stop_flag, sub, *args, **kwargs): super(AutoRenewThread, self).__init__(*args, **kwargs) self.interval = interval self.sub = sub self.stop_flag = stop_flag self.daemon = True def run(self): sub = self.sub stop_flag = self.stop_flag interval = self.interval while not stop_flag.wait(interval):"Autorenewing subscription %s", sub.sid) sub.renew() # TIMEOUT is provided for in the UPnP spec, but it is not clear if # Sonos pays any attention to it. A timeout of 86400 secs always seems # to be allocated self.requested_timeout = requested_timeout if self._has_been_unsubscribed: raise SoCoException( 'Cannot resubscribe instance once unsubscribed') service = self.service # The event listener must be running, so start it if not if not event_listener.is_running: event_listener.start(service.soco) # an event subscription looks like this: # SUBSCRIBE publisher path HTTP/1.1 # HOST: publisher host:publisher port # CALLBACK: <delivery URL> # NT: upnp:event # TIMEOUT: Second-requested subscription duration (optional) # pylint: disable=unbalanced-tuple-unpacking ip_address, port = event_listener.address if config.EVENT_ADVERTISE_IP: ip_address = config.EVENT_ADVERTISE_IP headers = { 'Callback': '<http://{}:{}>'.format(ip_address, port), 'NT': 'upnp:event' } if requested_timeout is not None: headers["TIMEOUT"] = "Second-{}".format(requested_timeout) # Lock out EventNotifyHandler during registration with _subscriptions_lock: response = requests.request( 'SUBSCRIBE', service.base_url + service.event_subscription_url, headers=headers) response.raise_for_status() self.sid = response.headers['sid'] timeout = response.headers['timeout'] # According to the spec, timeout can be "infinite" or "second-123" # where 123 is a number of seconds. Sonos uses "Second-123" (with # a capital letter) if timeout.lower() == 'infinite': self.timeout = None else: self.timeout = int(timeout.lstrip('Second-')) self._timestamp = time.time() self.is_subscribed = True "Subscribed to %s, sid: %s", service.base_url + service.event_subscription_url, self.sid) # Add the subscription to the master dict so it can be looked up # by sid _subscriptions[self.sid] = self # Register this subscription to be unsubscribed at exit if still alive # This will not happen if exit is abnormal (eg in response to a # signal or fatal interpreter error - see the docs for `atexit`). atexit.register(self.unsubscribe) # Set up auto_renew if not auto_renew: return # Autorenew just before expiry, say at 85% of self.timeout seconds interval = self.timeout * 85 / 100 auto_renew_thread = AutoRenewThread( interval, self._auto_renew_thread_flag, self) auto_renew_thread.start()
[docs] def renew(self, requested_timeout=None): """Renew the event subscription. You should not try to renew a subscription which has been unsubscribed, or once it has expired. Args: requested_timeout (int, optional): The period for which a renewal request should be made. If None (the default), use the timeout requested on subscription. """ # NB This code is sometimes called from a separate thread (when # subscriptions are auto-renewed. Be careful to ensure thread-safety if self._has_been_unsubscribed: raise SoCoException( 'Cannot renew subscription once unsubscribed') if not self.is_subscribed: raise SoCoException( 'Cannot renew subscription before subscribing') if self.time_left == 0: raise SoCoException( 'Cannot renew subscription after expiry') # SUBSCRIBE publisher path HTTP/1.1 # HOST: publisher host:publisher port # SID: uuid:subscription UUID # TIMEOUT: Second-requested subscription duration (optional) headers = { 'SID': self.sid } if requested_timeout is None: requested_timeout = self.requested_timeout if requested_timeout is not None: headers["TIMEOUT"] = "Second-{}".format(requested_timeout) response = requests.request( 'SUBSCRIBE', self.service.base_url + self.service.event_subscription_url, headers=headers) response.raise_for_status() timeout = response.headers['timeout'] # According to the spec, timeout can be "infinite" or "second-123" # where 123 is a number of seconds. Sonos uses "Second-123" (with a # a capital letter) if timeout.lower() == 'infinite': self.timeout = None else: self.timeout = int(timeout.lstrip('Second-')) self._timestamp = time.time() self.is_subscribed = True "Renewed subscription to %s, sid: %s", self.service.base_url + self.service.event_subscription_url, self.sid)
[docs] def unsubscribe(self): """Unsubscribe from the service's events. Once unsubscribed, a Subscription instance should not be reused """ # Trying to unsubscribe if already unsubscribed, or not yet # subscribed, fails silently if self._has_been_unsubscribed or not self.is_subscribed: return # Cancel any auto renew self._auto_renew_thread_flag.set() # Send an unsubscribe request like this: # UNSUBSCRIBE publisher path HTTP/1.1 # HOST: publisher host:publisher port # SID: uuid:subscription UUID headers = { 'SID': self.sid } response = requests.request( 'UNSUBSCRIBE', self.service.base_url + self.service.event_subscription_url, headers=headers) response.raise_for_status() self.is_subscribed = False self._timestamp = None "Unsubscribed from %s, sid: %s", self.service.base_url + self.service.event_subscription_url, self.sid) # remove queue from event queues and sid to service mappings with _subscriptions_lock: try: del _subscriptions[self.sid] except KeyError: pass self._has_been_unsubscribed = True
@property def time_left(self): """ `int`: The amount of time left until the subscription expires (seconds) If the subscription is unsubscribed (or not yet subscribed), `time_left` is 0. """ if self._timestamp is None: return 0 else: time_left = self.timeout - (time.time() - self._timestamp) return time_left if time_left > 0 else 0 def __enter__(self): if not self.is_subscribed: self.subscribe() return self def __exit__(self, exc_type, exc_val, exc_tb): self.unsubscribe()
# pylint: disable=C0103 event_listener = EventListener() # Thread safe mapping. # Used to store a mapping of sid to subscription. _subscriptions = weakref.WeakValueDictionary() # The lock to go with it # You must only ever access the mapping in the context of this lock, eg: # with _subscriptions_lock: # queue = _subscriptions[sid].events _subscriptions_lock = threading.Lock()