Files
tdesktop/Telegram/ThirdParty/xdg-desktop-portal/tests/test_document_fuse.py
allhaileris afb81b8278
Some checks failed
Docker. / Ubuntu (push) Has been cancelled
User-agent updater. / User-agent (push) Failing after 15s
Lock Threads / lock (push) Failing after 10s
Waiting for answer. / waiting-for-answer (push) Failing after 22s
Needs user action. / needs-user-action (push) Failing after 8s
Can't reproduce. / cant-reproduce (push) Failing after 8s
Close stale issues and PRs / stale (push) Has been cancelled
init
2026-02-16 15:50:16 +03:00

1260 lines
39 KiB
Python

# SPDX-License-Identifier: LGPL-2.1-or-later
#
# This file is formatted with Python Black
import tests as xdp
import pytest
import errno
import os
import random
import stat
import sys
import multiprocessing as mp
import traceback
from collections import defaultdict
from gi.repository import Gio, GLib
@pytest.fixture
def app_id():
return None
def filename_to_ay(filename):
return list(filename.encode("utf-8")) + [0]
app_prefix = "org.test."
dir_prefix = "dir"
ensure_no_remaining = True
running_count: defaultdict[str, int] = defaultdict(int)
def log(str):
print(str, file=sys.stderr)
def logv(str):
log(str)
def get_a_count(counter: str):
global running_count
running_count[counter] += 1
return running_count[counter]
def setFileContent(path, content):
with open(path, "w") as f:
f.write(content)
def appendFileContent(path, content):
with open(path, "a") as f:
f.write(content)
def readFdContent(fd):
os.lseek(fd, 0, os.SEEK_SET)
return str(os.read(fd, 64 * 1024), "utf-8")
def replaceFdContent(fd, content):
os.lseek(fd, 0, os.SEEK_SET)
os.ftruncate(fd, 0)
os.write(fd, bytes(content, "utf-8"))
def appendFdContent(fd, content):
os.lseek(fd, 0, os.SEEK_END)
os.write(fd, bytes(content, "utf-8"))
DOCUMENT_ADD_FLAGS_REUSE_EXISTING = 1 << 0
DOCUMENT_ADD_FLAGS_PERSISTENT = 1 << 1
DOCUMENT_ADD_FLAGS_AS_NEEDED_BY_APP = 1 << 2
DOCUMENT_ADD_FLAGS_DIRECTORY = 1 << 3
def assertRaises(exc_type, func, *args, **kwargs):
with pytest.raises(exc_type):
func(*args, **kwargs)
def assertRaisesErrno(error_nr, func, *args, **kwargs):
with pytest.raises(OSError) as excinfo:
func(*args, **kwargs)
if excinfo.value.errno != error_nr:
raise AssertionError(
"Wrong errno {0} was raised instead of {1}".format(
excinfo.value.errno, error_nr
)
)
def assertRaisesGError(message, code, func, *args, **kwargs):
with pytest.raises(GLib.GError) as excinfo:
func(*args, **kwargs)
if not excinfo.value.message.startswith(message):
raise AssertionError(
"Wrong message {0} doesn't start with {1}".format(
excinfo.value.message, message
)
)
if excinfo.value.code != code:
raise AssertionError(
"Wrong code {0} was raised instead of {1}".format(excinfo.value.code, code)
)
def assertFileHasContent(path, expected_content):
with open(path) as f:
file_content = f.read()
assert file_content == expected_content
def assertFdHasContent(fd, expected_content):
content = readFdContent(fd)
assert content == expected_content
def assertSameStat(a, b, b_mode_mask):
if not (
a.st_mode == (b.st_mode & b_mode_mask)
and a.st_nlink == b.st_nlink
and a.st_size == b.st_size
and a.st_uid == b.st_uid
and a.st_gid == b.st_gid
and a.st_atime == b.st_atime
and a.st_mtime == b.st_mtime
and a.st_ctime == b.st_ctime
):
raise AssertionError("Stat value {} was not the expected {})".format(a, b))
def assertFileExist(path):
try:
info = os.lstat(path)
if info.st_mode & stat.S_IFREG != stat.S_IFREG:
raise AssertionError("File {} is not a regular file".format(path))
except Exception:
raise AssertionError("File {} doesn't exist".format(path))
def assertDirExist(path):
try:
info = os.lstat(path)
if info.st_mode & stat.S_IFDIR != stat.S_IFDIR:
raise AssertionError("File {} is not a directory file".format(path))
except Exception:
raise AssertionError("File {} doesn't exist".format(path))
def assertSymlink(path, expected_target):
try:
info = os.lstat(path)
if info.st_mode & stat.S_IFLNK != stat.S_IFLNK:
raise AssertionError("File {} is not a symlink".format(path))
target = os.readlink(path)
if target != expected_target:
raise AssertionError(
"File {} has wrong target {}, expected {}".format(
path, target, expected_target
)
)
except Exception:
raise AssertionError("Symlink {} doesn't exist".format(path))
def assertFileNotExist(path):
try:
os.lstat(path)
except FileNotFoundError:
return
except Exception:
raise AssertionError(
"Got wrong execption {} for {}, expected FileNotFoundError".format(
sys.exc_info()[0], path
)
)
raise AssertionError("Path {} unexpectedly exists".format(path))
def assertDirFiles(path, expected_files, exhaustive=True, volatile_files=None):
found_files = os.listdir(path)
remaining = set(found_files)
for file in expected_files:
if file in remaining:
remaining.remove(file)
elif file not in volatile_files:
raise AssertionError(
"Expected file {} not found in dir {} (all: {})".format(
file, path, found_files
)
)
if exhaustive:
if len(remaining) != 0:
raise AssertionError(
"Unexpected files {} in dir {} (all: {})".format(
remaining, path, found_files
)
)
class Doc:
def __init__(self, portal, id, path, content, is_dir=False):
self.portal = portal
self.id = id
self.content = content
self.real_path = path
self.is_dir = is_dir
self.apps = []
self.files = []
if is_dir:
self.real_dirname = path
self.filename = None
self.dirname = os.path.basename(path)
else:
(self.real_dirname, self.filename) = os.path.split(path)
self.dirname = None
if content:
self.files.append(self.filename)
def is_readable_by(self, app_id):
if app_id:
return app_id in self.apps
return True
def is_writable_by(self, app_id):
if app_id:
return app_id in self.apps and ".write." in app_id
else:
return True
def get_doc_path(self, app_id):
if app_id:
base = self.portal.app_path(app_id) + "/" + self.id
else:
base = self.portal.mountpoint + "/" + self.id
if self.is_dir:
return base + "/" + self.dirname
else:
return base
def __str__(self):
name = self.id
if self.is_dir:
return "%s(dir)" % (name)
elif self.content is None:
return "%s(missing)" % (name)
else:
return "%s" % (name)
class DocPortal:
def __init__(self):
self.apps = []
self.volatile_apps = set()
self.docs = {}
self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
self.proxy = Gio.DBusProxy.new_sync(
self.bus,
Gio.DBusProxyFlags.NONE,
None,
"org.freedesktop.portal.Documents",
"/org/freedesktop/portal/documents",
"org.freedesktop.portal.Documents",
None,
)
self.mountpoint = self.get_mount_path()
def get_mount_path(self):
res = self.proxy.call_sync("GetMountPoint", GLib.Variant("()", ()), 0, -1, None)
return bytearray(res[0][:-1]).decode("utf-8")
def grant_permissions(self, doc_id, app_id, permissions):
self.proxy.call_sync(
"GrantPermissions",
GLib.Variant("(ssas)", (doc_id, app_id, permissions)),
0,
-1,
None,
)
def lookup(self, path):
res = self.proxy.call_sync(
"Lookup", GLib.Variant("(ay)", (filename_to_ay(path),)), 0, -1, None
)
return res[0]
def delete(self, doc_id):
self.proxy.call_sync("Delete", GLib.Variant("(s)", (doc_id,)), 0, -1, None)
del self.docs[doc_id]
def add(self, path, reuse_existing=True):
fdlist = Gio.UnixFDList.new()
fd = os.open(path, os.O_PATH)
handle = fdlist.append(fd)
os.close(fd)
res = self.proxy.call_with_unix_fd_list_sync(
"Add",
GLib.Variant("(hbb)", (handle, reuse_existing, False)),
0,
-1,
fdlist,
None,
)
doc_id = res[0][0]
if doc_id in self.docs:
return self.docs[doc_id]
with open(path) as f:
content = f.read()
doc = Doc(self, doc_id, path, content)
self.docs[doc.id] = doc
return doc
def add_named(self, path, reuse_existing=True):
(dirname, filename) = os.path.split(path)
fdlist = Gio.UnixFDList.new()
fd = os.open(dirname, os.O_PATH)
handle = fdlist.append(fd)
os.close(fd)
res = self.proxy.call_with_unix_fd_list_sync(
"AddNamed",
GLib.Variant(
"(haybb)", (handle, filename_to_ay(filename), reuse_existing, False)
),
0,
-1,
fdlist,
None,
)
doc_id = res[0][0]
if doc_id in self.docs:
return self.docs[doc_id]
try:
with open(path) as f:
content = f.read()
except Exception:
content = None
doc = Doc(self, doc_id, path, content)
self.docs[doc.id] = doc
return doc
def add_full(self, path, flags):
fdlist = Gio.UnixFDList.new()
fd = os.open(path, os.O_PATH)
handle = fdlist.append(fd)
os.close(fd)
res = self.proxy.call_with_unix_fd_list_sync(
"AddFull",
GLib.Variant("(ahusas)", ([handle], flags, "", [])),
0,
-1,
fdlist,
None,
)
doc_id = res[0][0][0]
if doc_id in self.docs:
return self.docs[doc_id]
doc = Doc(self, doc_id, path, True, (flags & DOCUMENT_ADD_FLAGS_DIRECTORY) != 0)
self.docs[doc.id] = doc
return doc
def add_dir(self, path):
return self.add_full(
path, DOCUMENT_ADD_FLAGS_REUSE_EXISTING | DOCUMENT_ADD_FLAGS_DIRECTORY
)
def get_docs_for_app(self, app_id):
docs = []
for doc in self.docs.values():
if doc.is_readable_by(app_id):
docs.append(doc.id)
return docs
def ensure_app_id(self, app_id, volatile=False):
if app_id not in self.apps:
self.apps.append(app_id)
if volatile:
self.volatile_apps.add(app_id)
def get_docs(self):
return list(self.docs.values())
def get_docs_randomized(self):
docs = list(self.docs.values())
random.shuffle(docs)
return docs
def get_doc(self, doc_id):
return self.docs[doc_id]
def get_app_ids(self):
return self.apps
def get_volatile_app_ids(self):
return self.volatile_apps
def get_app_ids_randomized(self):
apps = self.apps.copy()
random.shuffle(apps)
return apps
def by_app_path(self):
return self.mountpoint + "/by-app"
def app_path(self, app_id):
return self.mountpoint + "/by-app/" + app_id
class FileTransferPortal(DocPortal):
def __init__(self):
super().__init__()
self.ft_proxy = Gio.DBusProxy.new_sync(
self.bus,
Gio.DBusProxyFlags.NONE,
None,
"org.freedesktop.portal.Documents",
"/org/freedesktop/portal/documents",
"org.freedesktop.portal.FileTransfer",
None,
)
def start_transfer(self):
res = self.ft_proxy.call_sync(
"StartTransfer", GLib.Variant("(a{sv})", ([None])), 0, -1, None
)
return res[0]
def add_files(self, key, files):
fdlist = Gio.UnixFDList.new()
handles = []
for filename in files:
fd = os.open(filename, os.O_PATH)
handle = fdlist.append(fd)
handles.append(handle)
os.close(fd)
res = self.ft_proxy.call_with_unix_fd_list_sync(
"AddFiles",
GLib.Variant("(saha{sv})", (key, handles, [])),
0,
-1,
fdlist,
None,
)
return res
def retrieve_files(self, key):
res = self.ft_proxy.call_sync(
"RetrieveFiles",
GLib.Variant("(sa{sv})", (key, [])),
0,
-1,
None,
)
return res
def stop_transfer(self, key):
res = self.ft_proxy.call_sync(
"StopTransfer",
GLib.Variant("(s)", (key,)),
0,
-1,
None,
)
return res
def check_virtual_stat(info, writable=False):
assert info.st_uid == os.getuid()
assert info.st_gid == os.getgid()
if writable:
assert info.st_mode == stat.S_IFDIR | 0o700
else:
assert info.st_mode == stat.S_IFDIR | 0o500
def verify_virtual_dir(path, files, volatile_files=None):
info = os.lstat(path)
check_virtual_stat(info)
assert os.access(path, os.R_OK)
assert not os.access(path, os.W_OK)
assertRaises(FileNotFoundError, os.lstat, path + "/not-existing-file")
if files is not None:
assertDirFiles(path, files, ensure_no_remaining, volatile_files)
def verify_doc(doc, app_id=None):
dir = doc.get_doc_path(app_id)
if doc.is_dir:
vdir = os.path.dirname(dir)
info = os.lstat(vdir)
check_virtual_stat(info)
pass
else:
info = os.lstat(dir)
check_virtual_stat(info, doc.is_writable_by(app_id))
assert os.access(dir, os.R_OK)
if doc.is_writable_by(app_id):
assert os.access(dir, os.W_OK)
else:
assert not os.access(dir, os.W_OK)
assertRaises(FileNotFoundError, os.lstat, dir + "/not-existing-file")
assertDirFiles(dir, doc.files)
for file in doc.files:
filepath = dir + "/" + file
info = os.lstat(filepath)
assert info.st_uid == os.getuid()
assert info.st_gid == os.getgid()
assert os.access(filepath, os.R_OK)
if doc.is_writable_by(app_id):
assert os.access(filepath, os.W_OK)
else:
assert not os.access(filepath, os.W_OK)
if doc.filename:
main_path = dir + "/" + doc.filename
real_path = doc.real_path
if doc.content:
assertFileExist(main_path)
assertFileHasContent(main_path, doc.content)
assertFileHasContent(real_path, doc.content)
info = os.lstat(main_path)
real_info = os.lstat(real_path)
mode_mask = ~(stat.S_ISUID | stat.S_ISGID | stat.S_ISVTX)
if not doc.is_writable_by(app_id):
mode_mask = mode_mask & ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
assertSameStat(info, real_info, mode_mask)
else:
assertRaises(FileNotFoundError, os.lstat, main_path)
assertRaises(FileNotFoundError, os.open, main_path, os.O_RDONLY)
assertRaises(FileNotFoundError, os.lstat, doc.real_path)
assertRaises(FileNotFoundError, os.open, doc.real_path, os.O_RDONLY)
# Ensure no leftover temp files
for real_file in os.listdir(os.path.dirname(doc.real_path)):
assert not real_file.startswith(".xdp")
def verify_fs_layout(portal):
verify_virtual_dir(portal.mountpoint, ["by-app"] + list(portal.docs.keys()))
verify_virtual_dir(
portal.by_app_path(), portal.get_app_ids(), portal.get_volatile_app_ids()
)
for doc in portal.get_docs():
verify_doc(doc)
# Verify the by-app subdirs (just the directory for now)
for app_id in portal.get_app_ids():
docs_for_app = portal.get_docs_for_app(app_id)
verify_virtual_dir(portal.app_path(app_id), docs_for_app)
for doc_id in docs_for_app:
doc = portal.get_doc(doc_id)
verify_doc(doc, app_id)
def check_virtdir_perms(path):
assertRaises(PermissionError, os.mkdir, path + "/a_dir")
assertRaises(PermissionError, os.open, path + "/a-file", os.O_RDWR | os.O_CREAT)
def check_root_perms(path):
check_virtdir_perms(path)
assertRaises(PermissionError, os.rename, path + "/by-app", path + "/by-app2")
assertRaises(PermissionError, os.rmdir, path + "/by-app")
def check_byapp_perms(path):
check_virtdir_perms(path)
assertRaises(PermissionError, os.mkdir, path + "/a_dir")
def check_regular_doc_perms(doc, app_id):
path = doc.get_doc_path(app_id)
writable = doc.is_writable_by(app_id)
# regular documents, can't do most stuff
assertRaises(PermissionError, os.mkdir, path + "/dir")
assertRaises(PermissionError, os.symlink, "symlink-value", path + "/symlink")
docpath = path + "/" + doc.filename
tmppath = path + "/a-tmpfile"
tmppath2 = path + "/another-tmpfile"
if doc.content: # Main file exists
assertFileExist(docpath)
assertFileExist(doc.real_path)
assertRaises(PermissionError, os.link, docpath, path + "/a-hardlink")
assertRaises(NotADirectoryError, os.rmdir, docpath)
assertRaises(PermissionError, os.setxattr, docpath, "user.attr", b"foo")
assertRaises(PermissionError, os.removexattr, docpath, "user.attr")
fd = os.open(docpath, os.O_RDONLY, 0o600)
os.close(fd)
if not writable:
assertRaises(
PermissionError, os.open, docpath, os.O_RDONLY | os.O_TRUNC, 0o600
)
assertRaises(PermissionError, os.open, docpath, os.O_WRONLY, 0o600)
assertRaises(PermissionError, os.open, docpath, os.O_RDWR, 0o600)
assertRaises(PermissionError, os.rename, docpath, docpath + "renamed")
assertRaises(PermissionError, os.truncate, docpath, 1)
assertRaises(PermissionError, os.unlink, docpath)
assertRaises(PermissionError, os.utime, docpath)
else:
# Can't move file out of docdir or into other version of same docdir
assertRaisesErrno(
errno.EXDEV, os.rename, docpath, path + "/../" + doc.filename
)
if app_id:
assertRaisesErrno(
errno.EXDEV,
os.rename,
docpath,
doc.get_doc_path(None) + doc.filename,
)
if doc.apps and app_id != doc.apps[0]:
assertRaisesErrno(
errno.EXDEV,
os.rename,
docpath,
doc.get_doc_path(doc.apps[0]) + doc.filename,
)
# Ensure we can read it (multiple times)
fd = os.open(docpath, os.O_RDONLY, 0o600)
assertFdHasContent(fd, doc.content)
assertFdHasContent(fd, doc.content)
# Ensure we can rename it
os.rename(docpath, docpath + "_renamed")
assertRaises(FileNotFoundError, os.open, docpath, os.O_RDONLY, 0o600)
# ... and still read it
assertFdHasContent(fd, doc.content)
# Ensure we can delete it
os.unlink(docpath + "_renamed")
# ... and still read it
assertFdHasContent(fd, doc.content)
os.close(fd)
# Replace main file with rename of tmpfile
setFileContent(docpath, "orig-data")
fd1 = os.open(docpath, os.O_RDONLY, 0o600)
setFileContent(tmppath, "new-data")
fd2 = os.open(tmppath, os.O_RDONLY, 0o600)
os.rename(tmppath, tmppath2)
assertRaises(FileNotFoundError, os.lstat, tmppath)
assertFdHasContent(fd2, "new-data")
assertFileHasContent(tmppath2, "new-data")
os.rename(tmppath2, docpath)
assertRaises(FileNotFoundError, os.lstat, tmppath2)
assertFdHasContent(fd1, "orig-data")
assertFdHasContent(fd2, "new-data")
assertFileHasContent(docpath, "new-data")
appendFileContent(docpath, "-more")
assertFdHasContent(fd2, "new-data-more")
setFileContent(tmppath, "replace-this-data")
fd3 = os.open(tmppath, os.O_RDONLY, 0o600)
os.rename(docpath, tmppath)
assertFdHasContent(fd2, "new-data-more")
assertFdHasContent(fd3, "replace-this-data")
fd4 = os.open(tmppath, os.O_RDWR, 0o600)
assertFdHasContent(fd4, "new-data-more")
# Restore original version
os.rename(tmppath, docpath)
replaceFdContent(fd4, doc.content)
assertFdHasContent(fd2, doc.content)
assertFdHasContent(fd4, doc.content)
assertFileHasContent(docpath, doc.content)
assertFdHasContent(fd1, "orig-data")
assertFdHasContent(fd3, "replace-this-data")
os.close(fd1)
os.close(fd2)
os.close(fd3)
os.close(fd4)
assertRaises(NotADirectoryError, os.rmdir, docpath)
assertRaises(PermissionError, os.link, docpath, path + "/a-hardlink")
assertRaises(PermissionError, os.setxattr, docpath, "user.attr", b"foo")
assertRaises(PermissionError, os.removexattr, docpath, "user.attr")
else: # Main file doesn't exist
assertFileNotExist(docpath)
assertFileNotExist(doc.real_path)
if writable: # But we can create it
setFileContent(docpath, "some-data")
assertFileHasContent(docpath, "some-data")
os.unlink(docpath)
else: # And we can't create it
assertRaises(
PermissionError,
os.open,
docpath,
os.O_CREAT | os.O_RDONLY | os.O_TRUNC,
0o600,
)
assertRaises(
PermissionError, os.open, docpath, os.O_CREAT | os.O_WRONLY, 0o600
)
assertRaises(
PermissionError, os.open, docpath, os.O_CREAT | os.O_RDWR, 0o600
)
# Ensure it show up if created outside
setFileContent(doc.real_path, "from-outside")
assertFileExist(docpath)
assertFileHasContent(docpath, "from-outside")
if writable:
os.unlink(docpath)
else:
assertRaises(PermissionError, os.unlink, docpath)
os.unlink(doc.real_path)
assertFileNotExist(docpath)
if writable: # We can create tempfiles, do some simple checks
setFileContent(tmppath, "tempdata")
assertFileHasContent(tmppath, "tempdata")
assertRaises(NotADirectoryError, os.rmdir, tmppath)
assertRaises(PermissionError, os.link, tmppath, path + "/a-hardlink")
assertRaises(PermissionError, os.setxattr, tmppath, "user.attr", b"foo")
assertRaises(PermissionError, os.removexattr, tmppath, "user.attr")
os.rename(tmppath, tmppath2)
assertFileHasContent(tmppath2, "tempdata")
os.unlink(tmppath2)
else:
# We should be unable to create tempfiles
assertRaises(
PermissionError,
os.open,
tmppath,
os.O_CREAT | os.O_RDONLY | os.O_TRUNC,
0o600,
)
assertRaises(PermissionError, os.open, tmppath, os.O_CREAT | os.O_WRONLY, 0o600)
assertRaises(PermissionError, os.open, tmppath, os.O_CREAT | os.O_RDWR, 0o600)
def check_directory_doc_perms(doc, app_id):
writable = doc.is_writable_by(app_id)
docpath = doc.get_doc_path(app_id)
realpath = doc.real_path
# We should not be able to do anything with the toplevel document dir (other than reading the real dir)
vpath = os.path.dirname(docpath)
assertDirExist(vpath)
assertDirFiles(vpath, [doc.dirname])
assertRaises(PermissionError, os.mkdir, vpath + "/a_dir")
assertRaises(PermissionError, os.rename, docpath, vpath + "/foo")
assertRaises(PermissionError, os.rmdir, docpath)
assertRaises(
PermissionError, os.open, vpath + "/a_file", os.O_CREAT | os.O_RDWR, 0o600
)
assertDirExist(docpath)
# Create some pre-existing files:
real_dir = realpath + "/dir"
os.mkdir(real_dir)
setFileContent(real_dir + "/realfile", "real1")
setFileContent(real_dir + "/readonly", "readonly")
os.chmod(real_dir + "/readonly", 0o500)
os.mkdir(real_dir + "/subdir")
os.link(real_dir + "/realfile", real_dir + "/subdir/hardlink")
os.symlink("realfile", real_dir + "/symlink")
os.symlink("the-void", real_dir + "/broken-symlink")
# Ensure they are visible via portal
dir = docpath + "/dir"
assertDirFiles(docpath, ["dir"])
assertDirExist(dir)
assertDirFiles(dir, ["realfile", "readonly", "subdir", "symlink", "broken-symlink"])
assertDirExist(dir + "/subdir")
assertDirFiles(dir + "/subdir", ["hardlink"])
assertFileHasContent(dir + "/realfile", "real1")
assertFileHasContent(dir + "/readonly", "readonly")
assertFileHasContent(dir + "/subdir/hardlink", "real1")
assert (
os.lstat(dir + "/realfile").st_ino == os.lstat(dir + "/subdir/hardlink").st_ino
)
assertSymlink(dir + "/symlink", "realfile")
assertSymlink(dir + "/broken-symlink", "the-void")
filepath = docpath + "/a-file"
real_filepath = doc.real_path + "/a-file"
filepath2 = docpath + "/dir/a-file2"
real_filepath2 = doc.real_path + "/dir/a-file2"
if writable: # We can create files
if os.environ.get("TEST_IN_ROOTED_CI"):
assertRaises(PermissionError, os.open, dir + "/readonly", os.O_RDWR)
os.chmod(dir + "/readonly", 0o700)
fd = os.open(dir + "/readonly", os.O_RDWR) # Works now
os.close(fd)
setFileContent(filepath, "filedata")
assertFileHasContent(filepath, "filedata")
assertFileHasContent(real_filepath, "filedata")
fd = os.open(filepath, os.O_RDONLY)
fd2 = os.open(filepath, os.O_RDWR)
assertFdHasContent(fd, "filedata")
assertFdHasContent(fd2, "filedata")
appendFdContent(fd2, "-more")
assertFdHasContent(fd, "filedata-more")
assertFdHasContent(fd2, "filedata-more")
os.link(filepath, filepath2)
assert os.lstat(filepath).st_ino == os.lstat(filepath2).st_ino
assert os.lstat(filepath).st_ino == os.fstat(fd).st_ino
assertFileHasContent(filepath2, "filedata-more")
assertFileHasContent(real_filepath2, "filedata-more")
os.unlink(filepath)
assertFileNotExist(filepath)
assertFileNotExist(real_filepath)
assertFdHasContent(fd, "filedata-more")
assertFdHasContent(fd2, "filedata-more")
replaceFdContent(fd2, "replaced")
assertFileHasContent(filepath2, "replaced")
assertFileHasContent(real_filepath2, "replaced")
assertFileNotExist(filepath)
assertFileNotExist(real_filepath)
assertFdHasContent(fd, "replaced")
assertFdHasContent(fd2, "replaced")
# Move between dirs
os.rename(filepath2, docpath + "/moved")
assertFileHasContent(docpath + "/moved", "replaced")
assertRaisesErrno(errno.EXDEV, os.rename, docpath, doc.portal.mountpoint)
os.unlink(docpath + "/moved")
os.close(fd)
os.close(fd2)
os.symlink("realfile", dir + "/symlink2")
os.symlink("the-void", dir + "/broken-symlink2")
assertSymlink(dir + "/symlink2", "realfile")
assertSymlink(dir + "/broken-symlink2", "the-void")
os.unlink(dir + "/symlink2")
os.unlink(dir + "/broken-symlink2")
else:
# We should be unable to create files
assertRaises(
PermissionError,
os.open,
filepath,
os.O_CREAT | os.O_RDONLY | os.O_TRUNC,
0o600,
)
assertRaises(
PermissionError, os.open, filepath, os.O_CREAT | os.O_WRONLY, 0o600
)
assertRaises(PermissionError, os.open, filepath, os.O_CREAT | os.O_RDWR, 0o600)
assertRaises(PermissionError, os.open, dir + "/realfile", os.O_RDWR)
assertRaises(PermissionError, os.open, dir + "/readonly", os.O_RDWR)
assertRaises(PermissionError, os.truncate, dir + "/realfile", 0)
assertRaises(PermissionError, os.link, dir + "/realfile", dir + "/foo")
assertRaises(PermissionError, os.symlink, "foo", dir + "/new-symlink")
assertRaises(PermissionError, os.rename, dir + "/realfile", dir + "/foo")
assertRaises(PermissionError, os.unlink, dir + "/realfile")
assertRaises(PermissionError, os.chmod, dir + "/realfile", 0o700)
assertRaises(PermissionError, os.rmdir, dir + "/subdir")
os.unlink(real_dir + "/realfile")
os.unlink(real_dir + "/readonly")
os.unlink(real_dir + "/subdir/hardlink")
os.unlink(real_dir + "/symlink")
os.unlink(real_dir + "/broken-symlink")
os.rmdir(real_dir + "/subdir")
os.rmdir(real_dir)
def check_doc_perms(doc, app_id):
path = doc.get_doc_path(app_id)
readable = doc.is_readable_by(app_id)
if not readable:
assertRaises(FileNotFoundError, os.lstat, path)
if doc.is_dir: # Non readable dir means we can't even see the toplevel dir
assertRaises(FileNotFoundError, os.mkdir, path)
else:
assertRaises(PermissionError, os.mkdir, path)
return
assertRaises(PermissionError, os.rmdir, path)
assertRaises(PermissionError, os.rename, path, path + "_renamed")
assertRaises(IsADirectoryError, os.unlink, path)
if doc.is_dir:
check_directory_doc_perms(doc, app_id)
else:
check_regular_doc_perms(doc, app_id)
def check_perms(portal):
check_root_perms(portal.mountpoint)
check_byapp_perms(portal.by_app_path())
for doc in portal.get_docs_randomized():
check_doc_perms(doc, None)
for app_id in portal.get_app_ids_randomized():
check_doc_perms(doc, app_id)
# Ensure that a single lookup by app-id creates that app id (we need this for when mounting the subdir for an app)
def create_app_by_lookup(portal):
# Should only work for valid app ids
assertRaises(FileNotFoundError, os.lstat, portal.app_path("not-an-app-id"))
app_id = app_prefix + "Lookup"
info = os.lstat(portal.app_path(app_id))
check_virtual_stat(info)
portal.ensure_app_id(app_id, volatile=True)
def ensure_real_dir(create_hidden_file=True):
count = get_a_count("doc")
dir = os.environ["TMPDIR"] + "/" + dir_prefix + str(count)
os.makedirs(dir)
if create_hidden_file:
setFileContent(dir + "/cant-see-this-file", "s3krit")
return (dir, count)
def ensure_real_dir_file(create_file):
(dir, count) = ensure_real_dir()
path = dir + "/the-file"
if create_file:
setFileContent(path, "data" + str(count))
return path
def export_a_doc(portal):
path = ensure_real_dir_file(True)
doc = portal.add(path)
logv("exported %s as %s" % (path, doc))
lookup = portal.lookup(path)
assert lookup == doc.id
lookup_on_fuse = portal.lookup(doc.get_doc_path(None) + "/" + doc.filename)
assert lookup_on_fuse == doc.id
reused_doc = portal.add(path)
assert doc is reused_doc
not_reused_doc = portal.add(path, False)
assert doc is not not_reused_doc
# We should not be able to re-export a tmpfile
tmppath = doc.get_doc_path(None) + "/tmpfile"
setFileContent(tmppath, "tempdata")
# Should not be able to add a tempfile on the fuse mount, or look it up
assertRaises(GLib.Error, portal.add, tmppath)
lookup = portal.lookup(tmppath)
assert lookup == ""
os.unlink(tmppath)
def export_a_named_doc(portal, create_file):
path = ensure_real_dir_file(create_file)
doc = portal.add_named(path)
logv("exported (named) %s as %s" % (path, doc))
if create_file:
lookup = portal.lookup(path)
assert lookup == doc.id
reused_doc = portal.add_named(path)
assert doc is reused_doc
not_reused_doc = portal.add_named(path, False)
assert doc is not not_reused_doc
def export_a_dir_doc(portal):
(dir, count) = ensure_real_dir(False)
doc = portal.add_dir(dir)
logv("exported (dir) %s as %s" % (dir, doc))
lookup = portal.lookup(dir)
assert lookup == doc.id
lookup_on_fuse = portal.lookup(doc.get_doc_path(None))
assert lookup_on_fuse == doc.id
# We should not be able to portal lookup a file in the dir doc
subpath = doc.get_doc_path(None) + "/sub"
setFileContent(subpath, "sub")
doc = portal.lookup(subpath)
assert doc == ""
doc2 = portal.lookup(dir + "/sub")
assert doc2 == ""
# But we should be able to re-export the file
reexported_doc = portal.add(subpath)
reexported_docdir = reexported_doc.get_doc_path(None)
assertFileHasContent(reexported_docdir + "/sub", "sub")
portal.delete(reexported_doc.id)
os.unlink(subpath)
# And also re-export a directory
os.mkdir(subpath)
setFileContent(subpath + "/subfile", "subfile")
reexported_doc = portal.add_dir(subpath)
reexported_docdir = reexported_doc.get_doc_path(None)
assertFileHasContent(reexported_docdir + "/subfile", "subfile")
portal.delete(reexported_doc.id)
os.unlink(subpath + "/subfile")
os.rmdir(subpath)
def add_an_app(portal, num_docs):
if num_docs == 0:
return
count = get_a_count("app")
read_app = app_prefix + "read.App" + str(count)
write_app = app_prefix + "write.App" + str(count)
portal.ensure_app_id(read_app)
portal.ensure_app_id(write_app)
docs = portal.get_docs()
ids = []
for i in range(num_docs):
if len(docs) == 0:
continue
indx = random.randint(0, len(docs) - 1)
doc = docs[indx]
del docs[indx]
ids.append(doc.id)
portal.grant_permissions(doc.id, read_app, ["read"])
doc.apps.append(read_app)
portal.grant_permissions(doc.id, write_app, ["read", "write"])
doc.apps.append(write_app)
logv("granted acces to %s and %s for %s" % (read_app, write_app, ids))
def file_transfer_portal_test():
log("File transfer tests")
ft_portal = FileTransferPortal()
key = ft_portal.start_transfer()
file1 = ensure_real_dir_file(True)
file2 = ensure_real_dir_file(True)
res = ft_portal.add_files(key, [file1, file2])
res = ft_portal.retrieve_files(key)
files = res[0]
assert len(files) == 2
# This is the same app, it's not sandboxed
assert files[0] == file1
assert files[1] == file2
log("filetransfer tests ok")
log("filetransfer dir")
key = ft_portal.start_transfer()
dir1 = ensure_real_dir(True)
ft_portal.add_files(key, [file1, dir1[0], file2])
res = ft_portal.retrieve_files(key)
assert len(res[0]) == 3
assert res[0][0] == file1
assert res[0][1] == dir1[0]
assert res[0][2] == file2
log("filetransfer dir ok")
log("filetransfer key")
# Test that an invalid key is rejected
key = ft_portal.start_transfer()
assert key != "1234"
assertRaisesGError(
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
9,
ft_portal.add_files,
"1234",
[file1, file2],
)
# Test stop transfer
key = ft_portal.start_transfer()
ft_portal.add_files(key, [file1, file2])
ft_portal.stop_transfer(key)
assertRaisesGError(
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
9,
ft_portal.retrieve_files,
key,
)
assertRaisesGError(
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
9,
ft_portal.add_files,
key,
[file1, file2],
)
# Test that we can't reuse an old key
new_key = ft_portal.start_transfer()
assertRaisesGError(
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
9,
ft_portal.add_files,
key,
[file1, file2],
)
res = ft_portal.add_files(new_key, [file1, file2])
log("filetransfer key ok")
log("File transfer tests ok")
def run_test(iterations, prefix=None, do_ensure_no_remaining=True):
global app_prefix
global dir_prefix
global ensure_no_remaining
if prefix:
app_prefix = app_prefix + prefix + "."
dir_prefix = dir_prefix + "-" + prefix + "-"
ensure_no_remaining = do_ensure_no_remaining
log("Connecting to portal")
doc_portal = DocPortal()
log("Running fuse tests...")
create_app_by_lookup(doc_portal)
verify_fs_layout(doc_portal)
log("Creating some docs")
for i in range(10):
export_a_doc(doc_portal)
verify_fs_layout(doc_portal)
log("Creating some named docs (existing)")
for i in range(10):
export_a_named_doc(doc_portal, True)
verify_fs_layout(doc_portal)
log("Creating some named docs (non-existing)")
for i in range(10):
export_a_named_doc(doc_portal, False)
verify_fs_layout(doc_portal)
log("Creating some dir docs")
for i in range(10):
export_a_dir_doc(doc_portal)
verify_fs_layout(doc_portal)
log("Creating some apps")
for i in range(10):
add_an_app(doc_portal, 6)
verify_fs_layout(doc_portal)
for i in range(iterations):
log("Checking permissions, pass %d" % (i + 1))
check_perms(doc_portal)
verify_fs_layout(doc_portal)
log("fuse tests ok")
file_transfer_portal_test()
class Process(mp.Process):
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = mp.Pipe()
self._exception = None
def run(self):
try:
mp.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
class TestDocumentFuse:
def parallel(self, test_function, parallel_tests, parallel_iterations):
procs = []
for i in range(parallel_tests):
p = Process(
target=test_function, args=(parallel_iterations, f"c{i}", False)
)
p.start()
procs.append(p)
for p in procs:
p.join()
if p.exception:
error, traceback = p.exception
raise error
def test_single_thread(self, portals, xdg_document_portal, dbus_con):
run_test(3)
def test_multi_thread(self, portals, xdg_document_portal, dbus_con):
if xdp.run_long_tests():
return self.parallel(run_test, 20, 10)
if xdp.is_in_ci():
return self.parallel(run_test, 5, 3)
self.parallel(run_test, 10, 5)
# Running
# ./tests/run-test.sh -n 0 tests/test_document_fuse.py::TestDocumentFuse::test_multi_thread
# works fine, but with
# ./tests/run-test.sh -n 0 tests/test_document_fuse.py::TestDocumentFuse
# the `test_multi_thread` test is failing.
# For now, let's skip the test and turn it on again when we have fixed it.
pytest.skip("Test has a race condition which can make it fail", allow_module_level=True)
try:
xdp.ensure_fuse_supported()
except xdp.FuseNotSupportedException as e:
pytest.skip(f"No fuse support: {e}", allow_module_level=True)