Files
allhaileris afb81b8278
Some checks failed
Docker. / Ubuntu (push) Has been cancelled
User-agent updater. / User-agent (push) Failing after 15s
Lock Threads / lock (push) Failing after 10s
Waiting for answer. / waiting-for-answer (push) Failing after 22s
Needs user action. / needs-user-action (push) Failing after 8s
Can't reproduce. / cant-reproduce (push) Failing after 8s
Close stale issues and PRs / stale (push) Has been cancelled
init
2026-02-16 15:50:16 +03:00

675 lines
22 KiB
Python

# SPDX-License-Identifier: LGPL-2.1-or-later
#
# This file is formatted with Python Black
import tests as xdp
import dbus
import pytest
import socket
from gi.repository import GLib
from itertools import count
from typing import Any
# Module-wide monotonically increasing counter; each next(counter) yields a
# fresh integer, used to build unique session handle tokens.
counter = count(start=0)
def default_zones():
    """
    Return a fresh list with the two zones used by the zone-related tests.

    Each zone is a 4-tuple; the tests marshal it with the D-Bus signature
    "uuii" (presumably width, height, x offset, y offset).
    """
    first_zone = (1024, 768, 0, 0)
    second_zone = (640, 480, 1024, 0)
    return [first_zone, second_zone]
@pytest.fixture
def required_templates():
    """
    Fixture naming the mock templates these tests need: only "inputcapture",
    with an empty parameter dictionary.
    """
    templates = {}
    templates["inputcapture"] = {}
    return templates
@pytest.fixture
def zones():
    """Fixture providing the list of zones produced by default_zones()."""
    zone_list = default_zones()
    return zone_list
