Some checks failed
Docker. / Ubuntu (push) Has been cancelled
User-agent updater. / User-agent (push) Failing after 15s
Lock Threads / lock (push) Failing after 10s
Waiting for answer. / waiting-for-answer (push) Failing after 22s
Needs user action. / needs-user-action (push) Failing after 8s
Can't reproduce. / cant-reproduce (push) Failing after 8s
Close stale issues and PRs / stale (push) Has been cancelled
621 lines
19 KiB
Python
621 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
#
|
|
# This file is formatted with Python Black
|
|
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
from gi.repository import GLib, Gio
|
|
from itertools import count
|
|
from typing import Any, Dict, Optional, NamedTuple, Callable, List
|
|
|
|
import os
|
|
import dbus
|
|
import dbus.proxies
|
|
import dbusmock
|
|
import logging
|
|
import subprocess
|
|
|
|
|
|
# Register the GLib main loop integration as the dbus-python default so that
# the helpers below can dispatch signals and async replies via GLib.MainLoop.
DBusGMainLoop(set_as_default=True)

# Upper bound (in milliseconds) for any single D-Bus interaction in the tests.
# Defaults to 5s and may be overridden through XDP_DBUS_TIMEOUT.
DBUS_TIMEOUT = int(os.getenv("XDP_DBUS_TIMEOUT", "5000"))

# Monotonic source of numbers, used to build unique Request handle tokens.
_counter = count()

# Python-side spelling of a D-Bus "a{sv}" dictionary.
ASV = Dict[str, Any]
|
|
|
|
|
|
def init_logger(name: str) -> logging.Logger:
    """
    Configure logging for a test module and hand back its logger.

    The returned logger is named ``xdp.<name>`` and is set to DEBUG level.
    Intended usage:

        >>> import tests as xdp
        >>> logger = xdp.init_logger(__name__)
        >>> logger.debug("foo")

    """
    # basicConfig only installs a root handler if none is configured yet, so
    # calling this from several modules is fine.
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(levelname).1s|%(name)s: %(message)s",
    )
    log = logging.getLogger("xdp.{}".format(name))
    log.setLevel(logging.DEBUG)
    return log
|
|
|
|
|
|
# Module-level logger for the helpers in this file (logger name "xdp.utils").
logger = init_logger("utils")
|
|
|
|
|
|
def is_in_ci() -> bool:
    """
    Returns True if the XDP_TEST_IN_CI environment variable is set (to any
    value, including the empty string).
    """
    return "XDP_TEST_IN_CI" in os.environ
|
|
|
|
|
|
def is_in_container() -> bool:
    """
    Returns True if the tests run in CI (see is_in_ci) or if the
    ``container`` environment variable is set to "docker" or "podman".
    """
    if is_in_ci():
        return True
    return os.environ.get("container") in ("docker", "podman")
|
|
|
|
|
|
def run_long_tests() -> bool:
    """
    Returns True if the XDP_TEST_RUN_LONG environment variable is set (to any
    value, including the empty string).
    """
    return "XDP_TEST_RUN_LONG" in os.environ
|
|
|
|
|
|
def check_program_success(cmd) -> bool:
    """
    Runs ``cmd`` through the shell, waits for it to finish and returns True
    if it exited with status 0. The command's stdout and stderr are not
    captured.
    """
    completed = subprocess.run(cmd, shell=True, universal_newlines=True)
    return completed.returncode == 0
|
|
|
|
|
|
class FuseNotSupportedException(Exception):
    """
    Raised by ensure_fuse_supported() when a prerequisite for using FUSE is
    missing. The exception message names the failed prerequisite.
    """
|
|
|
|
|
|
def ensure_fuse_supported() -> None:
    """
    Verifies the prerequisites for using FUSE and raises
    FuseNotSupportedException for the first one that is not met.

    The checks run in order: fusermount3 is available, cap_sys_admin is in
    the capability bounding set, /dev/fuse is writable and /etc/mtab exists.
    """
    # (shell command, failure message) pairs, evaluated top to bottom
    prerequisites = (
        ("fusermount3 --version", "no fusermount3"),
        (
            "capsh --print | grep -q 'Bounding set.*[^a-z]cap_sys_admin'",
            "No cap_sys_admin in bounding set, can't use FUSE",
        ),
        ("[ -w /dev/fuse ]", "no write access to /dev/fuse"),
        ("[ -e /etc/mtab ]", "no /etc/mtab"),
    )

    for command, reason in prerequisites:
        if not check_program_success(command):
            raise FuseNotSupportedException(reason)
