Commit 1ca753a1 authored by Roman Alifanov's avatar Roman Alifanov

converted to Gio

parent 8e767d91
#!/usr/bin/env python3
import dbus, json
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
def handle_signal(*args, **kwargs):
response = json.loads(args[0])
print(
f"signal handled:\n{response["message"]}\n{response["state"]}\n{response["progress"]}"
)
import json
import gi
gi.require_version('Gio', '2.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gio, GLib
def handle_signal(connection, sender_name, object_path, interface_name, signal_name, parameters, *args, **kwargs):
try:
response = json.loads(parameters.unpack()[0])
print(f"signal handled:\n{response['message']}\n{response['state']}\n{response['progress']}")
except Exception as e:
print("Failed to handle signal:", e)
def handle_updates(*args, **kwargs):
response = json.loads(args[0])
def handle_updates(source_object, result, user_data=None):
try:
variant = bus.call_finish(result)
print(
"handled method response:\n",
response["data"]["info"],
)
response_str = variant.unpack()[0]
response = json.loads(response_str)
print("handled method response:")
print(response.get('data', {}).get('info', 'No info'))
if response["data"]["info"]["upgradedPackages"]:
print(f"{response["data"]["message"]}\n{response["data"]["info"]["upgradedPackages"]}")
if 'data' in response and 'upgradedPackages' in response['data'].get('info', {}):
upgraded = response['data']['info']['upgradedPackages']
print(f"\n{response.get('message', '')}\n{upgraded}")
except Exception as e:
print("Error during async method call:", e)
if __name__ == "__main__":
# Initialize the D-Bus main loop
DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
bus.add_signal_receiver(
handle_signal,
dbus_interface="org.altlinux.APM",
interface_keyword="interface",
member_keyword="member",
path_keyword="path",
signal_name="Notification"
)
loop = GLib.MainLoop()
print("try to get updates...")
bus.call_async(
reply_handler=handle_updates,
error_handler=print,
bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
bus.call(
bus_name="org.altlinux.APM",
dbus_interface="org.altlinux.APM.system",
object_path="/org/altlinux/APM",
method="CheckUpgrade",
signature="s",
args=["Ximper System Updater"],
interface_name="org.altlinux.APM.system",
method_name="CheckUpgrade",
parameters=GLib.Variant("(s)", ("Ximper System Updater",)),
flags=Gio.DBusCallFlags.NONE,
reply_type=GLib.VariantType("(s)"),
timeout_msec=-1,
callback=handle_updates,
)
Gio.DBusConnection.signal_subscribe(
bus,
sender="org.altlinux.APM",
interface_name="org.altlinux.APM",
member="Notification",
object_path="/org/altlinux/APM",
arg0=None,
flags=Gio.DBusSignalFlags.NONE,
callback=handle_signal,
)
print("try to get updates...")
print("Listening for signals from 'org.altlinux.APM'...")
loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
loop.quit()
print("\nExiting.")
loop.quit()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment