108 lines
3.7 KiB
Python
108 lines
3.7 KiB
Python
#!/usr/bin/python3
|
|
|
|
from jeepney import Message, MessageType, HeaderFields, new_error, new_method_return
|
|
|
|
SERVICE_NAME = "org.mpris.MediaPlayer2.wobuzz"
|
|
OBJECT_PATH = "/org/mpris/MediaPlayer2"
|
|
PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"
|
|
INTROSPECTION_INTERFACE = "org.freedesktop.DBus.Introspectable"
|
|
MPRIS_ROOT_INTERFACE = "org.mpris.MediaPlayer2"
|
|
MPRIS_PLAYER_INTERFACE = "org.mpris.MediaPlayer2.Player"
|
|
|
|
|
|
class DBusErrors:
|
|
@classmethod
|
|
def unknownMethod(cls, method: str) -> tuple[str, str, tuple[str]]:
|
|
return "org.freedesktop.DBus.Error.UnknownMethod", "s", (f"No such method '{method}'.",)
|
|
|
|
@classmethod
|
|
def unknownProperty(cls, prop: str) -> tuple[str, str, tuple[str]]:
|
|
return "org.freedesktop.DBus.Error.InvalidArgs", "s", (f"No such property '{prop}'",)
|
|
|
|
@classmethod
|
|
def invalidArgs(cls, prop: str=None, interface: str=None):
|
|
if prop is None and interface is None:
|
|
return "org.freedesktop.DBus.Error.InvalidArgs"
|
|
|
|
if interface is None:
|
|
return "org.freedesktop.DBus.Error.InvalidArgs", "s", (f"No such property '{prop}'.",)
|
|
|
|
if prop is None:
|
|
return "org.freedesktop.DBus.Error.InvalidArgs", "s", (f"No such interface '{interface}'.",)
|
|
|
|
return (
|
|
"org.freedesktop.DBus.Error.InvalidArgs",
|
|
"s",
|
|
(f"No such property '{prop}' on interface '{interface}'.",)
|
|
)
|
|
|
|
@classmethod
|
|
def unknownObject(cls, path: str):
|
|
return "org.freedesktop.DBus.Error.UnknownObject", "s", (f"No such object on path '{path}'.",)
|
|
|
|
|
|
class DBusInterface:
|
|
def __init__(self, server, interface: str):
|
|
self.server = server
|
|
self.interface = interface
|
|
|
|
def handle_message(self, msg: Message):
|
|
return_msg = None
|
|
post_action = None # function that gets called after return_msg was set
|
|
|
|
match msg.header.message_type:
|
|
case MessageType.method_call:
|
|
method_name: str = msg.header.fields[HeaderFields.member]
|
|
|
|
if hasattr(self, method_name) and method_name[0].isupper():
|
|
method = getattr(self, msg.header.fields[HeaderFields.member])
|
|
|
|
if callable(method):
|
|
method_data = method(msg)
|
|
|
|
if isinstance(method_data, tuple):
|
|
post_action, return_msg = method_data
|
|
|
|
else:
|
|
if callable(method_data):
|
|
post_action = method_data
|
|
|
|
else:
|
|
return_msg = method_data
|
|
|
|
else:
|
|
return_msg = new_error(msg, *DBusErrors.unknownMethod(method_name))
|
|
|
|
else:
|
|
return_msg = new_error(msg, *DBusErrors.unknownMethod(method_name))
|
|
|
|
if return_msg is None:
|
|
return_msg = new_method_return(msg)
|
|
|
|
self.server.bus.send_message(return_msg)
|
|
|
|
if not post_action is None:
|
|
post_action()
|
|
|
|
def get(self, msg: Message):
|
|
prop_name: str = msg.body[1]
|
|
|
|
if prop_name[0].isupper() and hasattr(self, prop_name):
|
|
prop = getattr(self, prop_name)
|
|
|
|
if callable(prop):
|
|
prop_value = prop(msg)
|
|
signature = prop_value[0]
|
|
value = prop_value[1]
|
|
|
|
return_msg = new_method_return(msg, "v", (prop_value,))
|
|
|
|
return return_msg
|
|
|
|
return new_error(msg, *DBusErrors.unknownProperty(prop_name))
|
|
|
|
else:
|
|
return new_error(msg, *DBusErrors.unknownProperty(prop_name))
|
|
|
|
def get_all(self) -> tuple[dict[str: tuple[str: any]]]:
|
|
raise NotImplementedError
|