"""MPRIS (Media Player Remote Interfacing Specification) D-Bus service for Wobuzz."""

import asyncio

from sdbus import (
    dbus_method_async,
    dbus_property_async,
    DbusInterfaceCommonAsync,
    request_default_bus_name_async,
)

from wobuzz.player.track import TrackMetadata

# Well-known bus name and object path under which the player is exported.
SERVICE_NAME = "org.mpris.MediaPlayer2.wobuzz"
OBJECT_PATH = "/org/mpris/MediaPlayer2"

# D-Bus interface names implemented by the classes below.
MPRIS_ROOT_INTERFACE = "org.mpris.MediaPlayer2"
MPRIS_PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"


class MPRISRoot(DbusInterfaceCommonAsync, interface_name=MPRIS_ROOT_INTERFACE):
    """Implementation of the ``org.mpris.MediaPlayer2`` root interface."""

    @dbus_method_async()
    async def Raise(self):
        """Handle a ``Raise`` call; currently only prints a placeholder message."""
        print("Raise, maybe?")

    @dbus_property_async("s")
    def Identity(self):
        """Return the player's identity string (D-Bus type ``s``)."""
        return "Wobuzz"


class MPRISPlayer(DbusInterfaceCommonAsync, interface_name=MPRIS_PLAYER_INTERFACE):
    """Implementation of the ``org.mpris.MediaPlayer2.Player`` interface.

    The playback methods read ``self.app``, which this class does not set
    itself; ``MPRISServer`` assigns it in its constructor.
    """

    def __init__(self):
        super().__init__()

        # Last value stored through the ``Metadata`` property setter.
        self._metadata = {}

    @dbus_property_async("b")
    def CanSeek(self):
        """Return ``False``: seeking is reported as unsupported."""
        return False

    @dbus_method_async()
    async def PlayPause(self):
        """Emit the track control's ``toggle_playing_signal``."""
        self.app.gui.track_control.toggle_playing_signal.emit()

    @dbus_method_async()
    async def Next(self):
        """Emit the track control's ``next_signal``."""
        self.app.gui.track_control.next_signal.emit()

    @dbus_method_async()
    async def Previous(self):
        """Emit the track control's ``previous_signal``."""
        self.app.gui.track_control.previous_signal.emit()

    @dbus_method_async()
    async def Stop(self):
        """Emit the track control's ``stop_signal``."""
        self.app.gui.track_control.stop_signal.emit()

    @dbus_property_async("a{sv}")
    def Metadata(self):
        """Return the stored metadata mapping (D-Bus type ``a{sv}``)."""
        return self._metadata

    @Metadata.setter  # noqa
    def Metadata_setter(self, metadata: dict) -> None:
        """Store *metadata* as the value served by the ``Metadata`` property."""
        self._metadata = metadata

    async def set_metadata(self, metadata: TrackMetadata):
        """Convert *metadata* with ``to_xesam`` and assign it to ``Metadata``."""
        await self.Metadata.set_async(self.to_xesam(metadata))

    def to_xesam(self, metadata: TrackMetadata) -> dict:
        """Build the ``a{sv}`` metadata mapping for a track.

        Args:
            metadata: Track metadata; its ``title`` and ``artist`` attributes
                are read.

        Returns:
            A dict mapping MPRIS/xesam keys to ``(signature, value)`` tuples.
        """
        # NOTE(review): the MPRIS specification defines ``mpris:trackid`` as a
        # D-Bus object path (signature "o"); the constant string below looks
        # like a placeholder - confirm and replace with a real per-track id.
        xesam_metadata = {
            "mpris:trackid": ("s", "kjuztuktg"),
            "xesam:title": ("s", metadata.title),
            "xesam:artist": ("as", [metadata.artist]),
        }

        return xesam_metadata


class MPRISServer(MPRISRoot, MPRISPlayer):
    """D-Bus object combining the root and player MPRIS interfaces."""

    def __init__(self, app):
        """Create the server.

        Args:
            app: Application object; the player methods access
                ``app.gui.track_control`` through it.
        """
        super().__init__()

        self.app = app
        # Event loop created by ``start``; ``None`` until then.
        self.loop = None

    async def setup_bus(self) -> None:
        """Request ``SERVICE_NAME`` on the default bus and export this object."""
        await request_default_bus_name_async(SERVICE_NAME)
        self.export_to_dbus(OBJECT_PATH)

    def start(self):
        """Create an event loop, set up the bus on it, and run the loop forever.

        This call blocks for as long as the loop is running.
        """
        self.loop = asyncio.new_event_loop()
        self.loop.run_until_complete(self.setup_bus())
        self.loop.run_forever()

    def exec_async(self, function):
        """Run an awaitable to completion on a temporary event loop.

        This allows synchronous code (e.g. the main thread) to execute an
        async call such as ``set_metadata``. The call blocks until the
        awaitable has finished; any exception it raises propagates.

        Args:
            function: The awaitable (typically a coroutine object) to run.
        """
        loop = asyncio.new_event_loop()

        try:
            loop.run_until_complete(function)
        finally:
            # Close the throwaway loop so its selector and self-pipe file
            # descriptors are released immediately instead of lingering
            # until garbage collection (which also emits a ResourceWarning).
            loop.close()