# Source code for mopidy.commands

from __future__ import annotations

import argparse
import collections
import contextlib
import logging
import os
import pathlib
import signal
import sys
from typing import TYPE_CHECKING

import pykka
from pykka.messages import ProxyCall

from mopidy import config as config_lib
from mopidy import exceptions
from mopidy.audio import Audio
from mopidy.core import Core
from mopidy.internal import deps, process, timer, versioning
from mopidy.internal.gi import GLib

if TYPE_CHECKING:
    from typing import Optional


logger = logging.getLogger(__name__)

# One candidate ``mopidy/mopidy.conf`` path per GLib config directory: the
# system config dirs first, followed by the user config dir.
_default_config = [
    pathlib.Path(base, "mopidy", "mopidy.conf").resolve()
    for base in GLib.get_system_config_dirs() + [GLib.get_user_config_dir()]
]
# Colon separated form of the paths above; used as the ``--config`` default.
DEFAULT_CONFIG = ":".join(str(path) for path in _default_config)


def config_files_type(value):
    """Split a colon separated ``--config`` value into a list of paths.

    :param value: colon separated string of config file paths
    :type value: string
    :rtype: list of strings
    """
    paths = value.split(":")
    return paths


def config_override_type(value):
    """Parse a ``section/key=value`` override given on the command line.

    :param value: raw option string, expected as ``section/key=value``
    :type value: string
    :returns: ``(section, key, value)`` with surrounding whitespace stripped
    :rtype: tuple of three strings
    :raises argparse.ArgumentTypeError: if ``value`` lacks the ``/`` or the
        ``=`` separator
    """
    try:
        # Only the first "/" and the first "=" are separators, so the value
        # part may itself contain either character.
        section, remainder = value.split("/", 1)
        key, option_value = remainder.split("=", 1)
    except ValueError:
        # The unpacking error is an implementation detail; suppress it so
        # only the user-facing message is reported.
        raise argparse.ArgumentTypeError(
            f"{value} must have the format section/key=value"
        ) from None
    return (section.strip(), key.strip(), option_value.strip())


class _ParserError(Exception):
    def __init__(self, message):
        self.message = message


class _HelpError(Exception):
    pass


class _ArgumentParser(argparse.ArgumentParser):
    """Argument parser that reports errors by raising :class:`_ParserError`."""

    def error(self, message):
        """Raise :class:`_ParserError` carrying ``message``.

        :param message: error text produced by argparse
        :type message: string
        :raises _ParserError: always
        """
        failure = _ParserError(message)
        raise failure


class _HelpAction(argparse.Action):
    def __init__(self, option_strings, dest=None, help=None):
        super().__init__(
            option_strings=option_strings,
            dest=dest or argparse.SUPPRESS,
            default=argparse.SUPPRESS,
            nargs=0,
            help=help,
        )

    def __call__(self, parser, namespace, values, option_string=None):
        raise _HelpError()