|
|
|
|
|
|
def wait(ms: int):
    """
    Runs a GLib main loop for ``ms`` milliseconds and then returns. Sources
    on the default main context keep being dispatched while waiting.
    """
    loop = GLib.MainLoop()
    # quit() returns None, so the timeout source fires exactly once
    GLib.timeout_add(ms, loop.quit)
    loop.run()
|
|
|
|
|
|
def wait_for(fn: Callable[[], bool]):
    """
    Dispatches the mainloop until ``fn`` returns a true value. ``fn`` is
    polled once up front and then again after every 50ms slice of mainloop
    time. There is no overall timeout.

    This is useful in combination with a lambda which reads state that a
    callback modifies:

        state = {"done": False}
        def callback():
            state["done"] = True
        do_something_later(callback)
        xdp.wait_for(lambda: state["done"])
    """
    loop = GLib.MainLoop()
    while True:
        if fn():
            return
        # Run the loop for one 50ms slice, then re-evaluate the condition
        GLib.timeout_add(50, loop.quit)
        loop.run()
|
|
|
|
|
|
def get_permission_store_iface(bus: dbus.Bus) -> dbus.Interface:
    """
    Looks up the xdg-permission-store object on ``bus`` and returns its
    PermissionStore dbus interface.
    """
    iface_name = "org.freedesktop.impl.portal.PermissionStore"
    proxy = bus.get_object(
        "org.freedesktop.impl.portal.PermissionStore",
        "/org/freedesktop/impl/portal/PermissionStore",
    )
    return dbus.Interface(proxy, iface_name)
|
|
|
|
|
|
def get_document_portal_iface(bus: dbus.Bus) -> dbus.Interface:
    """
    Looks up the xdg-document-portal object on ``bus`` and returns its
    Documents dbus interface.
    """
    iface_name = "org.freedesktop.portal.Documents"
    proxy = bus.get_object(
        "org.freedesktop.portal.Documents",
        "/org/freedesktop/portal/documents",
    )
    return dbus.Interface(proxy, iface_name)
|
|
|
|
|
|
def get_mock_iface(bus: dbus.Bus, bus_name: Optional[str] = None) -> dbus.Interface:
    """
    Returns the dbusmock interface of the portal object owned by
    ``bus_name``. If ``bus_name`` is None or empty, the test backend
    "org.freedesktop.impl.portal.Test" is used.
    """
    owner = bus_name or "org.freedesktop.impl.portal.Test"
    proxy = bus.get_object(owner, "/org/freedesktop/portal/desktop")
    return dbus.Interface(proxy, dbusmock.MOCK_IFACE)
|
|
|
|
|
|
def portal_interface_name(portal_name: str, domain: Optional[str] = None) -> str:
    """
    Builds the fully qualified interface name for a portal.

    Without a domain (None or empty) the result is
    ``org.freedesktop.portal.<portal_name>``, otherwise
    ``org.freedesktop.<domain>.portal.<portal_name>``.
    """
    if not domain:
        return f"org.freedesktop.portal.{portal_name}"
    return f"org.freedesktop.{domain}.portal.{portal_name}"
|
|
|
|
|
|
def get_portal_iface(
    bus: dbus.Bus, name: str, domain: Optional[str] = None
) -> dbus.Interface:
    """
    Returns the dbus interface of the main portal object for the portal
    called ``name`` (e.g. "Camera"), optionally within ``domain``.
    """
    return get_iface(bus, portal_interface_name(name, domain))
