Files
allhaileris afb81b8278
Some checks failed
Docker. / Ubuntu (push) Has been cancelled
User-agent updater. / User-agent (push) Failing after 15s
Lock Threads / lock (push) Failing after 10s
Waiting for answer. / waiting-for-answer (push) Failing after 22s
Needs user action. / needs-user-action (push) Failing after 8s
Can't reproduce. / cant-reproduce (push) Failing after 8s
Close stale issues and PRs / stale (push) Has been cancelled
init
2026-02-16 15:50:16 +03:00

422 lines
15 KiB
Python

# SPDX-License-Identifier: LGPL-2.1-or-later
#
# This file is formatted with Python Black
import tests as xdp
import pytest
import dbus
from pathlib import Path
import os
from gi.repository import GLib, Gio
EXPORT_FILES_FLAG_EXPORT_DIR = 8
def path_from_null_term_bytes(bytes):
path_bytes, rest = bytes.split(b"\x00")
assert rest == b""
return Path(os.fsdecode(path_bytes))
def get_mountpoint(documents_intf):
mountpoint = documents_intf.GetMountPoint(byte_arrays=True)
mountpoint = path_from_null_term_bytes(mountpoint)
assert mountpoint.exists()
return mountpoint
def write_bytes_atomic(file_path, bytes):
GLib.file_set_contents(file_path.absolute().as_posix(), bytes)
def write_bytes_trunc(file_path, bytes):
try:
fd = os.open(
file_path.absolute().as_posix(), os.O_RDWR | os.O_TRUNC | os.O_CREAT
)
os.write(fd, bytes)
finally:
os.close(fd)
def get_host_path_attr(path):
xattr = "xattr::document-portal.host-path"
file = Gio.file_new_for_path(path.absolute().as_posix())
info = file.query_info(xattr, Gio.FileQueryInfoFlags.NONE)
host_path = info.get_attribute_as_string(xattr)
if not host_path:
return None
return Path(os.fsdecode(host_path))
def export_file(documents_intf, file_path, unique=False):
assert file_path.exists()
with open(file_path.absolute().as_posix(), "r") as file:
doc_id = documents_intf.Add(file.fileno(), not unique, False)
assert doc_id
return doc_id
def export_file_named(documents_intf, folder_path, name, unique=False):
assert folder_path.exists()
# bytestring convention is zero terminated
name_nt = os.fsencode(name) + b"\x00"
try:
fd = os.open(folder_path.absolute().as_posix(), os.O_PATH | os.O_CLOEXEC)
doc_id = documents_intf.AddNamed(fd, name_nt, not unique, False)
assert doc_id
finally:
os.close(fd)
return doc_id
def export_files(documents_intf, file_paths, perms, flags=0, app_id=""):
fds = []
try:
for file_path in file_paths:
fds.append(
os.open(file_path.absolute().as_posix(), os.O_PATH | os.O_CLOEXEC)
)
result = documents_intf.AddFull(
fds,
flags,
app_id,
perms,
byte_arrays=True,
)
finally:
for fd in fds:
os.close(fd)
assert result
return result
class TestDocuments:
def test_version(self, xdg_document_portal, dbus_con):
documents = dbus_con.get_object(
"org.freedesktop.portal.Documents",
"/org/freedesktop/portal/documents",
)
properties_intf = dbus.Interface(
documents,
"org.freedesktop.DBus.Properties",
)
portal_version = properties_intf.Get(
"org.freedesktop.portal.Documents",
"version",
)
assert int(portal_version) == 5
def test_mount_point(self, xdg_document_portal, dbus_con):
documents_intf = xdp.get_document_portal_iface(dbus_con)
get_mountpoint(documents_intf)
def test_create_doc(self, xdg_document_portal, dbus_con):
documents_intf = xdp.get_document_portal_iface(dbus_con)
mountpoint = get_mountpoint(documents_intf)
content = b"content"
file_name = "a-file"
file_path = Path(os.environ["TMPDIR"]) / file_name
write_bytes_atomic(file_path, content)
doc_id = export_file(documents_intf, file_path)
doc_path = mountpoint / doc_id
doc_app1_path = mountpoint / "by-app" / "com.test.App1" / doc_id
doc_app2_path = mountpoint / "by-app" / "com.test.App2" / doc_id
# Make sure it got exported
assert (doc_path / file_name).read_bytes() == content
assert not (doc_path / "another-file").exists()
assert not (mountpoint / "anotherid" / file_name).exists()
# Make sure it is not viewable by apps
assert not doc_app1_path.exists()
assert not doc_app2_path.exists()
# Create a tmp file in same dir, ensure it works and can't be seen by other apps
write_bytes_atomic(doc_path / "tmp1", b"tmpdata1")
assert (doc_path / "tmp1").read_bytes() == b"tmpdata1"
assert not (doc_app1_path / "tmp1").exists()
# Ensure App 1 and only it can see the document and tmpfile
documents_intf.GrantPermissions(doc_id, "com.test.App1", ["read"])
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Make sure App 1 can't create a tmpfile
assert not (doc_app1_path / "tmp2").exists()
with pytest.raises(PermissionError):
(doc_app1_path / "tmp2").write_bytes(b"tmpdata1")
assert not (doc_app1_path / "tmp2").exists()
assert not (doc_path / "tmp2").exists()
# Update the document contents, ensure this is propagated
content = b"content2"
write_bytes_atomic(doc_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert file_path.read_bytes() == content
assert not (doc_app2_path / file_name).exists()
assert not (doc_app2_path / "tmp1").exists()
# Update the document contents outside fuse fd, ensure this is propagated
content = b"content3"
write_bytes_atomic(file_path, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert file_path.read_bytes() == content
assert not (doc_app2_path / file_name).exists()
assert not (doc_app2_path / "tmp1").exists()
# Try to update the doc from an app that can't write to it
with pytest.raises(PermissionError):
(doc_app1_path / file_name).write_bytes(b"content4")
# Update the doc from an app with write access
documents_intf.GrantPermissions(doc_id, "com.test.App1", ["write"])
content = b"content5"
write_bytes_atomic(doc_app1_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert file_path.read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Try to create a tmp file for an app
assert not (doc_app1_path / "tmp3").exists()
write_bytes_atomic(doc_app1_path / "tmp3", b"tmpdata2")
(doc_app1_path / "tmp3").read_bytes() == b"tmpdata2"
assert not (doc_path / "tmp3").exists()
# Re-Create a file from a fuse document file, in various ways
doc_id2 = export_file(documents_intf, (doc_path / file_name))
assert doc_id2 == doc_id
doc_id3 = export_file(documents_intf, (doc_app1_path / file_name))
assert doc_id3 == doc_id
doc_id4 = export_file(documents_intf, file_path)
assert doc_id4 == doc_id
# Ensure we can make a unique document
doc_id5 = export_file(documents_intf, file_path, unique=True)
assert doc_id5 != doc_id
def test_recursive_doc(self, xdg_document_portal, dbus_con):
documents_intf = xdp.get_document_portal_iface(dbus_con)
mountpoint = get_mountpoint(documents_intf)
content = b"content"
file_name = "recursive-file"
file_path = Path(os.environ["TMPDIR"]) / file_name
write_bytes_atomic(file_path, content)
doc_id = export_file(documents_intf, file_path)
doc_path = mountpoint / doc_id
doc_app1_path = mountpoint / "by-app" / "com.test.App1" / doc_id
assert (doc_path / file_name).read_bytes() == content
doc_id2 = export_file(documents_intf, doc_path / file_name)
assert doc_id2 == doc_id
documents_intf.GrantPermissions(doc_id, "com.test.App1", ["read"])
doc_id3 = export_file(documents_intf, doc_app1_path / file_name)
assert doc_id3 == doc_id
def test_create_docs(self, xdg_document_portal, dbus_con):
documents_intf = xdp.get_document_portal_iface(dbus_con)
mountpoint = get_mountpoint(documents_intf)
files = {
"doc1": b"doc1-content",
"doc2": b"doc2-content",
}
file_paths = []
for file_name, file_content in files.items():
file_path = Path(os.environ["TMPDIR"]) / file_name
write_bytes_atomic(file_path, file_content)
file_paths.append(file_path)
doc_ids, extra = export_files(
documents_intf, file_paths, ["read"], app_id="org.other.App"
)
assert extra
out_mountpoint = path_from_null_term_bytes(extra["mountpoint"])
assert out_mountpoint == mountpoint
assert doc_ids
for doc_id, (file_name, file_content) in zip(doc_ids, files.items()):
assert (mountpoint / doc_id / file_name).read_bytes() == file_content
assert (Path(os.environ["TMPDIR"]) / file_name).read_bytes() == file_content
app1_path = mountpoint / "by-app" / "com.test.App1" / doc_id / file_name
app2_path = mountpoint / "by-app" / "com.test.App2" / doc_id / file_name
assert not app1_path.exists()
assert not app2_path.exists()
assert not (mountpoint / doc_id / "another-file").exists()
assert not (mountpoint / "anotherid" / file_name).exists()
other_app_path = (
mountpoint / "by-app" / "org.other.App" / doc_id / file_name
)
assert other_app_path.read_bytes() == file_content
with pytest.raises(PermissionError):
other_app_path.write_bytes(b"new-content")
def test_add_named(self, xdg_document_portal, dbus_con):
documents_intf = xdp.get_document_portal_iface(dbus_con)
mountpoint = get_mountpoint(documents_intf)
content = b"content"
file_name = "add-named-1"
folder_path = Path(os.environ["TMPDIR"])
doc_id = export_file_named(documents_intf, folder_path, file_name)
assert doc_id
doc_path = mountpoint / doc_id
doc_app1_path = mountpoint / "by-app" / "com.test.App1" / doc_id
doc_app2_path = mountpoint / "by-app" / "com.test.App2" / doc_id
assert doc_path.exists()
assert not doc_app1_path.exists()
assert not (doc_path / file_name).exists()
assert not (doc_app1_path / file_name).exists()
documents_intf.GrantPermissions(doc_id, "com.test.App1", ["read", "write"])
assert doc_path.exists()
assert doc_app1_path.exists()
assert not (doc_path / file_name).exists()
assert not (doc_app1_path / file_name).exists()
# Update truncating with no previous file
write_bytes_trunc(doc_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Update truncating with previous file
content = b"content2"
write_bytes_trunc(doc_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Update atomic with previous file
content = b"content3"
write_bytes_atomic(doc_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Update from host
content = b"content4"
write_bytes_atomic(folder_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Unlink doc
(doc_path / file_name).unlink()
assert doc_path.exists()
assert doc_app1_path.exists()
assert not (doc_path / file_name).exists()
assert not (doc_app1_path / file_name).exists()
# Update atomic with no previous file
content = b"content5"
write_bytes_atomic(doc_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Unlink doc on host
(folder_path / file_name).unlink()
assert doc_path.exists()
assert doc_app1_path.exists()
assert not (doc_path / file_name).exists()
assert not (doc_app1_path / file_name).exists()
# Update atomic with unexpected no previous file
content = b"content6"
write_bytes_atomic(doc_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
# Unlink doc on host again
(folder_path / file_name).unlink()
assert doc_path.exists()
assert doc_app1_path.exists()
assert not (doc_path / file_name).exists()
assert not (doc_app1_path / file_name).exists()
# Update truncating with unexpected no previous file
content = b"content7"
write_bytes_trunc(doc_path / file_name, content)
assert (doc_path / file_name).read_bytes() == content
assert (doc_app1_path / file_name).read_bytes() == content
assert not (doc_app2_path / file_name).exists()
def test_get_host_paths(self, xdg_document_portal, dbus_con):
documents_intf = xdp.get_document_portal_iface(dbus_con)
content = b"content"
file_name = "host-path"
file_path = Path(os.environ["TMPDIR"]) / file_name
write_bytes_atomic(file_path, content)
doc_id = export_file(documents_intf, file_path)
host_paths = documents_intf.GetHostPaths([doc_id], byte_arrays=True)
assert doc_id in host_paths
doc_host_path = path_from_null_term_bytes(host_paths[doc_id])
assert doc_host_path == file_path
def test_host_paths_xattr(self, xdg_document_portal, dbus_con):
documents_intf = xdp.get_document_portal_iface(dbus_con)
mountpoint = get_mountpoint(documents_intf)
base_path = Path(os.environ["TMPDIR"]) / "a"
file_path = base_path / "b" / "c"
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_bytes(b"test")
doc_ids, extra = export_files(
documents_intf, [base_path], ["read"], flags=EXPORT_FILES_FLAG_EXPORT_DIR
)
doc_id = doc_ids[0]
host_path = get_host_path_attr(mountpoint / doc_id)
assert not host_path
host_path = get_host_path_attr(mountpoint / doc_id / "a")
assert host_path == base_path
host_path = get_host_path_attr(mountpoint / doc_id / "a" / "b")
assert host_path == base_path / "b"
host_path = get_host_path_attr(mountpoint / doc_id / "a" / "b" / "c")
assert host_path == base_path / "b" / "c"
try:
xdp.ensure_fuse_supported()
except xdp.FuseNotSupportedException as e:
pytest.skip(f"No fuse support: {e}", allow_module_level=True)