# Source code for mopidy.core.actor

# ruff: noqa: ARG002

from __future__ import annotations

import itertools
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import TYPE_CHECKING

import pykka
from pykka.typing import ActorMemberMixin, proxy_method

import mopidy
from mopidy import audio, backend, mixer
from mopidy.audio import PlaybackState
from mopidy.core.history import HistoryController
from mopidy.core.library import LibraryController
from mopidy.core.listener import CoreListener
from mopidy.core.mixer import MixerController
from mopidy.core.playback import PlaybackController
from mopidy.core.playlists import PlaylistsController
from mopidy.core.tracklist import TracklistController
from mopidy.internal import path, storage, validation
from mopidy.internal.models import CoreState

if TYPE_CHECKING:
    from mopidy.config import Config
    from mopidy.core.history import HistoryControllerProxy
    from mopidy.core.library import LibraryControllerProxy
    from mopidy.core.mixer import MixerControllerProxy
    from mopidy.core.playback import PlaybackControllerProxy
    from mopidy.core.playlists import PlaylistsControllerProxy
    from mopidy.core.tracklist import TracklistControllerProxy
    from mopidy.types import Uri

logger = logging.getLogger(__name__)


class Core(
    pykka.ThreadingActor,
    audio.AudioListener,
    backend.BackendListener,
    mixer.MixerListener,
):
    """Actor that owns the core controllers and relays listener events.

    The actor listens to audio, backend and mixer events (it inherits the
    three listener interfaces) and either updates the playback controller
    or forwards the event to frontends through :class:`CoreListener`.
    """

    library: LibraryController
    """An instance of :class:`~mopidy.core.LibraryController`"""

    history: HistoryController
    """An instance of :class:`~mopidy.core.HistoryController`"""

    mixer: MixerController
    """An instance of :class:`~mopidy.core.MixerController`"""

    playback: PlaybackController
    """An instance of :class:`~mopidy.core.PlaybackController`"""

    playlists: PlaylistsController
    """An instance of :class:`~mopidy.core.PlaylistsController`"""

    tracklist: TracklistController
    """An instance of :class:`~mopidy.core.TracklistController`"""

    def __init__(
        self,
        config: Config,
        *,
        mixer: mixer.MixerProxy | None = None,
        backends: Iterable[backend.BackendProxy],
        audio: audio.AudioProxy | None = None,
    ) -> None:
        """Create the core actor and its controllers.

        :param config: the configuration; ``config["core"]`` is read for
            ``restore_state`` and ``data_dir``
        :param mixer: optional mixer proxy handed to the mixer controller
        :param backends: backend proxies; a falsy value is treated as empty
        :param audio: optional audio proxy handed to the playback controller
        """
        super().__init__()

        self._config = config

        self.backends = Backends(backends or [])

        # Controllers are wrapped with ``pykka.traversable`` so they can be
        # reached as attributes through the actor proxy.
        self.library = pykka.traversable(
            LibraryController(backends=self.backends, core=self)
        )
        self.history = pykka.traversable(HistoryController())
        self.mixer = pykka.traversable(MixerController(mixer=mixer))
        self.playback = pykka.traversable(
            PlaybackController(audio=audio, backends=self.backends, core=self)
        )
        self.playlists = pykka.traversable(
            PlaylistsController(backends=self.backends, core=self)
        )
        self.tracklist = pykka.traversable(TracklistController(core=self))

        self.audio = audio

    def get_uri_schemes(self) -> list[backend.UriScheme]:
        """Get list of URI schemes we can handle."""
        futures = [b.uri_schemes for b in self.backends]
        results = pykka.get_all(futures)
        return sorted(itertools.chain.from_iterable(results))

    def get_version(self) -> str:
        """Get version of the Mopidy core API."""
        return mopidy.__version__

    def reached_end_of_stream(self) -> None:
        self.playback._on_end_of_stream()

    def stream_changed(self, uri: Uri) -> None:
        self.playback._on_stream_changed(uri)

    def position_changed(self, position: int) -> None:
        self.playback._on_position_changed(position)

    def state_changed(
        self,
        old_state: PlaybackState,
        new_state: PlaybackState,
        target_state: PlaybackState | None,
    ) -> None:
        # XXX: This is a temporary fix for issue #232 while we wait for a more
        # permanent solution with the implementation of issue #234. When the
        # Spotify play token is lost, the Spotify backend pauses audio
        # playback, but mopidy.core doesn't know this, so we need to update
        # mopidy.core's state to match the actual state in mopidy.audio. If we
        # don't do this, clients will think that we're still playing.

        # We ignore cases when target state is set as this is buffering
        # updates (at least for now) and we need to get #234 fixed...
        if (
            new_state == PlaybackState.PAUSED
            and not target_state
            and self.playback.get_state() != PlaybackState.PAUSED
        ):
            self.playback.set_state(new_state)
            self.playback._trigger_track_playback_paused()

    def playlists_loaded(self) -> None:
        # Forward event from backend to frontends
        CoreListener.send("playlists_loaded")

    def volume_changed(self, volume: int) -> None:
        # Forward event from mixer to frontends
        CoreListener.send("volume_changed", volume=volume)

    def mute_changed(self, mute: bool) -> None:
        # Forward event from mixer to frontends
        CoreListener.send("mute_changed", mute=mute)

    def tags_changed(self, tags: set[str]) -> None:
        if not self.audio or "title" not in tags:
            return

        current_tags = self.audio.get_current_tags().get()
        if not current_tags:
            return

        # Reset first; the title is only set again below when it differs
        # from the current track's name.
        self.playback._stream_title = None
        # TODO: Do not emit stream title changes for plain tracks. We need a
        # better way to decide if something is a stream.
        if current_tags.get("title"):
            title = current_tags["title"][0]
            current_track = self.playback.get_current_track()
            if current_track is not None and current_track.name != title:
                self.playback._stream_title = title
                CoreListener.send("stream_title_changed", title=title)

    def _should_restore_state(self) -> bool:
        """Return whether ``core/restore_state`` is set and truthy."""
        return bool(
            self._config
            and "restore_state" in self._config["core"]
            and self._config["core"]["restore_state"]
        )

    def _setup(self) -> None:
        """Do not call this function. It is for internal use at startup."""
        try:
            if self._should_restore_state():
                self._load_state(
                    [
                        "tracklist",
                        "mode",
                        "play-last",
                        "mixer",
                        "history",
                    ]
                )
        except Exception as e:
            # Best effort: a failed restore is logged and otherwise ignored.
            logger.warning("Restore state: Unexpected error: %s", e)

    def _teardown(self) -> None:
        """Do not call this function. It is for internal use at shutdown."""
        try:
            if self._should_restore_state():
                self._save_state()
        except Exception as e:
            # Best effort: a failed save is logged and otherwise ignored.
            logger.warning("Unexpected error while saving state: %s", e)

    def _get_data_dir(self) -> Path:
        # get or create data directory for core
        data_dir_path = path.expand_path(self._config["core"]["data_dir"]) / "core"
        path.get_or_create_dir(data_dir_path)
        return data_dir_path

    def _get_state_file(self) -> Path:
        return self._get_data_dir() / "state.json.gz"

    def _save_state(self) -> None:
        """Save current state to disk."""
        state_file = self._get_state_file()
        logger.info("Saving state to %s", state_file)

        data = {
            "version": mopidy.__version__,
            "state": CoreState(
                tracklist=self.tracklist._save_state(),
                history=self.history._save_state(),
                playback=self.playback._save_state(),
                mixer=self.mixer._save_state(),
            ),
        }
        storage.dump(state_file, data)
        logger.debug("Saving state done")

    def _load_state(self, coverage: Iterable[str]) -> None:
        """Restore state from disk.

        Load state from disk and restore it. Parameter ``coverage``
        limits the amount of data to restore. Possible
        values for ``coverage`` (list of one or more of):

            - 'tracklist' fill the tracklist
            - 'mode' set tracklist properties (consume, random, repeat, single)
            - 'play-last' restore play state ('tracklist' also required)
            - 'mixer' set mixer volume and mute state
            - 'history' restore history

        :param coverage: amount of data to restore
        :type coverage: list of strings
        """
        state_file = self._get_state_file()
        logger.info("Loading state from %s", state_file)

        data = storage.load(state_file)

        try:
            # Try only once. If something goes wrong, the next start is clean.
            state_file.unlink()
        except OSError:
            logger.info("Failed to delete %s", state_file)

        if "state" in data:
            core_state = data["state"]
            validation.check_instance(core_state, CoreState)
            self.history._load_state(core_state.history, coverage)
            self.tracklist._load_state(core_state.tracklist, coverage)
            self.mixer._load_state(core_state.mixer, coverage)
            # playback after tracklist
            self.playback._load_state(core_state.playback, coverage)
        logger.debug("Loading state done")