|
|
|
|
|
|
def get_iface(bus: dbus.Bus, name: str) -> dbus.Interface:
    """
    Returns the interface called ``name`` of the main portal object.

    Interfaces are created once and cached in a dictionary stored on the
    ``bus`` object itself, so repeated lookups for the same bus and name
    return the same dbus.Interface.
    """
    # Lazily attach the per-bus cache on first use
    if not hasattr(bus, "_xdp_portal_ifaces"):
        bus._xdp_portal_ifaces = {}
    cache = bus._xdp_portal_ifaces

    if name not in cache:
        created = dbus.Interface(get_xdp_dbus_object(bus), name)
        assert created
        cache[name] = created
    return cache[name]
|
|
|
|
|
|
def get_xdp_dbus_object(bus: dbus.Bus) -> dbus.proxies.ProxyObject:
    """
    Returns the proxy for the main portal object
    (/org/freedesktop/portal/desktop on org.freedesktop.portal.Desktop).

    The proxy is created on first use and cached on the ``bus`` object.
    """
    if hasattr(bus, "_xdp_dbus_object"):
        return bus._xdp_dbus_object

    portal = bus.get_object(
        "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
    )
    assert portal
    bus._xdp_dbus_object = portal
    return portal
|
|
|
|
|
|
def check_version(bus: dbus.Bus, portal_name: str, expected_version: int):
    """
    Checks that the portal_name portal version is equal to expected_version.

    The version is read from the ``version`` property of the portal's
    interface on the main portal object.

    Raises AssertionError if the version differs from ``expected_version``
    or if reading the property fails with a DBusException (the DBus error
    is logged and chained as the cause).
    """
    properties_intf = dbus.Interface(
        get_xdp_dbus_object(bus), "org.freedesktop.DBus.Properties"
    )
    portal_iface_name = portal_interface_name(portal_name)

    # Only the DBus call itself can raise DBusException, keep the try minimal
    try:
        portal_version = properties_intf.Get(portal_iface_name, "version")
    except dbus.exceptions.DBusException as e:
        logger.critical(e)
        # Fail explicitly instead of through an always-false assert, which
        # would be stripped (and the error swallowed) when running with -O
        raise AssertionError(str(e)) from e

    assert int(portal_version) == expected_version
|
|
|
|
|
|
class Response(NamedTuple):
    """
    The payload of a Response signal, as returned by a completed
    :class:`Request`
    """

    # Response code, the first argument of the Response signal
    response: int
    # Results dictionary, the second argument of the Response signal
    results: ASV
|
|
|
|
|
|
class ResponseTimeout(Exception):
    """
    Raised by :meth:`Request.call` when the mainloop finished without the
    Request having received a Response and without it having been closed.
    """
|
|
|
|
|
|
class Closable:
    """
    Common base of Session and Request, the two portal objects that provide
    a Close() method.

    The concrete subclass must be named ``Request`` or ``Session``: the class
    name is used to pick the matching org.freedesktop.portal interface and
    the name of the mock's ``<Name>Closed`` signal.
    """

    def __init__(self, bus: dbus.Bus, objpath: str):
        kind = type(self).__name__

        self.objpath = objpath
        self._bus = bus
        self._closable = kind
        self._impl_closed = False
        # Set by subclasses while they wait inside a mainloop so that
        # callbacks can quit it
        self._mainloop: Optional[GLib.MainLoop] = None
        # Exceptions raised inside GLib callbacks would be lost, callbacks
        # therefore catch them and park them here so that they can be
        # re-raised once control is back in the main context
        self.error: Optional[Exception] = None

        assert kind in ("Request", "Session")
        self._closable_interface = dbus.Interface(
            bus.get_object("org.freedesktop.portal.Desktop", objpath),
            f"org.freedesktop.portal.{kind}",
        )

    @property
    def bus(self) -> dbus.Bus:
        """
        The bus this object lives on
        """
        return self._bus

    @property
    def closed(self) -> bool:
        """
        True once the mock backend reported that the impl.portal side of
        this object was closed
        """
        return self._impl_closed

    def close(self) -> None:
        """
        Calls Close() on the portal object.

        Before doing so, a receiver for the mock's ``<Name>Closed`` signal is
        installed. When that signal arrives for this object's path, the
        receiver removes itself, marks the object as closed and quits the
        mainloop if one is set.
        """
        receiver = None

        def on_impl_closed(handle) -> None:
            # The signal is emitted for every closed object, skip the others
            if handle != self.objpath:
                return

            logger.debug(f"Impl{self._closable} {self.objpath} was closed")
            receiver.remove()  # type: ignore
            self._impl_closed = True
            if self.closed and self._mainloop:
                self._mainloop.quit()

        # See :class:`ImplRequest`, this signal is a side-channel for the
        # impl.portal template to tell us when the impl object was really
        # closed by the portal.
        receiver = self._bus.add_signal_receiver(
            on_impl_closed,
            f"{self._closable}Closed",
            dbus_interface="org.freedesktop.impl.portal.Mock",
        )

        logger.debug(f"Closing {self._closable} {self.objpath}")
        self._closable_interface.Close()

    def schedule_close(self, timeout_ms=300):
        """
        Arranges for close() to be called from the mainloop once
        ``timeout_ms`` milliseconds have passed. The timeout must be positive
        and smaller than DBUS_TIMEOUT.
        """
        assert 0 < timeout_ms < DBUS_TIMEOUT
        # close() returns None, so the timeout source fires only once
        GLib.timeout_add(timeout_ms, self.close)
