Wobuzz/wobuzz/mpris/dbus_properties.py

68 lines
1.9 KiB
Python
Raw Normal View History

#!/usr/bin/python3
from jeepney import Header, MessageType, Endianness, MessageFlag, new_method_return
from jeepney.bus_messages import message_bus
from jeepney.io.blocking import open_dbus_connection
from jeepney.wrappers import new_header
from .utils import *
class DBusProperties(DBusInterface):
    """Handlers for the ``org.freedesktop.DBus.Properties`` interface.

    Incoming ``Get``/``GetAll`` calls are dispatched to the interface object
    that owns the requested properties, and ``PropertiesChanged`` signals are
    emitted on behalf of the MPRIS root and player interfaces.

    NOTE(review): ``self.server`` (with ``bus``, ``bus_address``,
    ``root_interface`` and ``player_interface``) is presumably set up by
    ``DBusInterface`` -- confirm against the base class.
    """

    def get_all(self):
        """Return the ``GetAll`` reply body for the Properties interface itself.

        No properties are reported for this interface, so the body is a
        one-element tuple holding an empty dict (matching signature ``a{sv}``).
        """
        return ({},)

    def properties_changed(self, interface: str):
        """Emit a ``PropertiesChanged`` signal carrying every property of *interface*.

        Only the MPRIS root and player interfaces are supported. For any other
        interface name nothing is sent, because there is no meaningful payload
        to announce.

        Args:
            interface: Name of the D-Bus interface whose properties changed.
        """
        if interface == MPRIS_ROOT_INTERFACE:
            changed = self.server.root_interface.get_all()
        elif interface == MPRIS_PLAYER_INTERFACE:
            changed = self.server.player_interface.get_all()
        else:
            # Unknown interface: do not put a malformed, empty signal on the bus.
            return

        # The D-Bus specification defines the signal arguments as
        # (interface_name, changed_properties, invalidated_properties), i.e.
        # signature "sa{sv}as". All current values are sent as "changed", so
        # the list of invalidated property names is empty.
        body = (interface,) + changed + ([],)

        # PropertiesChanged is a member of org.freedesktop.DBus.Properties;
        # clients match on that interface name, not on org.freedesktop.DBus.
        emitter = self.server.bus_address.with_interface(
            "org.freedesktop.DBus.Properties"
        )
        msg = new_signal(emitter, "PropertiesChanged", "sa{sv}as", body)
        self.server.bus.send(msg)

    def Get(self, msg: Message):
        """Handle a ``Get`` method call by delegating to the owning interface.

        Args:
            msg: The incoming method call; ``msg.body[0]`` is the name of the
                interface the requested property belongs to.

        Returns:
            Whatever the owning interface's ``get`` returns, or an error
            message built with ``new_error`` if the interface name is not one
            of the interfaces handled here.
        """
        interface_name = msg.body[0]

        if interface_name == PROPERTIES_INTERFACE:
            # NOTE(review): ``get`` is not defined in this class; presumably
            # inherited from ``DBusInterface`` -- confirm.
            return self.get(msg)
        if interface_name == MPRIS_ROOT_INTERFACE:
            return self.server.root_interface.get(msg)
        if interface_name == MPRIS_PLAYER_INTERFACE:
            return self.server.player_interface.get(msg)

        return new_error(msg, *DBusErrors.invalidArgs(interface=interface_name))

    def GetAll(self, msg: Message):
        """Handle a ``GetAll`` method call.

        Args:
            msg: The incoming method call; ``msg.body[0]`` is the name of the
                interface whose properties are requested.

        Returns:
            A method-return message with signature ``a{sv}`` holding the
            interface's properties, or an error message built with
            ``new_error`` if the interface name is not one of the interfaces
            handled here.
        """
        interface = msg.body[0]

        if interface == PROPERTIES_INTERFACE:
            body = self.get_all()
        elif interface == MPRIS_ROOT_INTERFACE:
            body = self.server.root_interface.get_all()
        elif interface == MPRIS_PLAYER_INTERFACE:
            body = self.server.player_interface.get_all()
        else:
            return new_error(msg, *DBusErrors.invalidArgs(interface=interface))

        return new_method_return(msg, "a{sv}", body)