import logging
import random
from mopidy import exceptions
from mopidy.core import listener
from mopidy.internal import deprecation, validation
from mopidy.internal.models import TracklistState
from mopidy.models import TlTrack, Track
logger = logging.getLogger(__name__)
[docs]class TracklistController:
def __init__(self, core):
self.core = core
self._next_tlid = 1
self._tl_tracks = []
self._version = 0
self._consume = False
self._random = False
self._shuffled = []
self._repeat = False
self._single = False
[docs] def get_tl_tracks(self):
"""Get tracklist as list of :class:`mopidy.models.TlTrack`."""
return self._tl_tracks[:]
[docs] def get_tracks(self):
"""Get tracklist as list of :class:`mopidy.models.Track`."""
return [tl_track.track for tl_track in self._tl_tracks]
[docs] def get_length(self):
"""Get length of the tracklist."""
return len(self._tl_tracks)
[docs] def get_version(self):
"""
Get the tracklist version.
Integer which is increased every time the tracklist is changed. Is not
reset before Mopidy is restarted.
"""
return self._version
def _increase_version(self):
self._version += 1
self.core.playback._on_tracklist_change()
self._trigger_tracklist_changed()
[docs] def get_consume(self):
"""Get consume mode.
:class:`True`
Tracks are removed from the tracklist when they have been played.
:class:`False`
Tracks are not removed from the tracklist.
"""
return self._consume
[docs] def set_consume(self, value):
"""Set consume mode.
:class:`True`
Tracks are removed from the tracklist when they have been played.
:class:`False`
Tracks are not removed from the tracklist.
"""
validation.check_boolean(value)
if self.get_consume() != value:
self._trigger_options_changed()
self._consume = value
[docs] def get_random(self):
"""Get random mode.
:class:`True`
Tracks are selected at random from the tracklist.
:class:`False`
Tracks are played in the order of the tracklist.
"""
return self._random
[docs] def set_random(self, value):
"""Set random mode.
:class:`True`
Tracks are selected at random from the tracklist.
:class:`False`
Tracks are played in the order of the tracklist.
"""
validation.check_boolean(value)
if self.get_random() != value:
self._trigger_options_changed()
if value:
self._shuffled = self.get_tl_tracks()
random.shuffle(self._shuffled)
self._random = value
[docs] def get_repeat(self):
"""
Get repeat mode.
:class:`True`
The tracklist is played repeatedly.
:class:`False`
The tracklist is played once.
"""
return self._repeat
[docs] def set_repeat(self, value):
"""
Set repeat mode.
To repeat a single track, set both ``repeat`` and ``single``.
:class:`True`
The tracklist is played repeatedly.
:class:`False`
The tracklist is played once.
"""
validation.check_boolean(value)
if self.get_repeat() != value:
self._trigger_options_changed()
self._repeat = value
[docs] def get_single(self):
"""
Get single mode.
:class:`True`
Playback is stopped after current song, unless in ``repeat`` mode.
:class:`False`
Playback continues after current song.
"""
return self._single
[docs] def set_single(self, value):
"""
Set single mode.
:class:`True`
Playback is stopped after current song, unless in ``repeat`` mode.
:class:`False`
Playback continues after current song.
"""
validation.check_boolean(value)
if self.get_single() != value:
self._trigger_options_changed()
self._single = value
[docs] def index(self, tl_track=None, tlid=None):
"""
The position of the given track in the tracklist.
If neither *tl_track* or *tlid* is given we return the index of
the currently playing track.
:param tl_track: the track to find the index of
:type tl_track: :class:`mopidy.models.TlTrack` or :class:`None`
:param tlid: TLID of the track to find the index of
:type tlid: :class:`int` or :class:`None`
:rtype: :class:`int` or :class:`None`
.. versionadded:: 1.1
The *tlid* parameter
"""
tl_track is None or validation.check_instance(tl_track, TlTrack)
tlid is None or validation.check_integer(tlid, min=1)
if tl_track is None and tlid is None:
tl_track = self.core.playback.get_current_tl_track()
if tl_track is not None:
try:
return self._tl_tracks.index(tl_track)
except ValueError:
pass
elif tlid is not None:
for i, tl_track in enumerate(self._tl_tracks):
if tl_track.tlid == tlid:
return i
return None
[docs] def get_eot_tlid(self):
"""
The TLID of the track that will be played after the current track.
Not necessarily the same TLID as returned by :meth:`get_next_tlid`.
:rtype: :class:`int` or :class:`None`
.. versionadded:: 1.1
"""
current_tl_track = self.core.playback.get_current_tl_track()
with deprecation.ignore("core.tracklist.eot_track"):
eot_tl_track = self.eot_track(current_tl_track)
return getattr(eot_tl_track, "tlid", None)
[docs] def eot_track(self, tl_track):
"""
The track that will be played after the given track.
Not necessarily the same track as :meth:`next_track`.
.. deprecated:: 3.0
Use :meth:`get_eot_tlid` instead.
:param tl_track: the reference track
:type tl_track: :class:`mopidy.models.TlTrack` or :class:`None`
:rtype: :class:`mopidy.models.TlTrack` or :class:`None`
"""
deprecation.warn("core.tracklist.eot_track")
tl_track is None or validation.check_instance(tl_track, TlTrack)
if self.get_single() and self.get_repeat():
return tl_track
elif self.get_single():
return None
# Current difference between next and EOT handling is that EOT needs to
# handle "single", with that out of the way the rest of the logic is
# shared.
return self.next_track(tl_track)
[docs] def get_next_tlid(self):
"""
The tlid of the track that will be played if calling
:meth:`mopidy.core.PlaybackController.next()`.
For normal playback this is the next track in the tracklist. If repeat
is enabled the next track can loop around the tracklist. When random is
enabled this should be a random track, all tracks should be played once
before the tracklist repeats.
:rtype: :class:`int` or :class:`None`
.. versionadded:: 1.1
"""
current_tl_track = self.core.playback.get_current_tl_track()
with deprecation.ignore("core.tracklist.next_track"):
next_tl_track = self.next_track(current_tl_track)
return getattr(next_tl_track, "tlid", None)
[docs] def next_track(self, tl_track):
"""
The track that will be played if calling
:meth:`mopidy.core.PlaybackController.next()`.
For normal playback this is the next track in the tracklist. If repeat
is enabled the next track can loop around the tracklist. When random is
enabled this should be a random track, all tracks should be played once
before the tracklist repeats.
.. deprecated:: 3.0
Use :meth:`get_next_tlid` instead.
:param tl_track: the reference track
:type tl_track: :class:`mopidy.models.TlTrack` or :class:`None`
:rtype: :class:`mopidy.models.TlTrack` or :class:`None`
"""
deprecation.warn("core.tracklist.next_track")
tl_track is None or validation.check_instance(tl_track, TlTrack)
if not self._tl_tracks:
return None
if self.get_random() and not self._shuffled:
if self.get_repeat() or not tl_track:
logger.debug("Shuffling tracks")
self._shuffled = self._tl_tracks[:]
random.shuffle(self._shuffled)
if self.get_random():
if self._shuffled:
return self._shuffled[0]
return None
next_index = self.index(tl_track)
if next_index is None:
next_index = 0
else:
next_index += 1
if self.get_repeat():
if self.get_consume() and len(self._tl_tracks) == 1:
return None
else:
next_index %= len(self._tl_tracks)
elif next_index >= len(self._tl_tracks):
return None
return self._tl_tracks[next_index]
[docs] def get_previous_tlid(self):
"""
Returns the TLID of the track that will be played if calling
:meth:`mopidy.core.PlaybackController.previous()`.
For normal playback this is the previous track in the tracklist. If
random and/or consume is enabled it should return the current track
instead.
:rtype: :class:`int` or :class:`None`
.. versionadded:: 1.1
"""
current_tl_track = self.core.playback.get_current_tl_track()
with deprecation.ignore("core.tracklist.previous_track"):
previous_tl_track = self.previous_track(current_tl_track)
return getattr(previous_tl_track, "tlid", None)
[docs] def previous_track(self, tl_track):
"""
Returns the track that will be played if calling
:meth:`mopidy.core.PlaybackController.previous()`.
For normal playback this is the previous track in the tracklist. If
random and/or consume is enabled it should return the current track
instead.
.. deprecated:: 3.0
Use :meth:`get_previous_tlid` instead.
:param tl_track: the reference track
:type tl_track: :class:`mopidy.models.TlTrack` or :class:`None`
:rtype: :class:`mopidy.models.TlTrack` or :class:`None`
"""
deprecation.warn("core.tracklist.previous_track")
tl_track is None or validation.check_instance(tl_track, TlTrack)
if self.get_repeat() or self.get_consume() or self.get_random():
return tl_track
position = self.index(tl_track)
if position in (None, 0):
return None
# Since we know we are not at zero we have to be somewhere in the range
# 1 - len(tracks) Thus 'position - 1' will always be within the list.
return self._tl_tracks[position - 1]
[docs] def add(self, tracks=None, at_position=None, uris=None):
"""
Add tracks to the tracklist.
If ``uris`` is given instead of ``tracks``, the URIs are
looked up in the library and the resulting tracks are added to the
tracklist.
If ``at_position`` is given, the tracks are inserted at the given
position in the tracklist. If ``at_position`` is not given, the tracks
are appended to the end of the tracklist.
Triggers the :meth:`mopidy.core.CoreListener.tracklist_changed` event.
:param tracks: tracks to add
:type tracks: list of :class:`mopidy.models.Track` or :class:`None`
:param at_position: position in tracklist to add tracks
:type at_position: int or :class:`None`
:param uris: list of URIs for tracks to add
:type uris: list of string or :class:`None`
:rtype: list of :class:`mopidy.models.TlTrack`
.. versionadded:: 1.0
The ``uris`` argument.
.. deprecated:: 1.0
The ``tracks`` argument. Use ``uris``.
"""
if sum(o is not None for o in [tracks, uris]) != 1:
raise ValueError('Exactly one of "tracks" or "uris" must be set')
tracks is None or validation.check_instances(tracks, Track)
uris is None or validation.check_uris(uris)
validation.check_integer(at_position or 0)
if tracks:
deprecation.warn("core.tracklist.add:tracks_arg")
if tracks is None:
tracks = []
track_map = self.core.library.lookup(uris=uris)
for uri in uris:
tracks.extend(track_map[uri])
tl_tracks = []
max_length = self.core._config["core"]["max_tracklist_length"]
for track in tracks:
if self.get_length() >= max_length:
raise exceptions.TracklistFull(
f"Tracklist may contain at most {max_length:d} tracks."
)
tl_track = TlTrack(self._next_tlid, track)
self._next_tlid += 1
if at_position is not None:
self._tl_tracks.insert(at_position, tl_track)
at_position += 1
else:
self._tl_tracks.append(tl_track)
tl_tracks.append(tl_track)
if tl_tracks:
self._increase_version()
return tl_tracks
[docs] def clear(self):
"""
Clear the tracklist.
Triggers the :meth:`mopidy.core.CoreListener.tracklist_changed` event.
"""
self._tl_tracks = []
self._increase_version()
[docs] def filter(self, criteria):
"""
Filter the tracklist by the given criteria.
Each rule in the criteria consists of a model field and a list of
values to compare it against. If the model field matches any of the
values, it may be returned.
Only tracks that match all the given criteria are returned.
Examples::
# Returns tracks with TLIDs 1, 2, 3, or 4 (tracklist ID)
filter({'tlid': [1, 2, 3, 4]})
# Returns track with URIs 'xyz' or 'abc'
filter({'uri': ['xyz', 'abc']})
# Returns track with a matching TLIDs (1, 3 or 6) and a
# matching URI ('xyz' or 'abc')
filter({'tlid': [1, 3, 6], 'uri': ['xyz', 'abc']})
:param criteria: one or more rules to match by
:type criteria: dict, of (string, list) pairs
:rtype: list of :class:`mopidy.models.TlTrack`
"""
tlids = criteria.pop("tlid", [])
validation.check_query(criteria, validation.TRACKLIST_FIELDS)
validation.check_instances(tlids, int)
matches = self._tl_tracks
for (key, values) in criteria.items():
matches = [ct for ct in matches if getattr(ct.track, key) in values]
if tlids:
matches = [ct for ct in matches if ct.tlid in tlids]
return matches
[docs] def move(self, start, end, to_position):
"""
Move the tracks in the slice ``[start:end]`` to ``to_position``.
Triggers the :meth:`mopidy.core.CoreListener.tracklist_changed` event.
:param start: position of first track to move
:type start: int
:param end: position after last track to move
:type end: int
:param to_position: new position for the tracks
:type to_position: int
"""
if start == end:
end += 1
tl_tracks = self._tl_tracks
# TODO: use validation helpers?
if start >= end:
raise AssertionError("start must be smaller than end")
if start < 0:
raise AssertionError("start must be at least zero")
if end > len(tl_tracks):
raise AssertionError("end can not be larger than tracklist length")
if to_position < 0:
raise AssertionError("to_position must be at least zero")
if to_position > len(tl_tracks):
raise AssertionError(
"to_position can not be larger than tracklist length"
)
new_tl_tracks = tl_tracks[:start] + tl_tracks[end:]
for tl_track in tl_tracks[start:end]:
new_tl_tracks.insert(to_position, tl_track)
to_position += 1
self._tl_tracks = new_tl_tracks
self._increase_version()
[docs] def remove(self, criteria):
"""
Remove the matching tracks from the tracklist.
Uses :meth:`filter()` to lookup the tracks to remove.
Triggers the :meth:`mopidy.core.CoreListener.tracklist_changed` event.
:param criteria: one or more rules to match by
:type criteria: dict, of (string, list) pairs
:rtype: list of :class:`mopidy.models.TlTrack` that were removed
"""
tl_tracks = self.filter(criteria)
for tl_track in tl_tracks:
position = self._tl_tracks.index(tl_track)
del self._tl_tracks[position]
self._increase_version()
return tl_tracks
[docs] def shuffle(self, start=None, end=None):
"""
Shuffles the entire tracklist. If ``start`` and ``end`` is given only
shuffles the slice ``[start:end]``.
Triggers the :meth:`mopidy.core.CoreListener.tracklist_changed` event.
:param start: position of first track to shuffle
:type start: int or :class:`None`
:param end: position after last track to shuffle
:type end: int or :class:`None`
"""
tl_tracks = self._tl_tracks
# TOOD: use validation helpers?
if start is not None and end is not None:
if start >= end:
raise AssertionError("start must be smaller than end")
if start is not None:
if start < 0:
raise AssertionError("start must be at least zero")
if end is not None:
if end > len(tl_tracks):
raise AssertionError(
"end can not be larger than " + "tracklist length"
)
before = tl_tracks[: start or 0]
shuffled = tl_tracks[start:end]
after = tl_tracks[end or len(tl_tracks) :]
random.shuffle(shuffled)
self._tl_tracks = before + shuffled + after
self._increase_version()
[docs] def slice(self, start, end):
"""
Returns a slice of the tracklist, limited by the given start and end
positions.
:param start: position of first track to include in slice
:type start: int
:param end: position after last track to include in slice
:type end: int
:rtype: :class:`mopidy.models.TlTrack`
"""
# TODO: validate slice?
return self._tl_tracks[start:end]
def _mark_playing(self, tl_track):
"""Internal method for :class:`mopidy.core.PlaybackController`."""
if self.get_random() and tl_track in self._shuffled:
self._shuffled.remove(tl_track)
def _mark_unplayable(self, tl_track):
"""Internal method for :class:`mopidy.core.PlaybackController`."""
logger.warning("Track is not playable: %s", tl_track.track.uri)
if self.get_consume() and tl_track is not None:
self.remove({"tlid": [tl_track.tlid]})
if self.get_random() and tl_track in self._shuffled:
self._shuffled.remove(tl_track)
def _mark_played(self, tl_track):
"""Internal method for :class:`mopidy.core.PlaybackController`."""
if self.get_consume() and tl_track is not None:
self.remove({"tlid": [tl_track.tlid]})
return True
return False
def _trigger_tracklist_changed(self):
if self.get_random():
self._shuffled = self._tl_tracks[:]
random.shuffle(self._shuffled)
else:
self._shuffled = []
logger.debug("Triggering event: tracklist_changed()")
listener.CoreListener.send("tracklist_changed")
def _trigger_options_changed(self):
logger.debug("Triggering options changed event")
listener.CoreListener.send("options_changed")
def _save_state(self):
return TracklistState(
tl_tracks=self._tl_tracks,
next_tlid=self._next_tlid,
consume=self.get_consume(),
random=self.get_random(),
repeat=self.get_repeat(),
single=self.get_single(),
)
def _load_state(self, state, coverage):
if state:
if "mode" in coverage:
self.set_consume(state.consume)
self.set_random(state.random)
self.set_repeat(state.repeat)
self.set_single(state.single)
if "tracklist" in coverage:
self._next_tlid = max(state.next_tlid, self._next_tlid)
self._tl_tracks = list(state.tl_tracks)
self._increase_version()