|
|
|
|
|
|
class Request(Closable):
    """
    Helper class for executing methods that use Requests. This class takes
    care of subscribing to the signals and invokes the method on the
    interface with the expected behaviors. A typical invocation is:

        >>> response = Request(connection, interface).call("Foo", bar="bar")
        >>> assert response.response == 0

    Requests can only be used once, to call a second method you must
    instantiate a new Request object.
    """

    def __init__(self, bus: dbus.Bus, interface: dbus.Interface):
        """
        Prepares a Request for a method on ``interface``.

        The Request object path is derived from the unique name of ``bus``
        and a freshly generated handle token, and the Response signal on that
        path is subscribed to before any method is called.
        """

        def sanitize(name):
            return name.lstrip(":").replace(".", "_")

        sender_token = sanitize(bus.get_unique_name())
        self._handle_token = f"request{next(_counter)}"
        self.handle = f"/org/freedesktop/portal/desktop/request/{sender_token}/{self._handle_token}"
        # The Closable, this also initializes self.error and self._mainloop
        super().__init__(bus, self.handle)

        self.interface = interface
        self.response: Optional[Response] = None
        self.used = False

        # One proxy for the request path is enough, it backs both the mock
        # and the Request interface
        proxy = bus.get_object("org.freedesktop.portal.Desktop", self.handle)
        self._proxy = proxy
        self.mock_interface = dbus.Interface(proxy, dbusmock.MOCK_IFACE)

        # GLib makes assertions in callbacks impossible, so we wrap all
        # callbacks into a try: except and store the error on the request to
        # be raised later when we're back in the main context
        def cb_response(response: int, results: ASV) -> None:
            try:
                logger.debug(f"Response received on {self.handle}")
                assert self.response is None
                self.response = Response(response, results)
                if self._mainloop:
                    self._mainloop.quit()
            except Exception as e:
                self.error = e

        self.request_interface = dbus.Interface(proxy, "org.freedesktop.portal.Request")
        self.request_interface.connect_to_signal("Response", cb_response)

    @property
    def handle_token(self) -> dbus.String:
        """
        Returns the dbus-ready handle_token, ready to be put into the options
        """
        return dbus.String(self._handle_token, variant_level=1)

    def call(self, methodname: str, **kwargs) -> Optional[Response]:
        """
        Semi-synchronously call method ``methodname`` on the interface given
        in the Request's constructor. The kwargs must be specified in the
        order the DBus method takes them but the handle_token is automatically
        filled in.

            >>> response = Request(connection, interface).call("Foo", bar="bar")
            >>> if response.response != 0:
            ...     print("some error occurred")

        If no ``options`` keyword is given, an options dictionary containing
        only the handle_token is appended as the last argument of the call.

        The DBus call itself is asynchronous (required for signals to work)
        but this method does not return until the Response is received, the
        Request is closed or an error occurs. If the Request is closed, the
        Response is None.

        If the "reply_handler" and "error_handler" keywords are present, those
        callbacks are called just like they would be as dbus.service.ProxyObject.

        Raises whatever error was recorded by one of the callbacks (including
        the DBus error of the method call itself) and ResponseTimeout if the
        mainloop ended without a Response and without the Request being
        closed.
        """
        assert not self.used
        self.used = True

        # These two are for us, not arguments of the DBus method
        reply_handler = kwargs.pop("reply_handler", None)
        error_handler = kwargs.pop("error_handler", None)

        # Make sure options exists and has the handle_token set. A missing
        # options dictionary must be added to kwargs, otherwise the token
        # would never be sent; it goes last in the argument list.
        if "options" not in kwargs:
            kwargs["options"] = dbus.Dictionary({}, signature="sv")
        options = kwargs["options"]

        if "handle_token" not in options:
            options["handle_token"] = self.handle_token

        # Anything that takes longer than DBUS_TIMEOUT needs to fail
        self._mainloop = GLib.MainLoop()
        GLib.timeout_add(DBUS_TIMEOUT, self._mainloop.quit)

        method = getattr(self.interface, methodname)
        assert method

        # Handle the normal method reply which returns the Request object
        # path. We don't exit the mainloop here, we're waiting for either the
        # Response signal on the Request itself or the Close() handling
        def reply_cb(handle):
            try:
                logger.debug(f"Reply to {methodname} with {self.handle}")
                assert handle == self.handle

                if reply_handler:
                    reply_handler(handle)
            except Exception as e:
                self.error = e

        # Handle any exceptions during the actual method call (not the Request
        # handling itself). Can exit the mainloop if that happens
        def error_cb(error):
            try:
                logger.debug(f"Error after {methodname} with {error}")
                if error_handler:
                    error_handler(error)
                self.error = error
            except Exception as e:
                self.error = e
            finally:
                if self._mainloop:
                    self._mainloop.quit()

        # Method is invoked async, otherwise we can't mix and match signals
        # and other calls. It's still sync as seen by the caller in that we
        # have a mainloop that waits for us to finish though.
        method(
            *kwargs.values(),
            reply_handler=reply_cb,
            error_handler=error_cb,
        )

        self._mainloop.run()

        if self.error:
            raise self.error
        elif not self.closed and self.response is None:
            raise ResponseTimeout(f"Timed out waiting for response from {methodname}")

        return self.response
