Source code for mopidy.mpd.protocol

This is Mopidy's MPD protocol implementation.

This is partly based upon the `MPD protocol documentation
<>`_, which is a useful resource, but it is
rather incomplete with regards to data formats, both for requests and
responses. Thus, we have had to talk a great deal with the the original `MPD
server <>`_ using telnet to get the details we need to
implement our own MPD server which is compatible with the numerous existing
`MPD clients <>`_.

import inspect

from mopidy.mpd import exceptions

#: The MPD protocol uses UTF-8 for encoding all data.

#: The MPD protocol uses ``\n`` as line terminator.

#: The MPD protocol version is 0.19.0.
VERSION = "0.19.0"

[docs]def load_protocol_modules(): """ The protocol modules must be imported to get them registered in :attr:`commands`. """ from . import ( # noqa audio_output, channels, command_list, connection, current_playlist, mount, music_db, playback, reflection, status, stickers, stored_playlists, )
[docs]def INT(value): # noqa: N802 r"""Converts a value that matches [+-]?\d+ into an integer.""" if value is None: raise ValueError("None is not a valid integer") # TODO: check for whitespace via value != value.strip()? return int(value)
[docs]def UINT(value): # noqa: N802 r"""Converts a value that matches \d+ into an integer.""" if value is None: raise ValueError("None is not a valid integer") if not value.isdigit(): raise ValueError("Only positive numbers are allowed") return int(value)
[docs]def FLOAT(value): # noqa: N802 r"""Converts a value that matches [+-]\d+(.\d+)? into a float.""" if value is None: raise ValueError("None is not a valid float") return float(value)
[docs]def UFLOAT(value): # noqa: N802 r"""Converts a value that matches \d+(.\d+)? into a float.""" if value is None: raise ValueError("None is not a valid float") value = float(value) if value < 0: raise ValueError("Only positive numbers are allowed") return value
[docs]def BOOL(value): # noqa: N802 """Convert the values 0 and 1 into booleans.""" if value in ("1", "0"): return bool(int(value)) raise ValueError(f"{value!r} is not 0 or 1")
[docs]def RANGE(value): # noqa: N802 """Convert a single integer or range spec into a slice ``n`` should become ``slice(n, n+1)`` ``n:`` should become ``slice(n, None)`` ``n:m`` should become ``slice(n, m)`` and ``m > n`` must hold """ if ":" in value: start, stop = value.split(":", 1) start = UINT(start) if stop.strip(): stop = UINT(stop) if start >= stop: raise ValueError("End must be larger than start") else: stop = None else: start = UINT(value) stop = start + 1 return slice(start, stop)
[docs]class Commands: """Collection of MPD commands to expose to users. Normally used through the global instance which command handlers have been installed into. """ def __init__(self): self.handlers = {} # TODO: consider removing auth_required and list_command in favour of # additional command instances to register in?
[docs] def add(self, name, auth_required=True, list_command=True, **validators): """Create a decorator that registers a handler and validation rules. Additional keyword arguments are treated as converters/validators to apply to tokens converting them to proper Python types. Requirements for valid handlers: - must accept a context argument as the first arg. - may not use variable keyword arguments, ``**kwargs``. - may use variable arguments ``*args`` *or* a mix of required and optional arguments. Decorator returns the unwrapped function so that tests etc can use the functions with values with correct python types instead of strings. :param string name: Name of the command being registered. :param bool auth_required: If authorization is required. :param bool list_command: If command should be listed in reflection. """ def wrapper(func): if name in self.handlers: raise ValueError(f"{name} already registered") spec = inspect.getfullargspec(func) defaults = dict( zip(spec.args[-len(spec.defaults or []) :], spec.defaults or []) ) if not spec.args and not spec.varargs: raise TypeError("Handler must accept at least one argument.") if len(spec.args) > 1 and spec.varargs: raise TypeError( "*args may not be combined with regular arguments" ) if not set(validators.keys()).issubset(spec.args): raise TypeError("Validator for non-existent arg passed") if spec.varkw or spec.kwonlyargs: raise TypeError("Keyword arguments are not permitted") def validate(*args, **kwargs): if spec.varargs: return func(*args, **kwargs) try: ba = inspect.signature(func).bind(*args, **kwargs) ba.apply_defaults() callargs = ba.arguments except TypeError: raise exceptions.MpdArgError( f'wrong number of arguments for "{name}"' ) for key, value in callargs.items(): default = defaults.get(key, object()) if key in validators and value != default: try: callargs[key] = validators[key](value) except ValueError: raise exceptions.MpdArgError("incorrect arguments") return func(**callargs) validate.auth_required = auth_required validate.list_command = list_command self.handlers[name] = validate return func return wrapper
[docs] def call(self, tokens, context=None): """Find and run the handler registered for the given command. If the handler was registered with any converters/validators they will be run before calling the real handler. :param list tokens: List of tokens to process :param context: MPD context. :type context: :class:`~mopidy.mpd.dispatcher.MpdContext` """ if not tokens: raise exceptions.MpdNoCommand() if tokens[0] not in self.handlers: raise exceptions.MpdUnknownCommand(command=tokens[0]) return self.handlers[tokens[0]](context, *tokens[1:])
#: Global instance to install commands into commands = Commands()