Files
allhaileris afb81b8278
Some checks failed
Docker. / Ubuntu (push) Has been cancelled
User-agent updater. / User-agent (push) Failing after 15s
Lock Threads / lock (push) Failing after 10s
Waiting for answer. / waiting-for-answer (push) Failing after 22s
Needs user action. / needs-user-action (push) Failing after 8s
Can't reproduce. / cant-reproduce (push) Failing after 8s
Close stale issues and PRs / stale (push) Has been cancelled
init
2026-02-16 15:50:16 +03:00

385 lines
12 KiB
Python

# SPDX-License-Identifier: LGPL-2.1-or-later
#
# This file is formatted with Python Black
import tests as xdp
import pytest
import os
import gi
import subprocess
import re
gi.require_version("UMockdev", "1.0")
from gi.repository import GLib, UMockdev # noqa E402
@pytest.fixture
def required_templates():
return {"usb": {}}
@pytest.fixture
def umockdev():
return UMockdev.Testbed.new()
def umockdev_has_working_remove():
# umockdev only generates remove events since version 0.18.4
# https://github.com/martinpitt/umockdev/releases/tag/0.18.4
required = (0, 18, 4)
result = subprocess.run(["umockdev-run", "--version"], stdout=subprocess.PIPE)
if result.returncode != 0:
return False
match = re.match(r"^(\d+)\.(\d+)\.(\d+)", result.stdout.decode("UTF-8").strip())
if not match:
return False
version = tuple(map(int, match.groups()))
return version >= required
class TestUsb:
_num_devices = 0
def generate_device(
self, testbed, vendor, vendor_name, product, product_name, serial
):
n = self._num_devices
self._num_devices += 1
testbed.add_from_string(f"""P: /devices/usb{n}
N: bus/usb/001/{n:03d}
E: BUSNUM=001
E: DEVNUM={n:03d}
E: DEVNAME=/dev/bus/usb/001/{n:03d}
E: DEVTYPE=usb_device
E: DRIVER=usb
E: ID_BUS=usb
E: ID_MODEL={product_name}
E: ID_MODEL_ID={product}
E: ID_REVISION=0002
E: ID_SERIAL={vendor_name}_{product_name}_{serial}
E: ID_SERIAL_SHORT={serial}
E: ID_VENDOR={vendor_name}
E: ID_VENDOR_ID={vendor}
E: SUBSYSTEM=usb
A: idProduct={product}
A: idVendor={vendor}
""")
return f"/sys/devices/usb{n}"
def test_version(self, portals, dbus_con):
xdp.check_version(dbus_con, "Usb", 1)
def test_create_close_session(self, portals, dbus_con, app_id):
usb_intf = xdp.get_portal_iface(dbus_con, "Usb")
session = xdp.Session(
dbus_con,
usb_intf.CreateSession({"session_handle_token": "session_token0"}),
)
session.close()
def test_empty_initial_devices(self, portals, dbus_con, app_id):
usb_intf = xdp.get_portal_iface(dbus_con, "Usb")
xdp.Session(
dbus_con,
usb_intf.CreateSession({"session_handle_token": "session_token0"}),
)
device_events_signal_received = False
def cb_device_events(session_handle, events):
nonlocal device_events_signal_received
device_events_signal_received = True
usb_intf.connect_to_signal("DeviceEvents", cb_device_events)
xdp.wait(300)
assert not device_events_signal_received
@pytest.mark.parametrize("usb_queries", ["vnd:04a9", None])
def test_initial_devices(self, portals, dbus_con, app_id, usb_queries, umockdev):
usb_intf = xdp.get_portal_iface(dbus_con, "Usb")
self.generate_device(
umockdev,
"04a9",
"Canon_Inc.",
"31c0",
"Canon_Digital_Camera",
"C767F1C714174C309255F70E4A7B2EE2",
)
# TODO: to make this more robust, we should find a way to wait for
# the portal to pick up the device
xdp.wait(300)
session = xdp.Session(
dbus_con,
usb_intf.CreateSession({"session_handle_token": "session_token0"}),
)
device_events_signal_received = False
devices_received = 0
def cb_device_events(session_handle, events):
nonlocal device_events_signal_received
nonlocal devices_received
assert session.handle == session_handle
for action, id, device in events:
assert action == "add"
devices_received += 1
device_events_signal_received = True
usb_intf.connect_to_signal("DeviceEvents", cb_device_events)
if usb_queries is None:
xdp.wait(300)
assert not device_events_signal_received
assert devices_received == 0
else:
xdp.wait_for(lambda: device_events_signal_received)
assert devices_received == 1
@pytest.mark.parametrize("usb_queries", ["vnd:04a9", None])
def test_device_add(self, portals, dbus_con, app_id, usb_queries, umockdev):
usb_intf = xdp.get_portal_iface(dbus_con, "Usb")
session = xdp.Session(
dbus_con,
usb_intf.CreateSession({"session_handle_token": "session_token0"}),
)
device_events_signal_received = False
devices_received = 0
device = None
def cb_device_events(session_handle, events):
nonlocal device_events_signal_received
nonlocal devices_received
nonlocal device
assert session.handle == session_handle
for action, _, dev in events:
assert action == "add"
device = dev
devices_received += 1
device_events_signal_received = True
usb_intf.connect_to_signal("DeviceEvents", cb_device_events)
xdp.wait(300)
assert not device_events_signal_received
self.generate_device(
umockdev,
"04a9",
"Canon_Inc.",
"31c0",
"Canon_Digital_Camera",
"C767F1C714174C309255F70E4A7B2EE2",
)
if usb_queries is None:
xdp.wait(300)
assert not device_events_signal_received
assert devices_received == 0
else:
xdp.wait_for(lambda: device_events_signal_received)
assert devices_received == 1
assert device
assert device["readable"]
assert device["writable"]
assert device["device-file"] == "/dev/bus/usb/001/000"
assert device["properties"]["ID_VENDOR_ID"] == "04a9"
assert device["properties"]["ID_MODEL_ID"] == "31c0"
assert (
device["properties"]["ID_SERIAL"]
== "Canon_Inc._Canon_Digital_Camera_C767F1C714174C309255F70E4A7B2EE2"
)
@pytest.mark.parametrize("usb_queries", ["vnd:04a9", None])
@pytest.mark.skipif(
not umockdev_has_working_remove(), reason="UMockdev version 0.18.4 required"
)
def test_device_remove(self, portals, dbus_con, app_id, usb_queries, umockdev):
usb_intf = xdp.get_portal_iface(dbus_con, "Usb")
dev_path = self.generate_device(
umockdev,
"04a9",
"Canon_Inc.",
"31c0",
"Canon_Digital_Camera",
"C767F1C714174C309255F70E4A7B2EE2",
)
session = xdp.Session(
dbus_con,
usb_intf.CreateSession({"session_handle_token": "session_token0"}),
)
device_events_signal_count = 0
devices_received = 0
devices_removed = 0
def cb_device_events(session_handle, events):
nonlocal device_events_signal_count
nonlocal devices_received
nonlocal devices_removed
assert session.handle == session_handle
for action, id, device in events:
if action == "add":
devices_received += 1
elif action == "remove":
devices_removed += 1
else:
assert False
device_events_signal_count += 1
usb_intf.connect_to_signal("DeviceEvents", cb_device_events)
if usb_queries is None:
xdp.wait(300)
assert device_events_signal_count == 0
assert devices_received == 0
assert devices_removed == 0
else:
xdp.wait_for(lambda: device_events_signal_count == 1)
assert devices_received == 1
assert devices_removed == 0
umockdev.remove_device(dev_path)
if usb_queries is None:
xdp.wait(300)
assert device_events_signal_count == 0
assert devices_received == 0
assert devices_removed == 0
else:
xdp.wait_for(lambda: device_events_signal_count == 2)
assert devices_received == 1
assert devices_removed == 1
@pytest.mark.parametrize("usb_queries", ["vnd:04a9;vnd:04aa"])
@pytest.mark.parametrize(
"template_params", [{"usb": {"filters": {"vendor": "04a9"}}}]
)
def test_acquire(self, portals, dbus_con, app_id, umockdev):
usb_intf = xdp.get_portal_iface(dbus_con, "Usb")
self.generate_device(
umockdev,
"04a9",
"Canon_Inc.",
"31c0",
"Canon_Digital_Camera",
"C767F1C714174C309255F70E4A7B2EE2",
)
self.generate_device(
umockdev,
"04aa",
"Someone Else.",
"31c0",
"SomeProduct",
"00001",
)
possible_vendors = ["04a9", "04aa"]
devices = usb_intf.EnumerateDevices({})
assert len(devices) == 2
(id1, dev_info1) = devices[0]
assert id1
assert dev_info1
vendor_id = dev_info1["properties"]["ID_VENDOR_ID"]
assert vendor_id in possible_vendors
possible_vendors.remove(vendor_id)
(id2, dev_info2) = devices[1]
assert id2
assert dev_info2
vendor_id = dev_info2["properties"]["ID_VENDOR_ID"]
assert vendor_id in possible_vendors
possible_vendors.remove(vendor_id)
request = xdp.Request(dbus_con, usb_intf)
response = request.call(
"AcquireDevices",
parent_window="",
devices=[
(id1, {"writable": True}),
(id2, {"writable": True}),
],
options={},
)
assert response
assert response.response == 0
(results, finished) = usb_intf.FinishAcquireDevices(request.handle, {})
assert finished
assert len(results) == 1
(res_id, device) = results[0]
assert res_id == id1 or res_id == id2
assert device["success"]
fd = device["fd"].take()
assert fd > 0
with os.fdopen(fd, "r") as f:
assert f
assert "error" not in device
usb_intf.ReleaseDevices([res_id], {})
@pytest.mark.parametrize("usb_queries", ["vnd:0001"])
@pytest.mark.parametrize(
"expected,template_params",
[
(1, {"usb": {"filters": {"model": "0000"}}}),
(1, {"usb": {"filters": {"model": "0001"}}}),
(0, {"usb": {"filters": {"model": "0002"}}}),
(2, {"usb": {"filters": {"vendor": "0001"}}}),
(0, {"usb": {"filters": {"vendor": "0002"}}}),
(1, {"usb": {"filters": {"vendor": "0001", "model": "0000"}}}),
(0, {"usb": {"filters": {"vendor": "0002", "model": "0000"}}}),
],
)
def test_queries(self, portals, dbus_con, expected, app_id, usb_queries, umockdev):
usb_intf = xdp.get_portal_iface(dbus_con, "Usb")
for i in range(2):
self.generate_device(
umockdev,
"0001",
"example_org",
f"000{i}",
f"model{i}",
"0001",
)
devices = usb_intf.EnumerateDevices({})
assert len(devices) == 2
acquire_devices = [(id, {"writable": True}) for (id, _) in devices]
request = xdp.Request(dbus_con, usb_intf)
response = request.call(
"AcquireDevices",
parent_window="",
devices=acquire_devices,
options={},
)
assert response
assert response.response == 0
(results, finished) = usb_intf.FinishAcquireDevices(request.handle, {})
assert finished
assert len(results) == expected