class TestInputCapture:
    """
    Tests for the InputCapture portal interface.

    The non-test methods are helpers that call one portal method each, assert
    on the reply and verify through the mock interface that the call was
    recorded with the expected arguments. create_session() and get_zones()
    store the session handle and zone set on the instance
    (current_session_handle, current_zone_set) for use by later helpers.
    """

    @staticmethod
    def _barrier(barrier_id, x1, y1, x2, y2):
        """
        Build one pointer barrier dictionary as passed to SetPointerBarriers.

        The barrier runs from (x1, y1) to (x2, y2); both values are wrapped
        as D-Bus variants.
        """
        return {
            "barrier_id": dbus.UInt32(barrier_id, variant_level=1),
            "position": dbus.Struct(
                [x1, y1, x2, y2], signature="iiii", variant_level=1
            ),
        }

    def _setup_default_session(self, dbus_con):
        """
        Create a session, fetch its zones, set a single barrier (id 10) on
        the top edge of the default zone and connect to EIS.

        Returns the socket returned by connect_to_eis().
        """
        self.create_session(dbus_con)
        self.get_zones(dbus_con)

        # The default zone is 1920x1080
        barriers = [self._barrier(10, 0, 0, 1920, 0)]
        self.set_pointer_barriers(dbus_con, barriers)
        return self.connect_to_eis(dbus_con)

    def create_session(self, dbus_con, capabilities=0xF):
        """
        Call CreateSession for the given capabilities and return the
        response. Callers in this class unpack it as (response, results).

        The session handle from the results is stored in
        self.current_session_handle.
        """
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")
        mock_intf = xdp.get_mock_iface(dbus_con)

        request = xdp.Request(dbus_con, inputcapture_intf)

        capabilities = dbus.UInt32(capabilities, variant_level=1)
        # The module-level counter keeps the token unique per call
        session_handle_token = dbus.String(f"session{next(counter)}", variant_level=1)

        options = dbus.Dictionary(
            {
                "capabilities": capabilities,
                "session_handle_token": session_handle_token,
            },
            signature="sv",
        )

        response = request.call("CreateSession", parent_window="", options=options)

        assert response
        assert response.response == 0
        assert "session_handle" in response.results
        assert "capabilities" in response.results
        caps = response.results["capabilities"]
        # Returned capabilities must be a subset of the requested ones
        assert (caps & ~capabilities) == 0

        self.current_session_handle = response.results["session_handle"]

        # Check the impl portal was called with the right args
        method_calls = mock_intf.GetMethodCalls("CreateSession")
        assert len(method_calls) > 0
        _, args = method_calls[-1]
        assert args[3] == ""  # parent window
        assert args[4]["capabilities"] == capabilities

        return response

    def get_zones(self, dbus_con):
        """
        Call GetZones for the current session and return the response.
        Callers in this class unpack it as (response, results).

        The zone set from the results is stored in self.current_zone_set.
        """
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")
        mock_intf = xdp.get_mock_iface(dbus_con)

        request = xdp.Request(dbus_con, inputcapture_intf)

        options: Any = {}
        response = request.call(
            "GetZones", session_handle=self.current_session_handle, options=options
        )

        assert response
        assert response.response == 0
        assert "zones" in response.results
        assert "zone_set" in response.results

        self.current_zone_set = response.results["zone_set"]

        # Check the impl portal was called with the right args
        method_calls = mock_intf.GetMethodCalls("GetZones")
        assert len(method_calls) > 0
        _, args = method_calls[-1]
        assert args[0] == request.handle
        assert args[1] == self.current_session_handle

        return response

    def set_pointer_barriers(self, dbus_con, barriers):
        """
        Call SetPointerBarriers with the given barriers and the current zone
        set, and return the response. Callers in this class unpack it as
        (response, results).

        Requires create_session() and get_zones() to have been called first,
        since it reads the session handle and zone set they stored.
        """
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")
        mock_intf = xdp.get_mock_iface(dbus_con)

        request = xdp.Request(dbus_con, inputcapture_intf)

        options: Any = {}
        response = request.call(
            "SetPointerBarriers",
            session_handle=self.current_session_handle,
            options=options,
            barriers=barriers,
            zone_set=self.current_zone_set,
        )

        assert response
        assert response.response == 0
        assert "failed_barriers" in response.results

        # Check the impl portal was called with the right args
        method_calls = mock_intf.GetMethodCalls("SetPointerBarriers")
        assert len(method_calls) > 0
        _, args = method_calls[-1]
        assert args[0] == request.handle
        assert args[1] == self.current_session_handle
        assert args[4] == barriers
        assert args[5] == self.current_zone_set

        return response

    def connect_to_eis(self, dbus_con):
        """
        Call ConnectToEIS for the current session, check that the greeting
        b"HELLO" can be read from the returned fd and return the socket
        wrapping it.
        """
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")
        mock_intf = xdp.get_mock_iface(dbus_con)

        fd = inputcapture_intf.ConnectToEIS(
            self.current_session_handle, dbus.Dictionary({}, signature="sv")
        )

        # Our dbusmock template sends HELLO
        # NOTE(review): socket.fromfd() duplicates the descriptor, so the one
        # obtained from fd.take() stays open. Presumably harmless in a
        # short-lived test; confirm before changing fd ownership, since
        # callers that drop the socket may rely on the connection staying up.
        eis_socket = socket.fromfd(fd.take(), socket.AF_UNIX, socket.SOCK_STREAM)
        hello = eis_socket.recv(10)
        assert hello == b"HELLO"

        method_calls = mock_intf.GetMethodCalls("ConnectToEIS")
        assert len(method_calls) > 0
        _, args = method_calls[-1]
        assert args[0] == self.current_session_handle

        return eis_socket

    def enable(self, dbus_con):
        """Call Enable for the current session and check the recorded call."""
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")
        mock_intf = xdp.get_mock_iface(dbus_con)

        inputcapture_intf.Enable(
            self.current_session_handle, dbus.Dictionary({}, signature="sv")
        )

        method_calls = mock_intf.GetMethodCalls("Enable")
        assert len(method_calls) > 0
        _, args = method_calls[-1]
        assert args[0] == self.current_session_handle

    def disable(self, dbus_con):
        """Call Disable for the current session and check the recorded call."""
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")
        mock_intf = xdp.get_mock_iface(dbus_con)

        inputcapture_intf.Disable(
            self.current_session_handle, dbus.Dictionary({}, signature="sv")
        )

        method_calls = mock_intf.GetMethodCalls("Disable")
        assert len(method_calls) > 0
        _, args = method_calls[-1]
        assert args[0] == self.current_session_handle

    def release(self, dbus_con, activation_id: int, cursor_position=None):
        """
        Call Release for the current session with the given activation id
        and, if given, a cursor position (an (x, y) pair of floats), then
        check that the recorded call carries the same values.
        """
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")
        mock_intf = xdp.get_mock_iface(dbus_con)

        options = {"activation_id": dbus.UInt32(activation_id)}
        if cursor_position:
            options["cursor_position"] = dbus.Struct(
                list(cursor_position), signature="dd", variant_level=1
            )

        inputcapture_intf.Release(
            self.current_session_handle, dbus.Dictionary(options, signature="sv")
        )

        method_calls = mock_intf.GetMethodCalls("Release")
        assert len(method_calls) > 0
        _, args = method_calls[-1]
        assert args[0] == self.current_session_handle
        assert "activation_id" in args[2]
        aid = args[2]["activation_id"]
        assert aid == activation_id
        if cursor_position:
            assert "cursor_position" in args[2]
            pos = args[2]["cursor_position"]
            assert pos == cursor_position

    def test_version(self, portals, dbus_con):
        xdp.check_version(dbus_con, "InputCapture", 1)

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "supported_capabilities": 0b101,  # KEYBOARD, TOUCH
                },
            },
        ),
    )
    def test_supported_capabilities(self, portals, dbus_con):
        properties_intf = xdp.get_iface(dbus_con, "org.freedesktop.DBus.Properties")
        caps = properties_intf.Get(
            "org.freedesktop.portal.InputCapture", "SupportedCapabilities"
        )
        assert caps == 0b101

    def test_create_session(self, portals, dbus_con):
        mock_intf = xdp.get_mock_iface(dbus_con)

        self.create_session(dbus_con, capabilities=0b1)  # KEYBOARD

        # Check the impl portal was called with the right args
        method_calls = mock_intf.GetMethodCalls("CreateSession")
        assert len(method_calls) == 1
        _, args = method_calls.pop(0)
        assert args[3] == ""  # parent window
        assert args[4]["capabilities"] == 0b1

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "capabilities": 0b110,  # TOUCH, POINTER
                    "supported_capabilities": 0b111,  # TOUCH, POINTER, KEYBOARD
                },
            },
        ),
    )
    def test_create_session_limited_caps(self, portals, dbus_con):
        mock_intf = xdp.get_mock_iface(dbus_con)

        # Request more caps than are supported
        _, results = self.create_session(dbus_con, capabilities=0b111)
        caps = results["capabilities"]
        # Returned capabilities must be the ones we set up in the params
        assert caps == 0b110

        # Check the impl portal was called with the right args
        method_calls = mock_intf.GetMethodCalls("CreateSession")
        assert len(method_calls) == 1
        _, args = method_calls.pop(0)
        assert args[3] == ""  # parent window
        assert args[4]["capabilities"] == 0b111

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "default-zone": dbus.Array(
                        [dbus.Struct(z, signature="uuii") for z in default_zones()],
                        signature="(uuii)",
                        variant_level=1,
                    )
                },
            },
        ),
    )
    def test_get_zones(self, portals, dbus_con, zones):
        mock_intf = xdp.get_mock_iface(dbus_con)

        self.create_session(dbus_con)
        _, results = self.get_zones(dbus_con)

        # zip() stops at the shorter input, so compare the lengths first;
        # otherwise missing zones would go unnoticed.
        assert len(results["zones"]) == len(zones)
        for z1, z2 in zip(results["zones"], zones):
            assert z1 == z2

        # Check the impl portal was called with the right args
        method_calls = mock_intf.GetMethodCalls("CreateSession")
        assert len(method_calls) == 1
        method_calls = mock_intf.GetMethodCalls("GetZones")
        assert len(method_calls) == 1

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "default-zone": dbus.Array(
                        [dbus.Struct(z, signature="uuii") for z in default_zones()],
                        signature="(uuii)",
                        variant_level=1,
                    )
                },
            },
        ),
    )
    def test_set_pointer_barriers(self, portals, dbus_con, zones):
        mock_intf = xdp.get_mock_iface(dbus_con)

        self.create_session(dbus_con)
        self.get_zones(dbus_con)

        # Barriers with an id below 20 are expected to be accepted
        valid = [
            (10, 0, 0, 0, 768),
            (11, 0, 0, 1024, 0),
            (12, 1024, 0, 1024, 768),
            (13, 0, 768, 1024, 768),
            (14, 100, 768, 500, 768),
            (15, 1024, 0, 1024, 480),
            (16, 1024 + 640, 0, 1024 + 640, 480),
        ]
        # invalid ones, expected to be reported back as failed
        invalid = [
            (20, 0, 1, 3, 4),
            (21, 0, 1, 1024, 1),
            (22, 1, 0, 1, 768),
            (23, 1023, 0, 1023, 768),
            (24, 0, 0, 1050, 0),
        ]
        barriers = [self._barrier(*spec) for spec in valid + invalid]

        _, results = self.set_pointer_barriers(dbus_con, barriers=barriers)
        failed_barriers = results["failed_barriers"]
        # Only invalid barriers may fail, and every invalid one must fail
        assert all(barrier_id >= 20 for barrier_id in failed_barriers)
        for barrier in barriers:
            if barrier["barrier_id"] >= 20:
                assert barrier["barrier_id"] in failed_barriers

        # Check the impl portal was called with the right args
        method_calls = mock_intf.GetMethodCalls("CreateSession")
        assert len(method_calls) == 1
        method_calls = mock_intf.GetMethodCalls("GetZones")
        assert len(method_calls) == 1
        method_calls = mock_intf.GetMethodCalls("SetPointerBarriers")
        assert len(method_calls) == 1
        _, args = method_calls.pop(0)
        assert args[4] == barriers
        assert args[5] == self.current_zone_set

    def test_connect_to_eis(self, portals, dbus_con):
        self._setup_default_session(dbus_con)

    def test_enable_disable(self, portals, dbus_con):
        mock_intf = xdp.get_mock_iface(dbus_con)

        self._setup_default_session(dbus_con)

        # Disable before enable should be a noop
        self.disable(dbus_con)
        method_calls = mock_intf.GetMethodCalls("Disable")
        assert len(method_calls) == 1

        self.enable(dbus_con)
        method_calls = mock_intf.GetMethodCalls("Enable")
        assert len(method_calls) == 1

        self.disable(dbus_con)
        method_calls = mock_intf.GetMethodCalls("Disable")
        assert len(method_calls) == 2

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "disable-delay": 200,
                },
            },
        ),
    )
    def test_disable_signal(self, portals, dbus_con):
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")

        self._setup_default_session(dbus_con)

        disabled_signal_received = False

        def cb_disabled(session_handle, options):
            nonlocal disabled_signal_received
            disabled_signal_received = True
            # The signal must refer to the session created above
            assert session_handle == self.current_session_handle

        inputcapture_intf.connect_to_signal("Disabled", cb_disabled)

        self.enable(dbus_con)

        xdp.wait_for(lambda: disabled_signal_received)

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "activated-delay": 200,
                    "deactivated-delay": 300,
                },
            },
        ),
    )
    def test_activated_signal(self, portals, dbus_con):
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")

        self._setup_default_session(dbus_con)

        disabled_signal_received = False
        activated_signal_received = False
        deactivated_signal_received = False

        def cb_disabled(session_handle, options):
            nonlocal disabled_signal_received
            disabled_signal_received = True

        def cb_activated(session_handle, options):
            nonlocal activated_signal_received
            activated_signal_received = True
            assert session_handle == self.current_session_handle
            assert "activation_id" in options
            assert "barrier_id" in options
            assert options["barrier_id"] == 10  # template uses first barrier
            assert "cursor_position" in options
            assert options["cursor_position"] == (
                10.0,
                20.0,
            )  # template uses x+10, y+20 of first barrier

        def cb_deactivated(session_handle, options):
            nonlocal deactivated_signal_received
            deactivated_signal_received = True
            assert session_handle == self.current_session_handle
            assert "activation_id" in options
            assert "cursor_position" in options
            assert options["cursor_position"] == (
                10.0,
                20.0,
            )  # template uses x+10, y+20 of first barrier

        inputcapture_intf.connect_to_signal("Activated", cb_activated)
        inputcapture_intf.connect_to_signal("Deactivated", cb_deactivated)
        inputcapture_intf.connect_to_signal("Disabled", cb_disabled)

        self.enable(dbus_con)

        xdp.wait_for(lambda: activated_signal_received and deactivated_signal_received)
        assert not disabled_signal_received

        # Disabling should not trigger the signal
        self.disable(dbus_con)

        assert not disabled_signal_received

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "zones-changed-delay": 200,
                },
            },
        ),
    )
    def test_zones_changed_signal(self, portals, dbus_con):
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")

        self._setup_default_session(dbus_con)

        zones_changed_signal_received = False

        def cb_zones_changed(session_handle, options):
            nonlocal zones_changed_signal_received
            zones_changed_signal_received = True
            assert session_handle == self.current_session_handle

        inputcapture_intf.connect_to_signal("ZonesChanged", cb_zones_changed)

        self.enable(dbus_con)

        xdp.wait_for(lambda: zones_changed_signal_received)

    @pytest.mark.parametrize(
        "template_params",
        (
            {
                "inputcapture": {
                    "activated-delay": 200,
                    "deactivated-delay": 1000,
                    # NOTE(review): test_disable_signal spells this key
                    # "disable-delay"; confirm which name the template reads.
                    "disabled-delay": 1200,
                },
            },
        ),
    )
    def test_release(self, portals, dbus_con):
        inputcapture_intf = xdp.get_portal_iface(dbus_con, "InputCapture")

        self._setup_default_session(dbus_con)

        disabled_signal_received = False
        activated_signal_received = False
        deactivated_signal_received = False
        activation_id = None

        def cb_disabled(session_handle, options):
            nonlocal disabled_signal_received
            disabled_signal_received = True

        def cb_activated(session_handle, options):
            nonlocal activated_signal_received, activation_id
            activated_signal_received = True
            activation_id = options["activation_id"]

        def cb_deactivated(session_handle, options):
            nonlocal deactivated_signal_received
            deactivated_signal_received = True

        # Each signal gets its own callback so that the flag checks below
        # actually observe the signal they are named after.
        inputcapture_intf.connect_to_signal("Disabled", cb_disabled)
        inputcapture_intf.connect_to_signal("Activated", cb_activated)
        inputcapture_intf.connect_to_signal("Deactivated", cb_deactivated)

        self.enable(dbus_con)

        xdp.wait_for(lambda: activated_signal_received)
        assert activation_id is not None
        assert not deactivated_signal_received
        assert not disabled_signal_received

        self.release(
            dbus_con, cursor_position=(10.0, 50.0), activation_id=activation_id
        )

        # XDP should filter any signals the implementation may
        # send after Release().
        mainloop = GLib.MainLoop()
        GLib.timeout_add(1000, mainloop.quit)
        mainloop.run()

        # Release() implies deactivation, so neither a Deactivated nor a
        # Disabled signal may have reached us while the main loop ran.
        assert not deactivated_signal_received
        assert not disabled_signal_received