Wobuzz/wobuzz/player/mpris.py

102 lines
2.9 KiB
Python

from sdbus import (
dbus_method_async,
dbus_property_async,
DbusInterfaceCommonAsync,
request_default_bus_name_async
)
import asyncio
from wobuzz.player.track import TrackMetadata
# Bus name passed to request_default_bus_name_async() in MPRISServer.setup_bus().
SERVICE_NAME = "org.mpris.MediaPlayer2.wobuzz"
# Object path handed to export_to_dbus() in MPRISServer.setup_bus().
OBJECT_PATH = "/org/mpris/MediaPlayer2"
# D-Bus interface name used by the MPRISRoot class.
MPRIS_ROOT_INTERFACE = "org.mpris.MediaPlayer2"
# D-Bus interface name used by the MPRISPlayer class.
MPRIS_PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"
class MPRISRoot(DbusInterfaceCommonAsync, interface_name=MPRIS_ROOT_INTERFACE):
    """Members exported under the ``org.mpris.MediaPlayer2`` D-Bus interface."""

    @dbus_method_async()
    async def Raise(self) -> None:
        """Handle a ``Raise`` call.

        Currently a placeholder: it only prints a message to stdout.
        """
        print("Raise, maybe?")

    @dbus_property_async("s")
    def Identity(self) -> str:
        """Name of the player, exposed as a D-Bus string property."""
        return "Wobuzz"
class MPRISPlayer(DbusInterfaceCommonAsync, interface_name=MPRIS_PLAYER_INTERFACE):
    """Members exported under the ``org.mpris.MediaPlayer2.Player`` D-Bus interface.

    The playback methods and ``to_xesam`` read ``self.app``, which this class
    never assigns itself; in this file it is set by ``MPRISServer.__init__``.
    """

    def __init__(self):
        super().__init__()

        # Backing store for the ``Metadata`` property (D-Bus type ``a{sv}``).
        self._metadata = {}

    @dbus_property_async("b")
    def CanSeek(self):
        """Always report ``False``."""
        return False

    @dbus_method_async()
    async def PlayPause(self):
        """Emit the GUI track control's ``toggle_playing_signal``."""
        self.app.gui.track_control.toggle_playing_signal.emit()

    @dbus_method_async()
    async def Next(self):
        """Emit the GUI track control's ``next_signal``."""
        self.app.gui.track_control.next_signal.emit()

    @dbus_method_async()
    async def Previous(self):
        """Emit the GUI track control's ``previous_signal``."""
        self.app.gui.track_control.previous_signal.emit()

    @dbus_method_async()
    async def Stop(self):
        """Emit the GUI track control's ``stop_signal``."""
        self.app.gui.track_control.stop_signal.emit()

    @dbus_property_async("a{sv}")
    def Metadata(self):
        """Return the metadata dict most recently stored via the setter."""
        return self._metadata

    @Metadata.setter  # noqa
    def Metadata_setter(self, metadata: dict) -> None:
        self._metadata = metadata

    async def set_metadata(self, metadata: TrackMetadata):
        """Convert *metadata* with ``to_xesam`` and store it in ``Metadata``.

        Args:
            metadata: Track metadata to publish.
        """
        await self.Metadata.set_async(self.to_xesam(metadata))

    def to_xesam(self, metadata: "TrackMetadata") -> dict:
        """Build the ``a{sv}`` metadata dict for a track.

        Args:
            metadata: Source object; its ``path``, ``title`` and ``artist``
                attributes are read.

        Returns:
            Dict mapping MPRIS/xesam keys to ``(signature, value)`` tuples.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        # cache name by filename without extension
        # NOTE(review): ``[:-4]`` only removes a dot plus three characters, so
        # it is wrong for extensions such as ".flac" or ".opus". Left unchanged
        # because the code writing the cover cache is not visible here and
        # presumably uses the same naming -- confirm before changing.
        file_name = metadata.path.split("/")[-1]
        art_path = self.app.utils.tmp_path + "/cover_cache/" + file_name[:-4]

        return {
            "mpris:trackid": ("s", "kjuztuktg"),  # nonsense, no functionality
            # Percent-encode the path so that spaces, '#', '%', '?' and
            # non-ASCII characters in the file name still form a valid URI.
            "mpris:artUrl": ("s", "file://" + quote(art_path)),
            "xesam:title": ("s", metadata.title),
            "xesam:artist": ("as", [metadata.artist]),
        }
class MPRISServer(MPRISRoot, MPRISPlayer):
    """Combine the root and player interfaces into one exported D-Bus object."""

    def __init__(self, app):
        """Store the application object.

        Args:
            app: Application object; the player methods read ``app.gui`` and
                ``app.utils`` from it.
        """
        super().__init__()

        self.app = app
        # Event loop created by start(); None until start() is called.
        self.loop = None

    async def setup_bus(self) -> None:
        """Request ``SERVICE_NAME`` and export this object at ``OBJECT_PATH``."""
        await request_default_bus_name_async(SERVICE_NAME)
        self.export_to_dbus(OBJECT_PATH)

    def start(self):
        """Create an event loop, set up the bus and run the loop forever.

        This call blocks in ``run_forever()``.
        """
        self.loop = asyncio.new_event_loop()
        self.loop.run_until_complete(self.setup_bus())
        self.loop.run_forever()

    def exec_async(self, function):
        """
        Run an awaitable to completion on a temporary event loop.

        This allows executing an async function from the main thread.
        If someone has a better solution, please improve this.

        Args:
            function: Awaitable (e.g. a coroutine object) to run.
        """
        loop = asyncio.new_event_loop()

        try:
            loop.run_until_complete(function)

        finally:
            # Close the temporary loop so its selector and self-pipe are
            # released right away rather than whenever it is garbage collected.
            loop.close()