class Backends(list):
    """List of backend proxies with per-capability lookup by URI scheme.

    Besides being a list of the usable backends, the instance exposes four
    dicts mapping a URI scheme to the backend that handles it, one for each
    capability a backend reports: ``with_library``, ``with_library_browse``,
    ``with_playback`` and ``with_playlists``.
    """

    def __init__(self, backends: Iterable[backend.BackendProxy]) -> None:
        """Build the list and the scheme lookup tables.

        Backends whose capability queries raise are logged and removed from
        the list.

        :param backends: backend proxies to register
        :raises AssertionError: if two backends claim the same URI scheme
        """
        super().__init__(backends)

        self.with_library: dict[backend.UriScheme, backend.BackendProxy] = {}
        self.with_library_browse: dict[backend.UriScheme, backend.BackendProxy] = {}
        self.with_playback: dict[backend.UriScheme, backend.BackendProxy] = {}
        self.with_playlists: dict[backend.UriScheme, backend.BackendProxy] = {}

        backends_by_scheme: dict[backend.UriScheme, backend.BackendProxy] = {}

        def name(backend_proxy: backend.BackendProxy) -> str:
            return backend_proxy.actor_ref.actor_class.__name__

        # Iterate over a snapshot of the list instead of the ``backends``
        # argument: the argument may be a one-shot iterator that
        # ``super().__init__`` has already consumed, and ``self`` is mutated
        # below when a failing backend is removed.
        for b in list(self):
            try:
                has_library = b.has_library().get()
                has_library_browse = b.has_library_browse().get()
                has_playback = b.has_playback().get()
                has_playlists = b.has_playlists().get()
            except Exception:
                self.remove(b)
                logger.exception("Fetching backend info for %s failed", name(b))
                continue

            for scheme in b.uri_schemes.get():
                if scheme in backends_by_scheme:
                    raise AssertionError(
                        f"Cannot add URI scheme {scheme!r} for {name(b)}, "
                        f"it is already handled by {name(backends_by_scheme[scheme])}"
                    )
                backends_by_scheme[scheme] = b

                if has_library:
                    self.with_library[scheme] = b
                if has_library_browse:
                    self.with_library_browse[scheme] = b
                if has_playback:
                    self.with_playback[scheme] = b
                if has_playlists:
                    self.with_playlists[scheme] = b


class CoreProxy(ActorMemberMixin, pykka.ActorProxy[Core]):
    """Typed proxy declaration for the :class:`Core` actor."""

    library: LibraryControllerProxy
    history: HistoryControllerProxy
    mixer: MixerControllerProxy
    playback: PlaybackControllerProxy
    playlists: PlaylistsControllerProxy
    tracklist: TracklistControllerProxy
    get_uri_schemes = proxy_method(Core.get_uri_schemes)
    get_version = proxy_method(Core.get_version)
    reached_end_of_stream = proxy_method(Core.reached_end_of_stream)
    stream_changed = proxy_method(Core.stream_changed)
    position_changed = proxy_method(Core.position_changed)
    state_changed = proxy_method(Core.state_changed)
    playlists_loaded = proxy_method(Core.playlists_loaded)
    volume_changed = proxy_method(Core.volume_changed)
    mute_changed = proxy_method(Core.mute_changed)
    tags_changed = proxy_method(Core.tags_changed)