class Command:
    """Command parser and runner for building trees of commands.

    This class provides a wrapper around :class:`argparse.ArgumentParser`
    for handling this type of command line application in a better way than
    argparse's own sub-parser handling.
    """

    #: Help text to display in help output.
    help: Optional[str] = None

    def __init__(self):
        self._overrides = {}
        self._arguments = []
        self._children = collections.OrderedDict()

    def _build(self):
        """Create a fresh parser from the registered arguments.

        :returns: ``(parser, actions)`` where ``actions`` are the argparse
            actions created for the registered arguments, in order
        """
        parser = _ArgumentParser(add_help=False)
        parser.register("action", "help", _HelpAction)

        actions = [
            parser.add_argument(*arg_names, **arg_options)
            for arg_names, arg_options in self._arguments
        ]

        # Everything left over is collected in ``_args`` so that ``_parse``
        # can hand it on to a child command.
        parser.add_argument(
            "_args", nargs=argparse.REMAINDER, help=argparse.SUPPRESS
        )
        return parser, actions

    def add_child(self, name, command):
        """Add a child parser to consider using.

        :param name: name to use for the sub-command that is being added.
        :type name: string
        :param command: command to dispatch to when ``name`` is given
        """
        self._children[name] = command

    def add_argument(self, *args, **kwargs):
        """Add an argument to the parser.

        This method takes all the same arguments as the
        :class:`argparse.ArgumentParser` version of this method.
        """
        self._arguments.append((args, kwargs))

    def set(self, **kwargs):
        """Override a value in the final result of parsing."""
        self._overrides.update(kwargs)

    def exit(self, status_code=0, message=None, usage=None):
        """Optionally print a message and exit."""
        # Usage comes first; parts that are empty or None are left out.
        parts = [text for text in (usage, message) if text]
        print("\n\n".join(parts))
        sys.exit(status_code)

    def format_usage(self, prog=None):
        """Format usage for current parser."""
        _, actions = self._build()
        if not prog:
            prog = os.path.basename(sys.argv[0])
        usage = self._usage(actions, prog)
        return usage + "\n"

    def _usage(self, actions, prog):
        """Return the stripped usage text for ``actions`` under ``prog``."""
        usage_formatter = argparse.HelpFormatter(prog)
        usage_formatter.add_usage(None, actions, [])
        text = usage_formatter.format_help()
        return text.strip()

    def format_help(self, prog=None):
        """Format help for current parser and children."""
        _, actions = self._build()
        if not prog:
            prog = os.path.basename(sys.argv[0])

        formatter = argparse.HelpFormatter(prog)
        formatter.add_usage(None, actions, [])

        if self.help:
            formatter.add_text(self.help)

        if actions:
            formatter.add_text("OPTIONS:")
            formatter.start_section(None)
            formatter.add_arguments(actions)
            formatter.end_section()

        # Each child appends its own help (and that of its descendants).
        child_sections = []
        for child_name, child_command in self._children.items():
            child_command._subhelp(child_name, child_sections)

        if child_sections:
            formatter.add_text("COMMANDS:")
            # The leading empty entry yields a newline after the heading.
            child_sections.insert(0, "")

        own_help = formatter.format_help()
        return own_help + "\n".join(child_sections)

    def _subhelp(self, name, result):
        """Append help for this command and all descendants to ``result``.

        :param name: full name of this command as seen from the root
        :type name: string
        :param result: list that the formatted help blocks are appended to
        :type result: list of strings
        """
        _, actions = self._build()

        if self.help or actions:
            formatter = argparse.HelpFormatter(name)
            formatter.add_usage(None, actions, [], "")
            formatter.start_section(None)
            formatter.add_text(self.help)
            formatter.start_section(None)
            formatter.add_arguments(actions)
            formatter.end_section()
            formatter.end_section()
            result.append(formatter.format_help())

        for child_name, child_command in self._children.items():
            qualified_name = " ".join((name, child_name))
            child_command._subhelp(qualified_name, result)

    def parse(self, args, prog=None):
        """Parse command line arguments.

        Will recursively parse commands until a final parser is found or an
        error occurs. In the case of errors we will print a message and exit.
        Otherwise, any overrides are applied and the current parser stored
        in the command attribute of the return value.

        :param args: list of arguments to parse
        :type args: list of strings
        :param prog: name to use for program
        :type prog: string
        :rtype: :class:`argparse.Namespace`
        """
        if not prog:
            prog = os.path.basename(sys.argv[0])
        initial_overrides = self._overrides.copy()
        try:
            return self._parse(
                args, argparse.Namespace(), initial_overrides, prog
            )
        except _HelpError:
            self.exit(0, self.format_help(prog))

    def _parse(self, args, namespace, overrides, prog):
        """Parse ``args`` here, then delegate the remainder to a child.

        ``overrides`` accumulates the overrides of every command on the path
        from the root; they are applied to the namespace by the final
        command only.
        """
        overrides.update(self._overrides)
        parser, actions = self._build()

        try:
            result = parser.parse_args(args, namespace)
        except _ParserError as exc:
            self.exit(1, str(exc), self._usage(actions, prog))

        remaining = result._args
        if remaining:
            # The first leftover argument names the child command.
            child = remaining.pop(0)
            if child not in self._children:
                usage = self._usage(actions, prog)
                self.exit(1, f"unrecognized command: {child}", usage)

            return self._children[child]._parse(
                remaining, result, overrides, " ".join((prog, child))
            )

        # Nothing left to dispatch: this is the final command.
        for attr, value in overrides.items():
            setattr(result, attr, value)
        del result._args
        result.command = self
        return result

    def run(self, *args, **kwargs):
        """Run the command.

        Must be implemented by sub-classes that are not simply an
        intermediate in the command namespace.
        """
        raise NotImplementedError
@contextlib.contextmanager
def _actor_error_handling(name):
    """Log, rather than propagate, exceptions raised inside the block.

    Backend, frontend and mixer errors are logged as initialization errors;
    any other :class:`Exception` is logged together with its traceback.

    :param name: name included in the log message
    :type name: string
    """
    try:
        yield
    except Exception as error:
        # Checked in order; the first matching type decides the message.
        known_failures = (
            (exceptions.BackendError, "Backend (%s) initialization error: %s"),
            (
                exceptions.FrontendError,
                "Frontend (%s) initialization error: %s",
            ),
            (exceptions.MixerError, "Mixer (%s) initialization error: %s"),
        )
        for error_type, template in known_failures:
            if isinstance(error, error_type):
                logger.error(template, name, error)
                break
        else:
            logger.exception("Got un-handled exception from %s", name)