|
|
|
|
|
|
class Session(Closable):
    """
    Wrapper around a Session object created by a portal. It subscribes to
    the session's `Closed` signal and records when it was received. A
    typical invocation is:

        >>> response = Request(connection, interface).call("CreateSession")
        >>> session = Session.from_response(connection, response)
        # Now run the main loop and do other stuff
        # Check if the session was closed
        >>> if session.closed:
        ...     pass
        # or close the session explicitly
        >>> session.close()
    """

    def __init__(self, bus: dbus.Bus, handle: str):
        assert handle
        super().__init__(bus, handle)

        self.handle = handle
        # Argument of the Closed signal, None until the signal was received
        self.details = None
        self._closed_sig_received = False
        # Exceptions raised inside GLib callbacks would be lost, so callbacks
        # catch them and store them here to be raised later from the main
        # context
        self.error = None

        def on_closed(details: ASV) -> None:
            try:
                logger.debug(f"Session.Closed received on {self.handle}")
                assert not self._closed_sig_received
                self._closed_sig_received = True
                self.details = details
                if self._mainloop:
                    self._mainloop.quit()
            except Exception as e:
                self.error = e

        self.session_interface = dbus.Interface(
            bus.get_object("org.freedesktop.portal.Desktop", handle),
            "org.freedesktop.portal.Session",
        )
        self.session_interface.connect_to_signal("Closed", on_closed)

    @property
    def closed(self):
        """
        True if the Closed signal was received for this session or if the
        impl side was reported as closed (see Closable.closed)
        """
        if self._closed_sig_received:
            return True
        return super().closed

    @classmethod
    def from_response(cls, bus: dbus.Bus, response: Response) -> "Session":
        """
        Creates a Session for the ``session_handle`` found in the results of
        ``response``
        """
        session_handle = response.results["session_handle"]
        return cls(bus, session_handle)
