#!/usr/bin/python3 import os import threading from PyQt6.QtCore import Qt from PyQt6.QtWidgets import QAbstractItemView from reactivex.observable.case import case_ from .track import Track, TrackMetadata from ..wobuzzm3u import WobuzzM3U, WBZM3UData class Playlist: def __init__(self, app, title: str, load_from=None): self.app = app self.title = title # playlist title # if the playlist is imported and not already in the library, this variable will contain the playlist path or # track path from which the playlist will get imported # if None, playlist should be already in the library and will be loaded from a .wbz.m3u self.load_from = load_from # add to unique names so if the playlist is loaded from disk, # no other playlist can be created using the same name self.app.utils.unique_names.append(self.title) # sort order self.sorting: list[WBZM3UData.SortOrder] = [ WBZM3UData.SortOrder(WBZM3UData.SortOrder.track_title, True), WBZM3UData.SortOrder(WBZM3UData.SortOrder.track_artist, True), WBZM3UData.SortOrder(WBZM3UData.SortOrder.track_album, True), WBZM3UData.SortOrder(WBZM3UData.SortOrder.track_genre, True), WBZM3UData.SortOrder(WBZM3UData.SortOrder.custom_sorting, True) ] self.tracks: list[Track] = [] self.current_track_index = 0 self.current_track: Track | None = None self.views = {} # dict of id(LibraryWidget): PlaylistView self.loaded = False self.loading = False self.path = self.path_from_title(title) def clear(self): self.sorting: list[Qt.SortOrder] | None = None self.tracks = [] self.current_track_index = 0 self.current_track = None def load_from_paths(self, paths): num_tracks = len(paths) i = 0 process_title = f'Loading Playlist "{self.title}"' self.app.gui.on_background_job_start( process_title, f'Loading the tracks of "{self.title}".', num_tracks, lambda: i ) while i < num_tracks: path = paths[i] if os.path.isfile(path): self.append_track(Track(self.app, path, cache=i==0)) # first track is cached i += 1 self.loaded = True self.app.gui.on_background_job_stop(process_title) def load(self): loading_thread = threading.Thread(target=self.loading_thread) loading_thread.start() def loading_thread(self): if self.loaded or self.loading: return self.loading = True if self.load_from is None: # if the playlist is in the library self.load_from_wbz(self.path) elif isinstance(self.load_from, str): # if it's imported from a .m3u self.load_from_m3u(self.load_from) elif isinstance(self.load_from, list): # if it's created from tracks self.load_from_paths(self.load_from) self.loading = False for dock_id in self.views: # enable drag and drop on every view view = self.views[dock_id] view.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove) def load_from_m3u(self, path): file = open(path, "r") m3u = file.read() file.close() lines = m3u.split("\n") # m3u entries are separated by newlines lines = lines[:-1] # remove last entry because it is just an empty string num_lines = len(lines) i = 0 process_title = f'Loading Playlist "{self.title}"' self.app.gui.on_background_job_start( process_title, f'Loading the tracks of "{self.title}".', num_lines, lambda: i ) while i < num_lines: line = lines[i] if line.startswith("#") or line.startswith("http"): # filter out comments, extended m3u and urls i += 1 continue self.append_track(Track(self.app, line, cache=i==0)) # first track is cached i += 1 # set current track to the first track if there is no currently playing track if self.current_track is None and self.has_tracks(): self.current_track = self.tracks[0] self.loaded = True self.app.gui.on_background_job_stop(process_title) def load_from_wbz(self, path): file = open(path, "r") m3u = file.read() file.close() lines = m3u.split("\n") # m3u entries are separated by newlines lines = lines[:-1] # remove last entry because it is just an empty string num_lines = len(lines) i = 0 process_title = f'Loading Playlist "{self.title}"' self.app.gui.on_background_job_start( process_title, f'Loading the tracks of "{self.title}".', num_lines, lambda: i ) wbzm3u = WobuzzM3U(self.path) track_metadata = TrackMetadata() # cached track metadata from WOBUZZM3U while i < num_lines: line = lines[i] line_data = wbzm3u.parse_line(line) if line_data is None: i += 1 continue if line_data.is_comment: # comments and EXTM3U/WOBUZZM3U if isinstance(line_data, WBZM3UData.SortOrder): # sort del self.sorting[0] # delete first sort so the length stays at 6 self.sorting.append(line_data) if isinstance(line_data, WBZM3UData.TrackMetadata.TrackTitle): track_metadata.title = line_data if isinstance(line_data, WBZM3UData.TrackMetadata.TrackArtist): track_metadata.artist = line_data if isinstance(line_data, WBZM3UData.TrackMetadata.TrackAlbum): track_metadata.album = line_data i += 1 continue elif isinstance(line_data, WBZM3UData.URL): # ignore urls i += 1 continue track_metadata.path = line track_metadata.add_missing() self.append_track(Track(self.app, line, cache=i == 0, metadata=track_metadata)) # first track is cached track_metadata = TrackMetadata() # metadata for next track i += 1 # set current track to the first track if there is no currently playing track if self.current_track is None and self.has_tracks(): self.current_track = self.tracks[0] list(self.views.values())[0].sort() # execute sort() on the first view self.loaded = True self.app.gui.on_background_job_stop(process_title) def sync(self, view, user_sort: bool=False): num_tracks = view.topLevelItemCount() i = 0 while i < num_tracks: track_item = view.topLevelItem(i) track = track_item.track track_item.index = i if user_sort: track_item.index_user_sort = i self.tracks[i] = track track.set_occurrences() i += 1 # make sure the next track is cached (could be moved by user) if self.app.player.current_playlist == self and self.has_tracks(): self.app.player.cache_next_track() def has_tracks(self): return len(self.tracks) > 0 def on_first_track(self): return self.current_track_index == 0 def on_last_track(self): # if the current track is the last return self.current_track_index == len(self.tracks) - 1 def next_track(self): self.current_track_index += 1 self.current_track = self.tracks[self.current_track_index] if not self.current_track.cached: # make sure the track is cached because else the player can't play it self.current_track.cache() return self.current_track.sound, self.current_track.duration def previous_track(self): if self.on_first_track(): return self.current_track, self.current_track.duration self.current_track_index -= 1 self.current_track = self.tracks[self.current_track_index] if not self.current_track.cached: # make sure the track is cached because else the player can't play it self.current_track.cache() return self.current_track.sound, self.current_track.duration def set_track(self, track_index): self.current_track_index = track_index self.current_track = self.tracks[self.current_track_index] if not self.current_track.cached: self.current_track.cache() return self.current_track.sound, self.current_track.duration def save(self): first_view = list(self.views.values())[0] first_view.sortItems(4, Qt.SortOrder.AscendingOrder) self.sync(first_view) wbzm3u = WobuzzM3U(self.path) wbz_data = "" for order in self.sorting: wbz_data += wbzm3u.assemble_line(order) for track in self.tracks: # cache track metadata wbz_data += wbzm3u.assemble_line(WBZM3UData.TrackMetadata.TrackTitle(track.metadata.title)) wbz_data += wbzm3u.assemble_line(WBZM3UData.TrackMetadata.TrackArtist(track.metadata.artist)) wbz_data += wbzm3u.assemble_line(WBZM3UData.TrackMetadata.TrackAlbum(track.metadata.album)) # wbz_data += wbzm3u.assemble_line(WBZM3UData.TrackMetadata.TrackGenre(track.metadata.genre)) wbz_data += wbzm3u.assemble_line(WBZM3UData.Path(track.path)) wbz = open(self.path, "w") wbz.write(wbz_data) wbz.close() def rename(self, title: str): if os.path.exists(self.path): os.remove(self.path) old_title = self.title self.title = self.app.utils.unique_name(title, ignore=old_title) self.path = self.path_from_title(self.title) # make sure the playlist is not referenced anymore as the temporary playlist if self == self.app.library.temporary_playlist: self.app.library.temporary_playlist = None # remove from unique names so a new playlist can have the old name and delete old playlist. if not old_title == self.title: # remove only when the playlist actually has a different name self.app.utils.unique_names.remove(old_title) def delete(self): if os.path.exists(self.path): os.remove(self.path) if self.app.player.current_playlist == self: # stop if this is the current playlist self.app.player.stop() self.app.player.current_playlist = None for view in self.views.values(): # close views (and PyQt automatically closes the corresponding tabs) view.deleteLater() for track in self.tracks: # remove items that corresponded to the track and this playlist track.delete_items(self) # make sure the playlist is not referenced as the temporary playlist if self is self.app.library.temporary_playlist: self.app.library.temporary_playlist = None self.app.utils.unique_names.remove(self.title) self.app.library.playlists.remove(self) def append_track(self, track): for dock_id in self.views: view = self.views[dock_id] view.append_track(track) self.tracks.append(track) def h_last_track(self): # get last track in history (only gets used in player.history) if len(self.tracks) > 1: return self.tracks[-2] def path_from_title(self, title): path = os.path.expanduser( f"{self.app.settings.library_path}/playlists/{title.replace(" ", "_")}.wbz.m3u" ) return path