#!/usr/bin/python3
from threading import Thread

from jeepney import DBusAddress
from jeepney.bus_messages import message_bus
from jeepney.io.blocking import DBusConnection, open_dbus_connection

from .utils import *
from .dbus_properties import DBusProperties
from .dbus_introspectable import DBUSIntrospectable
from .mpris_root import MPRISRoot
from .mpris_player import MPRISPlayer


class MPRISServer:
    """Expose the application's player on D-Bus through the MPRIS interfaces.

    Messages are read from the bus on a background thread and handed to the
    GUI via ``app.gui.window.mpris_signal``; they are then dispatched to the
    interface object matching the message's interface header field.
    """

    def __init__(self, app):
        """Create the interface handlers; the bus is not opened until start().

        :param app: application object; this class reads ``app.gui``,
            ``app.player`` and ``app.utils`` from it.
        """
        self.app = app

        self.properties_interface = DBusProperties(self, PROPERTIES_INTERFACE)
        self.introspection_interface = DBUSIntrospectable(self, INTROSPECTION_INTERFACE)
        self.root_interface = MPRISRoot(self, MPRIS_ROOT_INTERFACE)
        self.player_interface = MPRISPlayer(self, MPRIS_PLAYER_INTERFACE)

        self.bus_address = DBusAddress(OBJECT_PATH, SERVICE_NAME, PROPERTIES_INTERFACE)
        # Stays None until start() opens the connection.
        self.bus: DBusConnection = None

    def start(self):
        """Open the bus connection, request SERVICE_NAME and start listening."""
        self.bus = open_dbus_connection()
        reply = self.bus.send_and_get_reply(message_bus.RequestName(SERVICE_NAME))
        # 1 is the RequestName reply code for "primary owner".
        if reply.body[0] == 1:
            print("MPRIS Server connected to DBus: ", SERVICE_NAME)
        # NOTE(review): the listener is started whether or not the name was
        # acquired immediately; the original nesting of this call could not be
        # recovered from the collapsed source - confirm this is intended.
        Thread(target=self.run_event_loop, daemon=True).start()

    def run_event_loop(self):
        """Blockingly receive bus messages forever and forward them to the GUI."""
        while True:
            msg = self.bus.receive()
            # queue message in the PyQt event loop
            self.app.gui.window.mpris_signal.emit(msg)

    def handle_event(self, event):
        """Handle a message delivered by ``app.gui.window.mpris_signal``."""
        self.handle_message(event)

    def handle_message(self, msg: Message):
        """Route *msg* to the handler registered for its interface.

        Messages addressed to an object path other than OBJECT_PATH are
        answered with an "unknown object" error. Messages that carry no path
        at all are ignored, and messages whose interface is missing or
        unknown are dropped without a reply.
        """
        fields = msg.header.fields

        # Header fields are optional on the wire (replies and errors carry no
        # path, a method call may omit its interface), so look them up with
        # .get() instead of indexing to avoid a KeyError.
        object_path = fields.get(HeaderFields.path)
        if object_path is None:
            return
        if object_path != OBJECT_PATH:
            # only accept messages for "/org/mpris/MediaPlayer2"
            self.bus.send_message(new_error(msg, *DBusErrors.unknownObject(object_path)))
            return

        # let the corresponding interface handle the message
        handlers = {
            PROPERTIES_INTERFACE: self.properties_interface,
            INTROSPECTION_INTERFACE: self.introspection_interface,
            MPRIS_ROOT_INTERFACE: self.root_interface,
            MPRIS_PLAYER_INTERFACE: self.player_interface,
        }
        handler = handlers.get(fields.get(HeaderFields.interface))
        if handler is not None:
            handler.handle_message(msg)

    def on_playstate_update(self):
        """Publish the current track metadata and playback status.

        Metadata is only refreshed when a playlist with a current track
        exists.
        NOTE(review): PlaybackStatus is updated regardless of that check; the
        original nesting could not be recovered from the collapsed source -
        confirm this is intended.
        """
        player = self.app.player
        current_playlist = player.current_playlist

        if current_playlist is not None and current_playlist.current_track is not None:
            current_track = current_playlist.current_track

            # Cover file name: last path component minus its final four
            # characters (presumably a ".xyz" extension - TODO confirm).
            track_file = current_track.metadata.path.split("/")[-1]
            art_path = self.app.utils.tmp_path + "/cover_cache/" + track_file[:-4]

            metadata = self.player_interface.metadata
            # metadata milli to microseconds
            metadata["mpris:length"] = ("x", current_track.duration * 1000)
            metadata["mpris:artUrl"] = ("s", "file://" + art_path)
            metadata["xesam:title"] = ("s", current_track.metadata.title)

            self.properties_interface.properties_changed(MPRIS_PLAYER_INTERFACE, "Metadata")

        if not player.playing:
            playback_status = "Stopped"
        elif player.paused:
            playback_status = "Paused"
        else:
            playback_status = "Playing"

        self.player_interface.playback_status = playback_status
        self.properties_interface.properties_changed(MPRIS_PLAYER_INTERFACE, "PlaybackStatus")