|
|
|
|
|
|
class GDBusIfaceSignal:
    """
    Handle for a signal connection made through GDBusIface.connect_to_signal.
    Keeps the handler id together with the proxy it was connected on so that
    the connection can be undone with disconnect().
    """

    def __init__(self, signal_id: int, proxy: Gio.DBusProxy):
        self.proxy = proxy
        self.signal_id = signal_id

    def disconnect(self):
        """
        Removes the signal handler from the proxy
        """
        self.proxy.disconnect(self.signal_id)
|
|
|
|
|
|
class GDBusIface:
    """
    Helper class for calling dbus interfaces with complex arguments.
    Usually you want to use python-dbus on the dbus_con fixture with
    get_portal_iface, get_mock_iface or get_iface. This is convenient but
    might not be sufficient for complex arguments or for asynchronously calling
    a method.
    """

    def __init__(self, bus: str, obj: str, iface: str):
        """
        Creates a GDBusIface for a specific bus name, object path and
        interface on the session bus.

        A new connection to the session bus address is opened for every
        GDBusIface instance.
        """
        address = Gio.dbus_address_get_for_bus_sync(Gio.BusType.SESSION, None)
        session_bus = Gio.DBusConnection.new_for_address_sync(
            address,
            Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT
            | Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION,
            None,
            None,
        )
        assert session_bus
        self._proxy = Gio.DBusProxy.new_sync(
            session_bus,
            Gio.DBusProxyFlags.NONE,
            None,
            bus,
            obj,
            iface,
            None,
        )

    @staticmethod
    def _make_fd_list(fds: Optional[List[int]]) -> Gio.UnixFDList:
        """
        Builds a Gio.UnixFDList containing the given file descriptors, in
        order. None is treated like an empty list.
        """
        fdlist = Gio.UnixFDList.new()
        for fd in fds or ():
            fdlist.append(fd)
        return fdlist

    def _call(
        self,
        method_name: str,
        args_variant: GLib.Variant,
        fds: Optional[List[int]] = None,
    ) -> GLib.Variant:
        """
        Calls a method synchronously with the arguments passed in args_variant,
        passing the file descriptors specified in fds (none by default).
        Returns the result of the dbus call.
        """
        return self._proxy.call_with_unix_fd_list_sync(
            method_name,
            args_variant,
            0,  # call flags
            -1,  # timeout in ms
            self._make_fd_list(fds),
            None,
        )

    def _call_async(
        self,
        method_name: str,
        args_variant: GLib.Variant,
        fds: Optional[List[int]] = None,
        cb: Optional[Callable[[GLib.Variant], None]] = None,
    ) -> None:
        """
        Calls a method asynchronously with the arguments passed in args_variant,
        passing the file descriptors specified in fds (none by default).
        Invokes the callback cb, if given, with the result when the call
        finished.
        """

        def internal_cb(s, res, _):
            # Always finish the call, even if nobody wants the result
            res = s.call_finish(res)
            if cb:
                cb(res)

        self._proxy.call_with_unix_fd_list(
            method_name,
            args_variant,
            0,  # call flags
            -1,  # timeout in ms
            self._make_fd_list(fds),
            None,
            internal_cb,
            None,
        )

    def connect_to_signal(
        self, name: str, cb: Callable[[GLib.Variant], None]
    ) -> GDBusIfaceSignal:
        """
        Connects the dbus signal name to the callback cb, which is invoked
        with the signal's parameters. Returns an object representing the
        connection which can be used to disconnect it again.
        """

        def internal_cb(proxy, sender_name, signal_name, parameters):
            # "g-signal" fires for every signal of the proxy, only forward
            # the one that was asked for
            if signal_name != name:
                return
            cb(parameters)

        signal_id = self._proxy.connect("g-signal", internal_cb)
        return GDBusIfaceSignal(signal_id, self._proxy)
|