# TODO: move out of this utility class
class RootCommand(Command):
    """Root command: registers the global options and runs the server."""

    def __init__(self):
        super().__init__()
        self.set(base_verbosity_level=0)
        self.add_argument(
            "-h", "--help", action="help", help="Show this message and exit"
        )
        self.add_argument(
            "--version",
            action="version",
            version=f"Mopidy {versioning.get_version()}",
        )
        # -q and -v deliberately share the same destination.
        self.add_argument(
            "-q",
            "--quiet",
            action="store_const",
            const=-1,
            dest="verbosity_level",
            help="less output (warning level)",
        )
        self.add_argument(
            "-v",
            "--verbose",
            action="count",
            dest="verbosity_level",
            default=0,
            help="more output (repeat up to 4 times for even more)",
        )
        self.add_argument(
            "--config",
            action="store",
            dest="config_files",
            type=config_files_type,
            default=DEFAULT_CONFIG,
            metavar="FILES",
            help="config files to use, colon seperated, later files override",
        )
        self.add_argument(
            "-o",
            "--option",
            action="append",
            dest="config_overrides",
            type=config_override_type,
            metavar="OPTIONS",
            help="`section/key=value` values to override config options",
        )

    def run(self, args, config):
        """Start all actors, run the GLib main loop, then stop everything.

        :param args: parsed arguments; ``args.registry`` is indexed by
            ``"mixer"``, ``"backend"`` and ``"frontend"``
        :param config: config passed on to every actor that is started
        :returns: ``1`` after a backend, frontend or mixer error, else ``0``
        :rtype: int
        """

        def on_sigterm(loop):
            logger.info("GLib mainloop got SIGTERM. Exiting...")
            loop.quit()

        loop = GLib.MainLoop()
        GLib.unix_signal_add(
            GLib.PRIORITY_DEFAULT, signal.SIGTERM, on_sigterm, loop
        )

        mixer_class = self.get_mixer_class(config, args.registry["mixer"])
        backend_classes = args.registry["backend"]
        frontend_classes = args.registry["frontend"]
        core = None

        exit_status_code = 0
        try:
            mixer = None
            if mixer_class is not None:
                mixer = self.start_mixer(config, mixer_class)
            if mixer:
                self.configure_mixer(config, mixer)
            audio = self.start_audio(config, mixer)
            backends = self.start_backends(config, backend_classes, audio)
            core = self.start_core(config, mixer, backends, audio)
            self.start_frontends(config, frontend_classes, core)
            logger.info("Starting GLib mainloop")
            loop.run()
        except (
            exceptions.BackendError,
            exceptions.FrontendError,
            exceptions.MixerError,
        ):
            logger.info("Initialization error. Exiting...")
            exit_status_code = 1
        except KeyboardInterrupt:
            logger.info("Interrupted. Exiting...")
        except Exception:
            logger.exception("Uncaught exception")
        finally:
            # Teardown: frontends, core, backends, audio, then the mixer.
            loop.quit()
            self.stop_frontends(frontend_classes)
            self.stop_core(core)
            self.stop_backends(backend_classes)
            self.stop_audio()
            if mixer_class is not None:
                self.stop_mixer(mixer_class)
            process.stop_remaining_actors()
            # NOTE: returning from ``finally`` discards any exception that is
            # still propagating at this point.
            return exit_status_code

    def get_mixer_class(self, config, mixer_classes):
        """Pick the mixer class named by ``config["audio"]["mixer"]``.

        :returns: ``None`` when the configured mixer is ``"none"``, otherwise
            the mixer class whose ``name`` matches the config value

        If not exactly one class matches, an error listing the alternatives
        is logged and ``process.exit_process()`` is called.
        """
        logger.debug(
            "Available Mopidy mixers: %s",
            ", ".join(m.__name__ for m in mixer_classes) or "none",
        )

        if config["audio"]["mixer"] == "none":
            logger.debug("Mixer disabled")
            return None

        selected_mixers = [
            m for m in mixer_classes if m.name == config["audio"]["mixer"]
        ]
        if len(selected_mixers) != 1:
            # "none" is always a valid setting, so it is always listed.
            # Joining one list avoids a stray leading ", " when no mixers
            # are available.
            alternatives = [m.name for m in mixer_classes] + ["none"]
            logger.error(
                'Did not find unique mixer "%s". Alternatives are: %s',
                config["audio"]["mixer"],
                ", ".join(alternatives),
            )
            process.exit_process()
        return selected_mixers[0]

    def start_mixer(self, config, mixer_class):
        """Start the mixer actor and wait for it to answer a ping.

        :returns: the mixer proxy, or ``None`` if starting failed or the
            actor died
        """
        logger.info("Starting Mopidy mixer: %s", mixer_class.__name__)
        with _actor_error_handling(mixer_class.__name__):
            mixer = mixer_class.start(config=config).proxy()
            try:
                mixer.ping().get()
                return mixer
            except pykka.ActorDeadError as exc:
                logger.error("Actor died: %s", exc)
        return None

    def configure_mixer(self, config, mixer):
        """Set the mixer volume from ``config["audio"]["mixer_volume"]``.

        The volume is left unchanged when the config value is ``None``.
        """
        volume = config["audio"]["mixer_volume"]
        if volume is not None:
            mixer.set_volume(volume)
            logger.info("Mixer volume set to %d", volume)
        else:
            logger.debug("Mixer volume left unchanged")

    def start_audio(self, config, mixer):
        """Start the audio actor and return its proxy."""
        logger.info("Starting Mopidy audio")
        return Audio.start(config=config, mixer=mixer).proxy()

    def start_backends(self, config, backend_classes, audio):
        """Start every backend actor and return the proxies that are alive.

        Backends that fail to start, or that die before answering a ping,
        are logged and left out of the returned list.
        """
        logger.info(
            "Starting Mopidy backends: %s",
            ", ".join(b.__name__ for b in backend_classes) or "none",
        )

        backends = []
        for backend_class in backend_classes:
            with _actor_error_handling(backend_class.__name__):
                with timer.time_logger(backend_class.__name__):
                    backend = backend_class.start(
                        config=config, audio=audio
                    ).proxy()
                    backends.append(backend)

        # Block until all on_starts have finished, letting them run in parallel
        # Iterate over a copy since dead backends are removed from the list.
        for backend in backends[:]:
            try:
                backend.ping().get()
            except pykka.ActorDeadError as exc:
                backends.remove(backend)
                logger.error("Actor died: %s", exc)

        return backends

    def start_core(self, config, mixer, backends, audio):
        """Start the core actor, run its ``_setup`` and return the proxy."""
        logger.info("Starting Mopidy core")
        core = Core.start(
            config=config, mixer=mixer, backends=backends, audio=audio
        ).proxy()
        # ``_setup`` is sent as a raw ProxyCall message and waited for.
        call = ProxyCall(attr_path=["_setup"], args=[], kwargs={})
        core.actor_ref.ask(call, block=True)
        return core

    def start_frontends(self, config, frontend_classes, core):
        """Start every frontend actor; start-up errors are logged."""
        logger.info(
            "Starting Mopidy frontends: %s",
            ", ".join(f.__name__ for f in frontend_classes) or "none",
        )

        for frontend_class in frontend_classes:
            with _actor_error_handling(frontend_class.__name__):
                with timer.time_logger(frontend_class.__name__):
                    frontend_class.start(config=config, core=core)

    def stop_frontends(self, frontend_classes):
        """Stop all actors of the given frontend classes."""
        logger.info("Stopping Mopidy frontends")
        for frontend_class in frontend_classes:
            process.stop_actors_by_class(frontend_class)

    def stop_core(self, core):
        """Run the core's ``_teardown`` (if it was started) and stop it."""
        logger.info("Stopping Mopidy core")
        if core:
            call = ProxyCall(attr_path=["_teardown"], args=[], kwargs={})
            core.actor_ref.ask(call, block=True)
        process.stop_actors_by_class(Core)

    def stop_backends(self, backend_classes):
        """Stop all actors of the given backend classes."""
        logger.info("Stopping Mopidy backends")
        for backend_class in backend_classes:
            process.stop_actors_by_class(backend_class)

    def stop_audio(self):
        """Stop all audio actors."""
        logger.info("Stopping Mopidy audio")
        process.stop_actors_by_class(Audio)

    def stop_mixer(self, mixer_class):
        """Stop all actors of the given mixer class."""
        logger.info("Stopping Mopidy mixer")
        process.stop_actors_by_class(mixer_class)
class ConfigCommand(Command):
    """Command that prints the formatted configuration."""

    help = "Show currently active configuration."

    def __init__(self):
        super().__init__()
        self.set(base_verbosity_level=-1)

    def run(self, config, errors, schemas):
        """Format the config with ``config_lib.format`` and print it.

        :returns: ``0``
        :rtype: int
        """
        formatted = config_lib.format(config, schemas, errors)

        # Throw away all bytes that are not valid UTF-8 before printing
        raw = formatted.encode(errors="surrogateescape")
        printable = raw.decode(errors="replace")

        print(printable)
        return 0
class DepsCommand(Command):
    """Command that prints the formatted dependency list."""

    help = "Show dependencies and debug information."

    def __init__(self):
        super().__init__()
        self.set(base_verbosity_level=-1)

    def run(self):
        """Print ``deps.format_dependency_list()``.

        :returns: ``0``
        :rtype: int
        """
        dependency_report = deps.format_dependency_list()
        print(dependency_report)
        return 0