Some checks failed
Docker. / Ubuntu (push) Has been cancelled
User-agent updater. / User-agent (push) Failing after 15s
Lock Threads / lock (push) Failing after 10s
Waiting for answer. / waiting-for-answer (push) Failing after 22s
Needs user action. / needs-user-action (push) Failing after 8s
Can't reproduce. / cant-reproduce (push) Failing after 8s
Close stale issues and PRs / stale (push) Has been cancelled
1260 lines
39 KiB
Python
1260 lines
39 KiB
Python
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
#
|
|
# This file is formatted with Python Black
|
|
|
|
import tests as xdp
|
|
|
|
import pytest
|
|
import errno
|
|
import os
|
|
import random
|
|
import stat
|
|
import sys
|
|
import multiprocessing as mp
|
|
import traceback
|
|
from collections import defaultdict
|
|
from gi.repository import Gio, GLib
|
|
|
|
|
|
@pytest.fixture
|
|
def app_id():
|
|
return None
|
|
|
|
|
|
def filename_to_ay(filename):
|
|
return list(filename.encode("utf-8")) + [0]
|
|
|
|
|
|
app_prefix = "org.test."
|
|
dir_prefix = "dir"
|
|
ensure_no_remaining = True
|
|
|
|
running_count: defaultdict[str, int] = defaultdict(int)
|
|
|
|
|
|
def log(str):
|
|
print(str, file=sys.stderr)
|
|
|
|
|
|
def logv(str):
|
|
log(str)
|
|
|
|
|
|
def get_a_count(counter: str):
|
|
global running_count
|
|
running_count[counter] += 1
|
|
return running_count[counter]
|
|
|
|
|
|
def setFileContent(path, content):
|
|
with open(path, "w") as f:
|
|
f.write(content)
|
|
|
|
|
|
def appendFileContent(path, content):
|
|
with open(path, "a") as f:
|
|
f.write(content)
|
|
|
|
|
|
def readFdContent(fd):
|
|
os.lseek(fd, 0, os.SEEK_SET)
|
|
return str(os.read(fd, 64 * 1024), "utf-8")
|
|
|
|
|
|
def replaceFdContent(fd, content):
|
|
os.lseek(fd, 0, os.SEEK_SET)
|
|
os.ftruncate(fd, 0)
|
|
os.write(fd, bytes(content, "utf-8"))
|
|
|
|
|
|
def appendFdContent(fd, content):
|
|
os.lseek(fd, 0, os.SEEK_END)
|
|
os.write(fd, bytes(content, "utf-8"))
|
|
|
|
|
|
DOCUMENT_ADD_FLAGS_REUSE_EXISTING = 1 << 0
|
|
DOCUMENT_ADD_FLAGS_PERSISTENT = 1 << 1
|
|
DOCUMENT_ADD_FLAGS_AS_NEEDED_BY_APP = 1 << 2
|
|
DOCUMENT_ADD_FLAGS_DIRECTORY = 1 << 3
|
|
|
|
|
|
def assertRaises(exc_type, func, *args, **kwargs):
|
|
with pytest.raises(exc_type):
|
|
func(*args, **kwargs)
|
|
|
|
|
|
def assertRaisesErrno(error_nr, func, *args, **kwargs):
|
|
with pytest.raises(OSError) as excinfo:
|
|
func(*args, **kwargs)
|
|
|
|
if excinfo.value.errno != error_nr:
|
|
raise AssertionError(
|
|
"Wrong errno {0} was raised instead of {1}".format(
|
|
excinfo.value.errno, error_nr
|
|
)
|
|
)
|
|
|
|
|
|
def assertRaisesGError(message, code, func, *args, **kwargs):
|
|
with pytest.raises(GLib.GError) as excinfo:
|
|
func(*args, **kwargs)
|
|
|
|
if not excinfo.value.message.startswith(message):
|
|
raise AssertionError(
|
|
"Wrong message {0} doesn't start with {1}".format(
|
|
excinfo.value.message, message
|
|
)
|
|
)
|
|
if excinfo.value.code != code:
|
|
raise AssertionError(
|
|
"Wrong code {0} was raised instead of {1}".format(excinfo.value.code, code)
|
|
)
|
|
|
|
|
|
def assertFileHasContent(path, expected_content):
|
|
with open(path) as f:
|
|
file_content = f.read()
|
|
assert file_content == expected_content
|
|
|
|
|
|
def assertFdHasContent(fd, expected_content):
|
|
content = readFdContent(fd)
|
|
assert content == expected_content
|
|
|
|
|
|
def assertSameStat(a, b, b_mode_mask):
|
|
if not (
|
|
a.st_mode == (b.st_mode & b_mode_mask)
|
|
and a.st_nlink == b.st_nlink
|
|
and a.st_size == b.st_size
|
|
and a.st_uid == b.st_uid
|
|
and a.st_gid == b.st_gid
|
|
and a.st_atime == b.st_atime
|
|
and a.st_mtime == b.st_mtime
|
|
and a.st_ctime == b.st_ctime
|
|
):
|
|
raise AssertionError("Stat value {} was not the expected {})".format(a, b))
|
|
|
|
|
|
def assertFileExist(path):
|
|
try:
|
|
info = os.lstat(path)
|
|
if info.st_mode & stat.S_IFREG != stat.S_IFREG:
|
|
raise AssertionError("File {} is not a regular file".format(path))
|
|
except Exception:
|
|
raise AssertionError("File {} doesn't exist".format(path))
|
|
|
|
|
|
def assertDirExist(path):
|
|
try:
|
|
info = os.lstat(path)
|
|
if info.st_mode & stat.S_IFDIR != stat.S_IFDIR:
|
|
raise AssertionError("File {} is not a directory file".format(path))
|
|
except Exception:
|
|
raise AssertionError("File {} doesn't exist".format(path))
|
|
|
|
|
|
def assertSymlink(path, expected_target):
|
|
try:
|
|
info = os.lstat(path)
|
|
if info.st_mode & stat.S_IFLNK != stat.S_IFLNK:
|
|
raise AssertionError("File {} is not a symlink".format(path))
|
|
target = os.readlink(path)
|
|
if target != expected_target:
|
|
raise AssertionError(
|
|
"File {} has wrong target {}, expected {}".format(
|
|
path, target, expected_target
|
|
)
|
|
)
|
|
except Exception:
|
|
raise AssertionError("Symlink {} doesn't exist".format(path))
|
|
|
|
|
|
def assertFileNotExist(path):
|
|
try:
|
|
os.lstat(path)
|
|
except FileNotFoundError:
|
|
return
|
|
except Exception:
|
|
raise AssertionError(
|
|
"Got wrong execption {} for {}, expected FileNotFoundError".format(
|
|
sys.exc_info()[0], path
|
|
)
|
|
)
|
|
raise AssertionError("Path {} unexpectedly exists".format(path))
|
|
|
|
|
|
def assertDirFiles(path, expected_files, exhaustive=True, volatile_files=None):
|
|
found_files = os.listdir(path)
|
|
remaining = set(found_files)
|
|
for file in expected_files:
|
|
if file in remaining:
|
|
remaining.remove(file)
|
|
elif file not in volatile_files:
|
|
raise AssertionError(
|
|
"Expected file {} not found in dir {} (all: {})".format(
|
|
file, path, found_files
|
|
)
|
|
)
|
|
if exhaustive:
|
|
if len(remaining) != 0:
|
|
raise AssertionError(
|
|
"Unexpected files {} in dir {} (all: {})".format(
|
|
remaining, path, found_files
|
|
)
|
|
)
|
|
|
|
|
|
class Doc:
|
|
def __init__(self, portal, id, path, content, is_dir=False):
|
|
self.portal = portal
|
|
self.id = id
|
|
self.content = content
|
|
self.real_path = path
|
|
self.is_dir = is_dir
|
|
self.apps = []
|
|
self.files = []
|
|
|
|
if is_dir:
|
|
self.real_dirname = path
|
|
self.filename = None
|
|
self.dirname = os.path.basename(path)
|
|
else:
|
|
(self.real_dirname, self.filename) = os.path.split(path)
|
|
self.dirname = None
|
|
if content:
|
|
self.files.append(self.filename)
|
|
|
|
def is_readable_by(self, app_id):
|
|
if app_id:
|
|
return app_id in self.apps
|
|
return True
|
|
|
|
def is_writable_by(self, app_id):
|
|
if app_id:
|
|
return app_id in self.apps and ".write." in app_id
|
|
else:
|
|
return True
|
|
|
|
def get_doc_path(self, app_id):
|
|
if app_id:
|
|
base = self.portal.app_path(app_id) + "/" + self.id
|
|
else:
|
|
base = self.portal.mountpoint + "/" + self.id
|
|
if self.is_dir:
|
|
return base + "/" + self.dirname
|
|
else:
|
|
return base
|
|
|
|
def __str__(self):
|
|
name = self.id
|
|
if self.is_dir:
|
|
return "%s(dir)" % (name)
|
|
elif self.content is None:
|
|
return "%s(missing)" % (name)
|
|
else:
|
|
return "%s" % (name)
|
|
|
|
|
|
class DocPortal:
|
|
def __init__(self):
|
|
self.apps = []
|
|
self.volatile_apps = set()
|
|
self.docs = {}
|
|
self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
|
|
self.proxy = Gio.DBusProxy.new_sync(
|
|
self.bus,
|
|
Gio.DBusProxyFlags.NONE,
|
|
None,
|
|
"org.freedesktop.portal.Documents",
|
|
"/org/freedesktop/portal/documents",
|
|
"org.freedesktop.portal.Documents",
|
|
None,
|
|
)
|
|
self.mountpoint = self.get_mount_path()
|
|
|
|
def get_mount_path(self):
|
|
res = self.proxy.call_sync("GetMountPoint", GLib.Variant("()", ()), 0, -1, None)
|
|
return bytearray(res[0][:-1]).decode("utf-8")
|
|
|
|
def grant_permissions(self, doc_id, app_id, permissions):
|
|
self.proxy.call_sync(
|
|
"GrantPermissions",
|
|
GLib.Variant("(ssas)", (doc_id, app_id, permissions)),
|
|
0,
|
|
-1,
|
|
None,
|
|
)
|
|
|
|
def lookup(self, path):
|
|
res = self.proxy.call_sync(
|
|
"Lookup", GLib.Variant("(ay)", (filename_to_ay(path),)), 0, -1, None
|
|
)
|
|
return res[0]
|
|
|
|
def delete(self, doc_id):
|
|
self.proxy.call_sync("Delete", GLib.Variant("(s)", (doc_id,)), 0, -1, None)
|
|
del self.docs[doc_id]
|
|
|
|
def add(self, path, reuse_existing=True):
|
|
fdlist = Gio.UnixFDList.new()
|
|
fd = os.open(path, os.O_PATH)
|
|
handle = fdlist.append(fd)
|
|
os.close(fd)
|
|
res = self.proxy.call_with_unix_fd_list_sync(
|
|
"Add",
|
|
GLib.Variant("(hbb)", (handle, reuse_existing, False)),
|
|
0,
|
|
-1,
|
|
fdlist,
|
|
None,
|
|
)
|
|
doc_id = res[0][0]
|
|
if doc_id in self.docs:
|
|
return self.docs[doc_id]
|
|
|
|
with open(path) as f:
|
|
content = f.read()
|
|
doc = Doc(self, doc_id, path, content)
|
|
self.docs[doc.id] = doc
|
|
return doc
|
|
|
|
def add_named(self, path, reuse_existing=True):
|
|
(dirname, filename) = os.path.split(path)
|
|
fdlist = Gio.UnixFDList.new()
|
|
fd = os.open(dirname, os.O_PATH)
|
|
handle = fdlist.append(fd)
|
|
os.close(fd)
|
|
res = self.proxy.call_with_unix_fd_list_sync(
|
|
"AddNamed",
|
|
GLib.Variant(
|
|
"(haybb)", (handle, filename_to_ay(filename), reuse_existing, False)
|
|
),
|
|
0,
|
|
-1,
|
|
fdlist,
|
|
None,
|
|
)
|
|
doc_id = res[0][0]
|
|
if doc_id in self.docs:
|
|
return self.docs[doc_id]
|
|
|
|
try:
|
|
with open(path) as f:
|
|
content = f.read()
|
|
except Exception:
|
|
content = None
|
|
doc = Doc(self, doc_id, path, content)
|
|
self.docs[doc.id] = doc
|
|
return doc
|
|
|
|
def add_full(self, path, flags):
|
|
fdlist = Gio.UnixFDList.new()
|
|
fd = os.open(path, os.O_PATH)
|
|
handle = fdlist.append(fd)
|
|
os.close(fd)
|
|
res = self.proxy.call_with_unix_fd_list_sync(
|
|
"AddFull",
|
|
GLib.Variant("(ahusas)", ([handle], flags, "", [])),
|
|
0,
|
|
-1,
|
|
fdlist,
|
|
None,
|
|
)
|
|
doc_id = res[0][0][0]
|
|
if doc_id in self.docs:
|
|
return self.docs[doc_id]
|
|
doc = Doc(self, doc_id, path, True, (flags & DOCUMENT_ADD_FLAGS_DIRECTORY) != 0)
|
|
self.docs[doc.id] = doc
|
|
return doc
|
|
|
|
def add_dir(self, path):
|
|
return self.add_full(
|
|
path, DOCUMENT_ADD_FLAGS_REUSE_EXISTING | DOCUMENT_ADD_FLAGS_DIRECTORY
|
|
)
|
|
|
|
def get_docs_for_app(self, app_id):
|
|
docs = []
|
|
for doc in self.docs.values():
|
|
if doc.is_readable_by(app_id):
|
|
docs.append(doc.id)
|
|
return docs
|
|
|
|
def ensure_app_id(self, app_id, volatile=False):
|
|
if app_id not in self.apps:
|
|
self.apps.append(app_id)
|
|
if volatile:
|
|
self.volatile_apps.add(app_id)
|
|
|
|
def get_docs(self):
|
|
return list(self.docs.values())
|
|
|
|
def get_docs_randomized(self):
|
|
docs = list(self.docs.values())
|
|
random.shuffle(docs)
|
|
return docs
|
|
|
|
def get_doc(self, doc_id):
|
|
return self.docs[doc_id]
|
|
|
|
def get_app_ids(self):
|
|
return self.apps
|
|
|
|
def get_volatile_app_ids(self):
|
|
return self.volatile_apps
|
|
|
|
def get_app_ids_randomized(self):
|
|
apps = self.apps.copy()
|
|
random.shuffle(apps)
|
|
return apps
|
|
|
|
def by_app_path(self):
|
|
return self.mountpoint + "/by-app"
|
|
|
|
def app_path(self, app_id):
|
|
return self.mountpoint + "/by-app/" + app_id
|
|
|
|
|
|
class FileTransferPortal(DocPortal):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.ft_proxy = Gio.DBusProxy.new_sync(
|
|
self.bus,
|
|
Gio.DBusProxyFlags.NONE,
|
|
None,
|
|
"org.freedesktop.portal.Documents",
|
|
"/org/freedesktop/portal/documents",
|
|
"org.freedesktop.portal.FileTransfer",
|
|
None,
|
|
)
|
|
|
|
def start_transfer(self):
|
|
res = self.ft_proxy.call_sync(
|
|
"StartTransfer", GLib.Variant("(a{sv})", ([None])), 0, -1, None
|
|
)
|
|
return res[0]
|
|
|
|
def add_files(self, key, files):
|
|
fdlist = Gio.UnixFDList.new()
|
|
handles = []
|
|
for filename in files:
|
|
fd = os.open(filename, os.O_PATH)
|
|
handle = fdlist.append(fd)
|
|
handles.append(handle)
|
|
os.close(fd)
|
|
|
|
res = self.ft_proxy.call_with_unix_fd_list_sync(
|
|
"AddFiles",
|
|
GLib.Variant("(saha{sv})", (key, handles, [])),
|
|
0,
|
|
-1,
|
|
fdlist,
|
|
None,
|
|
)
|
|
return res
|
|
|
|
def retrieve_files(self, key):
|
|
res = self.ft_proxy.call_sync(
|
|
"RetrieveFiles",
|
|
GLib.Variant("(sa{sv})", (key, [])),
|
|
0,
|
|
-1,
|
|
None,
|
|
)
|
|
return res
|
|
|
|
def stop_transfer(self, key):
|
|
res = self.ft_proxy.call_sync(
|
|
"StopTransfer",
|
|
GLib.Variant("(s)", (key,)),
|
|
0,
|
|
-1,
|
|
None,
|
|
)
|
|
return res
|
|
|
|
|
|
def check_virtual_stat(info, writable=False):
|
|
assert info.st_uid == os.getuid()
|
|
assert info.st_gid == os.getgid()
|
|
if writable:
|
|
assert info.st_mode == stat.S_IFDIR | 0o700
|
|
else:
|
|
assert info.st_mode == stat.S_IFDIR | 0o500
|
|
|
|
|
|
def verify_virtual_dir(path, files, volatile_files=None):
|
|
info = os.lstat(path)
|
|
check_virtual_stat(info)
|
|
assert os.access(path, os.R_OK)
|
|
assert not os.access(path, os.W_OK)
|
|
|
|
assertRaises(FileNotFoundError, os.lstat, path + "/not-existing-file")
|
|
|
|
if files is not None:
|
|
assertDirFiles(path, files, ensure_no_remaining, volatile_files)
|
|
|
|
|
|
def verify_doc(doc, app_id=None):
|
|
dir = doc.get_doc_path(app_id)
|
|
|
|
if doc.is_dir:
|
|
vdir = os.path.dirname(dir)
|
|
info = os.lstat(vdir)
|
|
check_virtual_stat(info)
|
|
pass
|
|
else:
|
|
info = os.lstat(dir)
|
|
check_virtual_stat(info, doc.is_writable_by(app_id))
|
|
assert os.access(dir, os.R_OK)
|
|
if doc.is_writable_by(app_id):
|
|
assert os.access(dir, os.W_OK)
|
|
else:
|
|
assert not os.access(dir, os.W_OK)
|
|
|
|
assertRaises(FileNotFoundError, os.lstat, dir + "/not-existing-file")
|
|
|
|
assertDirFiles(dir, doc.files)
|
|
|
|
for file in doc.files:
|
|
filepath = dir + "/" + file
|
|
info = os.lstat(filepath)
|
|
assert info.st_uid == os.getuid()
|
|
assert info.st_gid == os.getgid()
|
|
|
|
assert os.access(filepath, os.R_OK)
|
|
if doc.is_writable_by(app_id):
|
|
assert os.access(filepath, os.W_OK)
|
|
else:
|
|
assert not os.access(filepath, os.W_OK)
|
|
|
|
if doc.filename:
|
|
main_path = dir + "/" + doc.filename
|
|
real_path = doc.real_path
|
|
if doc.content:
|
|
assertFileExist(main_path)
|
|
assertFileHasContent(main_path, doc.content)
|
|
assertFileHasContent(real_path, doc.content)
|
|
|
|
info = os.lstat(main_path)
|
|
real_info = os.lstat(real_path)
|
|
mode_mask = ~(stat.S_ISUID | stat.S_ISGID | stat.S_ISVTX)
|
|
if not doc.is_writable_by(app_id):
|
|
mode_mask = mode_mask & ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
|
|
assertSameStat(info, real_info, mode_mask)
|
|
|
|
else:
|
|
assertRaises(FileNotFoundError, os.lstat, main_path)
|
|
assertRaises(FileNotFoundError, os.open, main_path, os.O_RDONLY)
|
|
assertRaises(FileNotFoundError, os.lstat, doc.real_path)
|
|
assertRaises(FileNotFoundError, os.open, doc.real_path, os.O_RDONLY)
|
|
|
|
# Ensure no leftover temp files
|
|
for real_file in os.listdir(os.path.dirname(doc.real_path)):
|
|
assert not real_file.startswith(".xdp")
|
|
|
|
|
|
def verify_fs_layout(portal):
|
|
verify_virtual_dir(portal.mountpoint, ["by-app"] + list(portal.docs.keys()))
|
|
verify_virtual_dir(
|
|
portal.by_app_path(), portal.get_app_ids(), portal.get_volatile_app_ids()
|
|
)
|
|
|
|
for doc in portal.get_docs():
|
|
verify_doc(doc)
|
|
|
|
# Verify the by-app subdirs (just the directory for now)
|
|
for app_id in portal.get_app_ids():
|
|
docs_for_app = portal.get_docs_for_app(app_id)
|
|
verify_virtual_dir(portal.app_path(app_id), docs_for_app)
|
|
for doc_id in docs_for_app:
|
|
doc = portal.get_doc(doc_id)
|
|
verify_doc(doc, app_id)
|
|
|
|
|
|
def check_virtdir_perms(path):
|
|
assertRaises(PermissionError, os.mkdir, path + "/a_dir")
|
|
assertRaises(PermissionError, os.open, path + "/a-file", os.O_RDWR | os.O_CREAT)
|
|
|
|
|
|
def check_root_perms(path):
|
|
check_virtdir_perms(path)
|
|
assertRaises(PermissionError, os.rename, path + "/by-app", path + "/by-app2")
|
|
assertRaises(PermissionError, os.rmdir, path + "/by-app")
|
|
|
|
|
|
def check_byapp_perms(path):
|
|
check_virtdir_perms(path)
|
|
assertRaises(PermissionError, os.mkdir, path + "/a_dir")
|
|
|
|
|
|
def check_regular_doc_perms(doc, app_id):
|
|
path = doc.get_doc_path(app_id)
|
|
writable = doc.is_writable_by(app_id)
|
|
# regular documents, can't do most stuff
|
|
assertRaises(PermissionError, os.mkdir, path + "/dir")
|
|
assertRaises(PermissionError, os.symlink, "symlink-value", path + "/symlink")
|
|
|
|
docpath = path + "/" + doc.filename
|
|
tmppath = path + "/a-tmpfile"
|
|
tmppath2 = path + "/another-tmpfile"
|
|
if doc.content: # Main file exists
|
|
assertFileExist(docpath)
|
|
assertFileExist(doc.real_path)
|
|
assertRaises(PermissionError, os.link, docpath, path + "/a-hardlink")
|
|
assertRaises(NotADirectoryError, os.rmdir, docpath)
|
|
assertRaises(PermissionError, os.setxattr, docpath, "user.attr", b"foo")
|
|
assertRaises(PermissionError, os.removexattr, docpath, "user.attr")
|
|
|
|
fd = os.open(docpath, os.O_RDONLY, 0o600)
|
|
os.close(fd)
|
|
|
|
if not writable:
|
|
assertRaises(
|
|
PermissionError, os.open, docpath, os.O_RDONLY | os.O_TRUNC, 0o600
|
|
)
|
|
assertRaises(PermissionError, os.open, docpath, os.O_WRONLY, 0o600)
|
|
assertRaises(PermissionError, os.open, docpath, os.O_RDWR, 0o600)
|
|
assertRaises(PermissionError, os.rename, docpath, docpath + "renamed")
|
|
assertRaises(PermissionError, os.truncate, docpath, 1)
|
|
assertRaises(PermissionError, os.unlink, docpath)
|
|
assertRaises(PermissionError, os.utime, docpath)
|
|
else:
|
|
# Can't move file out of docdir or into other version of same docdir
|
|
assertRaisesErrno(
|
|
errno.EXDEV, os.rename, docpath, path + "/../" + doc.filename
|
|
)
|
|
if app_id:
|
|
assertRaisesErrno(
|
|
errno.EXDEV,
|
|
os.rename,
|
|
docpath,
|
|
doc.get_doc_path(None) + doc.filename,
|
|
)
|
|
if doc.apps and app_id != doc.apps[0]:
|
|
assertRaisesErrno(
|
|
errno.EXDEV,
|
|
os.rename,
|
|
docpath,
|
|
doc.get_doc_path(doc.apps[0]) + doc.filename,
|
|
)
|
|
|
|
# Ensure we can read it (multiple times)
|
|
fd = os.open(docpath, os.O_RDONLY, 0o600)
|
|
assertFdHasContent(fd, doc.content)
|
|
assertFdHasContent(fd, doc.content)
|
|
|
|
# Ensure we can rename it
|
|
os.rename(docpath, docpath + "_renamed")
|
|
assertRaises(FileNotFoundError, os.open, docpath, os.O_RDONLY, 0o600)
|
|
# ... and still read it
|
|
assertFdHasContent(fd, doc.content)
|
|
|
|
# Ensure we can delete it
|
|
os.unlink(docpath + "_renamed")
|
|
# ... and still read it
|
|
assertFdHasContent(fd, doc.content)
|
|
os.close(fd)
|
|
|
|
# Replace main file with rename of tmpfile
|
|
setFileContent(docpath, "orig-data")
|
|
fd1 = os.open(docpath, os.O_RDONLY, 0o600)
|
|
|
|
setFileContent(tmppath, "new-data")
|
|
fd2 = os.open(tmppath, os.O_RDONLY, 0o600)
|
|
|
|
os.rename(tmppath, tmppath2)
|
|
assertRaises(FileNotFoundError, os.lstat, tmppath)
|
|
assertFdHasContent(fd2, "new-data")
|
|
assertFileHasContent(tmppath2, "new-data")
|
|
|
|
os.rename(tmppath2, docpath)
|
|
assertRaises(FileNotFoundError, os.lstat, tmppath2)
|
|
assertFdHasContent(fd1, "orig-data")
|
|
assertFdHasContent(fd2, "new-data")
|
|
assertFileHasContent(docpath, "new-data")
|
|
appendFileContent(docpath, "-more")
|
|
assertFdHasContent(fd2, "new-data-more")
|
|
|
|
setFileContent(tmppath, "replace-this-data")
|
|
fd3 = os.open(tmppath, os.O_RDONLY, 0o600)
|
|
os.rename(docpath, tmppath)
|
|
assertFdHasContent(fd2, "new-data-more")
|
|
assertFdHasContent(fd3, "replace-this-data")
|
|
fd4 = os.open(tmppath, os.O_RDWR, 0o600)
|
|
assertFdHasContent(fd4, "new-data-more")
|
|
|
|
# Restore original version
|
|
os.rename(tmppath, docpath)
|
|
replaceFdContent(fd4, doc.content)
|
|
assertFdHasContent(fd2, doc.content)
|
|
assertFdHasContent(fd4, doc.content)
|
|
assertFileHasContent(docpath, doc.content)
|
|
assertFdHasContent(fd1, "orig-data")
|
|
assertFdHasContent(fd3, "replace-this-data")
|
|
|
|
os.close(fd1)
|
|
os.close(fd2)
|
|
os.close(fd3)
|
|
os.close(fd4)
|
|
|
|
assertRaises(NotADirectoryError, os.rmdir, docpath)
|
|
assertRaises(PermissionError, os.link, docpath, path + "/a-hardlink")
|
|
assertRaises(PermissionError, os.setxattr, docpath, "user.attr", b"foo")
|
|
assertRaises(PermissionError, os.removexattr, docpath, "user.attr")
|
|
|
|
else: # Main file doesn't exist
|
|
assertFileNotExist(docpath)
|
|
assertFileNotExist(doc.real_path)
|
|
if writable: # But we can create it
|
|
setFileContent(docpath, "some-data")
|
|
assertFileHasContent(docpath, "some-data")
|
|
os.unlink(docpath)
|
|
else: # And we can't create it
|
|
assertRaises(
|
|
PermissionError,
|
|
os.open,
|
|
docpath,
|
|
os.O_CREAT | os.O_RDONLY | os.O_TRUNC,
|
|
0o600,
|
|
)
|
|
assertRaises(
|
|
PermissionError, os.open, docpath, os.O_CREAT | os.O_WRONLY, 0o600
|
|
)
|
|
assertRaises(
|
|
PermissionError, os.open, docpath, os.O_CREAT | os.O_RDWR, 0o600
|
|
)
|
|
|
|
# Ensure it show up if created outside
|
|
setFileContent(doc.real_path, "from-outside")
|
|
assertFileExist(docpath)
|
|
assertFileHasContent(docpath, "from-outside")
|
|
if writable:
|
|
os.unlink(docpath)
|
|
else:
|
|
assertRaises(PermissionError, os.unlink, docpath)
|
|
os.unlink(doc.real_path)
|
|
assertFileNotExist(docpath)
|
|
|
|
if writable: # We can create tempfiles, do some simple checks
|
|
setFileContent(tmppath, "tempdata")
|
|
assertFileHasContent(tmppath, "tempdata")
|
|
assertRaises(NotADirectoryError, os.rmdir, tmppath)
|
|
assertRaises(PermissionError, os.link, tmppath, path + "/a-hardlink")
|
|
assertRaises(PermissionError, os.setxattr, tmppath, "user.attr", b"foo")
|
|
assertRaises(PermissionError, os.removexattr, tmppath, "user.attr")
|
|
|
|
os.rename(tmppath, tmppath2)
|
|
assertFileHasContent(tmppath2, "tempdata")
|
|
os.unlink(tmppath2)
|
|
else:
|
|
# We should be unable to create tempfiles
|
|
assertRaises(
|
|
PermissionError,
|
|
os.open,
|
|
tmppath,
|
|
os.O_CREAT | os.O_RDONLY | os.O_TRUNC,
|
|
0o600,
|
|
)
|
|
assertRaises(PermissionError, os.open, tmppath, os.O_CREAT | os.O_WRONLY, 0o600)
|
|
assertRaises(PermissionError, os.open, tmppath, os.O_CREAT | os.O_RDWR, 0o600)
|
|
|
|
|
|
def check_directory_doc_perms(doc, app_id):
|
|
writable = doc.is_writable_by(app_id)
|
|
|
|
docpath = doc.get_doc_path(app_id)
|
|
realpath = doc.real_path
|
|
|
|
# We should not be able to do anything with the toplevel document dir (other than reading the real dir)
|
|
vpath = os.path.dirname(docpath)
|
|
assertDirExist(vpath)
|
|
assertDirFiles(vpath, [doc.dirname])
|
|
assertRaises(PermissionError, os.mkdir, vpath + "/a_dir")
|
|
assertRaises(PermissionError, os.rename, docpath, vpath + "/foo")
|
|
assertRaises(PermissionError, os.rmdir, docpath)
|
|
assertRaises(
|
|
PermissionError, os.open, vpath + "/a_file", os.O_CREAT | os.O_RDWR, 0o600
|
|
)
|
|
|
|
assertDirExist(docpath)
|
|
|
|
# Create some pre-existing files:
|
|
|
|
real_dir = realpath + "/dir"
|
|
os.mkdir(real_dir)
|
|
setFileContent(real_dir + "/realfile", "real1")
|
|
setFileContent(real_dir + "/readonly", "readonly")
|
|
os.chmod(real_dir + "/readonly", 0o500)
|
|
os.mkdir(real_dir + "/subdir")
|
|
os.link(real_dir + "/realfile", real_dir + "/subdir/hardlink")
|
|
os.symlink("realfile", real_dir + "/symlink")
|
|
os.symlink("the-void", real_dir + "/broken-symlink")
|
|
|
|
# Ensure they are visible via portal
|
|
|
|
dir = docpath + "/dir"
|
|
assertDirFiles(docpath, ["dir"])
|
|
assertDirExist(dir)
|
|
assertDirFiles(dir, ["realfile", "readonly", "subdir", "symlink", "broken-symlink"])
|
|
assertDirExist(dir + "/subdir")
|
|
assertDirFiles(dir + "/subdir", ["hardlink"])
|
|
assertFileHasContent(dir + "/realfile", "real1")
|
|
assertFileHasContent(dir + "/readonly", "readonly")
|
|
assertFileHasContent(dir + "/subdir/hardlink", "real1")
|
|
assert (
|
|
os.lstat(dir + "/realfile").st_ino == os.lstat(dir + "/subdir/hardlink").st_ino
|
|
)
|
|
assertSymlink(dir + "/symlink", "realfile")
|
|
assertSymlink(dir + "/broken-symlink", "the-void")
|
|
|
|
filepath = docpath + "/a-file"
|
|
real_filepath = doc.real_path + "/a-file"
|
|
filepath2 = docpath + "/dir/a-file2"
|
|
real_filepath2 = doc.real_path + "/dir/a-file2"
|
|
|
|
if writable: # We can create files
|
|
if os.environ.get("TEST_IN_ROOTED_CI"):
|
|
assertRaises(PermissionError, os.open, dir + "/readonly", os.O_RDWR)
|
|
os.chmod(dir + "/readonly", 0o700)
|
|
fd = os.open(dir + "/readonly", os.O_RDWR) # Works now
|
|
os.close(fd)
|
|
|
|
setFileContent(filepath, "filedata")
|
|
assertFileHasContent(filepath, "filedata")
|
|
assertFileHasContent(real_filepath, "filedata")
|
|
|
|
fd = os.open(filepath, os.O_RDONLY)
|
|
fd2 = os.open(filepath, os.O_RDWR)
|
|
assertFdHasContent(fd, "filedata")
|
|
assertFdHasContent(fd2, "filedata")
|
|
appendFdContent(fd2, "-more")
|
|
assertFdHasContent(fd, "filedata-more")
|
|
assertFdHasContent(fd2, "filedata-more")
|
|
|
|
os.link(filepath, filepath2)
|
|
assert os.lstat(filepath).st_ino == os.lstat(filepath2).st_ino
|
|
assert os.lstat(filepath).st_ino == os.fstat(fd).st_ino
|
|
assertFileHasContent(filepath2, "filedata-more")
|
|
assertFileHasContent(real_filepath2, "filedata-more")
|
|
|
|
os.unlink(filepath)
|
|
assertFileNotExist(filepath)
|
|
assertFileNotExist(real_filepath)
|
|
assertFdHasContent(fd, "filedata-more")
|
|
assertFdHasContent(fd2, "filedata-more")
|
|
|
|
replaceFdContent(fd2, "replaced")
|
|
assertFileHasContent(filepath2, "replaced")
|
|
assertFileHasContent(real_filepath2, "replaced")
|
|
assertFileNotExist(filepath)
|
|
assertFileNotExist(real_filepath)
|
|
assertFdHasContent(fd, "replaced")
|
|
assertFdHasContent(fd2, "replaced")
|
|
|
|
# Move between dirs
|
|
os.rename(filepath2, docpath + "/moved")
|
|
assertFileHasContent(docpath + "/moved", "replaced")
|
|
|
|
assertRaisesErrno(errno.EXDEV, os.rename, docpath, doc.portal.mountpoint)
|
|
|
|
os.unlink(docpath + "/moved")
|
|
|
|
os.close(fd)
|
|
os.close(fd2)
|
|
|
|
os.symlink("realfile", dir + "/symlink2")
|
|
os.symlink("the-void", dir + "/broken-symlink2")
|
|
assertSymlink(dir + "/symlink2", "realfile")
|
|
assertSymlink(dir + "/broken-symlink2", "the-void")
|
|
os.unlink(dir + "/symlink2")
|
|
os.unlink(dir + "/broken-symlink2")
|
|
|
|
else:
|
|
# We should be unable to create files
|
|
assertRaises(
|
|
PermissionError,
|
|
os.open,
|
|
filepath,
|
|
os.O_CREAT | os.O_RDONLY | os.O_TRUNC,
|
|
0o600,
|
|
)
|
|
assertRaises(
|
|
PermissionError, os.open, filepath, os.O_CREAT | os.O_WRONLY, 0o600
|
|
)
|
|
assertRaises(PermissionError, os.open, filepath, os.O_CREAT | os.O_RDWR, 0o600)
|
|
|
|
assertRaises(PermissionError, os.open, dir + "/realfile", os.O_RDWR)
|
|
assertRaises(PermissionError, os.open, dir + "/readonly", os.O_RDWR)
|
|
assertRaises(PermissionError, os.truncate, dir + "/realfile", 0)
|
|
assertRaises(PermissionError, os.link, dir + "/realfile", dir + "/foo")
|
|
assertRaises(PermissionError, os.symlink, "foo", dir + "/new-symlink")
|
|
assertRaises(PermissionError, os.rename, dir + "/realfile", dir + "/foo")
|
|
assertRaises(PermissionError, os.unlink, dir + "/realfile")
|
|
assertRaises(PermissionError, os.chmod, dir + "/realfile", 0o700)
|
|
assertRaises(PermissionError, os.rmdir, dir + "/subdir")
|
|
|
|
os.unlink(real_dir + "/realfile")
|
|
os.unlink(real_dir + "/readonly")
|
|
os.unlink(real_dir + "/subdir/hardlink")
|
|
os.unlink(real_dir + "/symlink")
|
|
os.unlink(real_dir + "/broken-symlink")
|
|
os.rmdir(real_dir + "/subdir")
|
|
os.rmdir(real_dir)
|
|
|
|
|
|
def check_doc_perms(doc, app_id):
|
|
path = doc.get_doc_path(app_id)
|
|
readable = doc.is_readable_by(app_id)
|
|
if not readable:
|
|
assertRaises(FileNotFoundError, os.lstat, path)
|
|
if doc.is_dir: # Non readable dir means we can't even see the toplevel dir
|
|
assertRaises(FileNotFoundError, os.mkdir, path)
|
|
else:
|
|
assertRaises(PermissionError, os.mkdir, path)
|
|
return
|
|
|
|
assertRaises(PermissionError, os.rmdir, path)
|
|
assertRaises(PermissionError, os.rename, path, path + "_renamed")
|
|
assertRaises(IsADirectoryError, os.unlink, path)
|
|
if doc.is_dir:
|
|
check_directory_doc_perms(doc, app_id)
|
|
else:
|
|
check_regular_doc_perms(doc, app_id)
|
|
|
|
|
|
def check_perms(portal):
|
|
check_root_perms(portal.mountpoint)
|
|
check_byapp_perms(portal.by_app_path())
|
|
|
|
for doc in portal.get_docs_randomized():
|
|
check_doc_perms(doc, None)
|
|
for app_id in portal.get_app_ids_randomized():
|
|
check_doc_perms(doc, app_id)
|
|
|
|
|
|
# Ensure that a single lookup by app-id creates that app id (we need this for when mounting the subdir for an app)
|
|
def create_app_by_lookup(portal):
|
|
# Should only work for valid app ids
|
|
assertRaises(FileNotFoundError, os.lstat, portal.app_path("not-an-app-id"))
|
|
|
|
app_id = app_prefix + "Lookup"
|
|
info = os.lstat(portal.app_path(app_id))
|
|
check_virtual_stat(info)
|
|
portal.ensure_app_id(app_id, volatile=True)
|
|
|
|
|
|
def ensure_real_dir(create_hidden_file=True):
|
|
count = get_a_count("doc")
|
|
dir = os.environ["TMPDIR"] + "/" + dir_prefix + str(count)
|
|
os.makedirs(dir)
|
|
if create_hidden_file:
|
|
setFileContent(dir + "/cant-see-this-file", "s3krit")
|
|
return (dir, count)
|
|
|
|
|
|
def ensure_real_dir_file(create_file):
|
|
(dir, count) = ensure_real_dir()
|
|
path = dir + "/the-file"
|
|
if create_file:
|
|
setFileContent(path, "data" + str(count))
|
|
return path
|
|
|
|
|
|
def export_a_doc(portal):
|
|
path = ensure_real_dir_file(True)
|
|
doc = portal.add(path)
|
|
logv("exported %s as %s" % (path, doc))
|
|
|
|
lookup = portal.lookup(path)
|
|
assert lookup == doc.id
|
|
|
|
lookup_on_fuse = portal.lookup(doc.get_doc_path(None) + "/" + doc.filename)
|
|
assert lookup_on_fuse == doc.id
|
|
|
|
reused_doc = portal.add(path)
|
|
assert doc is reused_doc
|
|
|
|
not_reused_doc = portal.add(path, False)
|
|
assert doc is not not_reused_doc
|
|
|
|
# We should not be able to re-export a tmpfile
|
|
tmppath = doc.get_doc_path(None) + "/tmpfile"
|
|
setFileContent(tmppath, "tempdata")
|
|
|
|
# Should not be able to add a tempfile on the fuse mount, or look it up
|
|
assertRaises(GLib.Error, portal.add, tmppath)
|
|
lookup = portal.lookup(tmppath)
|
|
assert lookup == ""
|
|
|
|
os.unlink(tmppath)
|
|
|
|
|
|
def export_a_named_doc(portal, create_file):
|
|
path = ensure_real_dir_file(create_file)
|
|
doc = portal.add_named(path)
|
|
logv("exported (named) %s as %s" % (path, doc))
|
|
|
|
if create_file:
|
|
lookup = portal.lookup(path)
|
|
assert lookup == doc.id
|
|
|
|
reused_doc = portal.add_named(path)
|
|
assert doc is reused_doc
|
|
|
|
not_reused_doc = portal.add_named(path, False)
|
|
assert doc is not not_reused_doc
|
|
|
|
|
|
def export_a_dir_doc(portal):
|
|
(dir, count) = ensure_real_dir(False)
|
|
doc = portal.add_dir(dir)
|
|
logv("exported (dir) %s as %s" % (dir, doc))
|
|
|
|
lookup = portal.lookup(dir)
|
|
assert lookup == doc.id
|
|
|
|
lookup_on_fuse = portal.lookup(doc.get_doc_path(None))
|
|
assert lookup_on_fuse == doc.id
|
|
|
|
# We should not be able to portal lookup a file in the dir doc
|
|
subpath = doc.get_doc_path(None) + "/sub"
|
|
setFileContent(subpath, "sub")
|
|
doc = portal.lookup(subpath)
|
|
assert doc == ""
|
|
doc2 = portal.lookup(dir + "/sub")
|
|
assert doc2 == ""
|
|
|
|
# But we should be able to re-export the file
|
|
reexported_doc = portal.add(subpath)
|
|
reexported_docdir = reexported_doc.get_doc_path(None)
|
|
assertFileHasContent(reexported_docdir + "/sub", "sub")
|
|
portal.delete(reexported_doc.id)
|
|
|
|
os.unlink(subpath)
|
|
|
|
# And also re-export a directory
|
|
os.mkdir(subpath)
|
|
setFileContent(subpath + "/subfile", "subfile")
|
|
reexported_doc = portal.add_dir(subpath)
|
|
reexported_docdir = reexported_doc.get_doc_path(None)
|
|
assertFileHasContent(reexported_docdir + "/subfile", "subfile")
|
|
portal.delete(reexported_doc.id)
|
|
|
|
os.unlink(subpath + "/subfile")
|
|
os.rmdir(subpath)
|
|
|
|
|
|
def add_an_app(portal, num_docs):
|
|
if num_docs == 0:
|
|
return
|
|
count = get_a_count("app")
|
|
read_app = app_prefix + "read.App" + str(count)
|
|
write_app = app_prefix + "write.App" + str(count)
|
|
portal.ensure_app_id(read_app)
|
|
portal.ensure_app_id(write_app)
|
|
|
|
docs = portal.get_docs()
|
|
ids = []
|
|
for i in range(num_docs):
|
|
if len(docs) == 0:
|
|
continue
|
|
indx = random.randint(0, len(docs) - 1)
|
|
doc = docs[indx]
|
|
del docs[indx]
|
|
ids.append(doc.id)
|
|
portal.grant_permissions(doc.id, read_app, ["read"])
|
|
doc.apps.append(read_app)
|
|
portal.grant_permissions(doc.id, write_app, ["read", "write"])
|
|
doc.apps.append(write_app)
|
|
logv("granted acces to %s and %s for %s" % (read_app, write_app, ids))
|
|
|
|
|
|
def file_transfer_portal_test():
|
|
log("File transfer tests")
|
|
ft_portal = FileTransferPortal()
|
|
|
|
key = ft_portal.start_transfer()
|
|
|
|
file1 = ensure_real_dir_file(True)
|
|
file2 = ensure_real_dir_file(True)
|
|
res = ft_portal.add_files(key, [file1, file2])
|
|
|
|
res = ft_portal.retrieve_files(key)
|
|
files = res[0]
|
|
assert len(files) == 2
|
|
# This is the same app, it's not sandboxed
|
|
assert files[0] == file1
|
|
assert files[1] == file2
|
|
log("filetransfer tests ok")
|
|
|
|
log("filetransfer dir")
|
|
key = ft_portal.start_transfer()
|
|
dir1 = ensure_real_dir(True)
|
|
ft_portal.add_files(key, [file1, dir1[0], file2])
|
|
res = ft_portal.retrieve_files(key)
|
|
assert len(res[0]) == 3
|
|
assert res[0][0] == file1
|
|
assert res[0][1] == dir1[0]
|
|
assert res[0][2] == file2
|
|
log("filetransfer dir ok")
|
|
|
|
log("filetransfer key")
|
|
# Test that an invalid key is rejected
|
|
key = ft_portal.start_transfer()
|
|
assert key != "1234"
|
|
assertRaisesGError(
|
|
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
|
|
9,
|
|
ft_portal.add_files,
|
|
"1234",
|
|
[file1, file2],
|
|
)
|
|
|
|
# Test stop transfer
|
|
key = ft_portal.start_transfer()
|
|
ft_portal.add_files(key, [file1, file2])
|
|
ft_portal.stop_transfer(key)
|
|
assertRaisesGError(
|
|
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
|
|
9,
|
|
ft_portal.retrieve_files,
|
|
key,
|
|
)
|
|
assertRaisesGError(
|
|
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
|
|
9,
|
|
ft_portal.add_files,
|
|
key,
|
|
[file1, file2],
|
|
)
|
|
|
|
# Test that we can't reuse an old key
|
|
new_key = ft_portal.start_transfer()
|
|
assertRaisesGError(
|
|
"GDBus.Error:org.freedesktop.DBus.Error.AccessDenied",
|
|
9,
|
|
ft_portal.add_files,
|
|
key,
|
|
[file1, file2],
|
|
)
|
|
res = ft_portal.add_files(new_key, [file1, file2])
|
|
log("filetransfer key ok")
|
|
|
|
log("File transfer tests ok")
|
|
|
|
|
|
def run_test(iterations, prefix=None, do_ensure_no_remaining=True):
|
|
global app_prefix
|
|
global dir_prefix
|
|
global ensure_no_remaining
|
|
|
|
if prefix:
|
|
app_prefix = app_prefix + prefix + "."
|
|
dir_prefix = dir_prefix + "-" + prefix + "-"
|
|
|
|
ensure_no_remaining = do_ensure_no_remaining
|
|
|
|
log("Connecting to portal")
|
|
doc_portal = DocPortal()
|
|
|
|
log("Running fuse tests...")
|
|
create_app_by_lookup(doc_portal)
|
|
verify_fs_layout(doc_portal)
|
|
|
|
log("Creating some docs")
|
|
for i in range(10):
|
|
export_a_doc(doc_portal)
|
|
verify_fs_layout(doc_portal)
|
|
|
|
log("Creating some named docs (existing)")
|
|
for i in range(10):
|
|
export_a_named_doc(doc_portal, True)
|
|
verify_fs_layout(doc_portal)
|
|
|
|
log("Creating some named docs (non-existing)")
|
|
for i in range(10):
|
|
export_a_named_doc(doc_portal, False)
|
|
verify_fs_layout(doc_portal)
|
|
|
|
log("Creating some dir docs")
|
|
for i in range(10):
|
|
export_a_dir_doc(doc_portal)
|
|
verify_fs_layout(doc_portal)
|
|
|
|
log("Creating some apps")
|
|
for i in range(10):
|
|
add_an_app(doc_portal, 6)
|
|
verify_fs_layout(doc_portal)
|
|
|
|
for i in range(iterations):
|
|
log("Checking permissions, pass %d" % (i + 1))
|
|
check_perms(doc_portal)
|
|
verify_fs_layout(doc_portal)
|
|
|
|
log("fuse tests ok")
|
|
file_transfer_portal_test()
|
|
|
|
|
|
class Process(mp.Process):
|
|
def __init__(self, *args, **kwargs):
|
|
mp.Process.__init__(self, *args, **kwargs)
|
|
self._pconn, self._cconn = mp.Pipe()
|
|
self._exception = None
|
|
|
|
def run(self):
|
|
try:
|
|
mp.Process.run(self)
|
|
self._cconn.send(None)
|
|
except Exception as e:
|
|
tb = traceback.format_exc()
|
|
self._cconn.send((e, tb))
|
|
|
|
@property
|
|
def exception(self):
|
|
if self._pconn.poll():
|
|
self._exception = self._pconn.recv()
|
|
return self._exception
|
|
|
|
|
|
class TestDocumentFuse:
|
|
def parallel(self, test_function, parallel_tests, parallel_iterations):
|
|
procs = []
|
|
for i in range(parallel_tests):
|
|
p = Process(
|
|
target=test_function, args=(parallel_iterations, f"c{i}", False)
|
|
)
|
|
p.start()
|
|
procs.append(p)
|
|
|
|
for p in procs:
|
|
p.join()
|
|
|
|
if p.exception:
|
|
error, traceback = p.exception
|
|
raise error
|
|
|
|
def test_single_thread(self, portals, xdg_document_portal, dbus_con):
|
|
run_test(3)
|
|
|
|
def test_multi_thread(self, portals, xdg_document_portal, dbus_con):
|
|
if xdp.run_long_tests():
|
|
return self.parallel(run_test, 20, 10)
|
|
if xdp.is_in_ci():
|
|
return self.parallel(run_test, 5, 3)
|
|
self.parallel(run_test, 10, 5)
|
|
|
|
|
|
# Running
|
|
# ./tests/run-test.sh -n 0 tests/test_document_fuse.py::TestDocumentFuse::test_multi_thread
|
|
# works fine, but with
|
|
# ./tests/run-test.sh -n 0 tests/test_document_fuse.py::TestDocumentFuse
|
|
# the `test_multi_thread` test is failing.
|
|
# For now, let's skip the test and turn it on again when we have fixed it.
|
|
pytest.skip("Test has a race condition which can make it fail", allow_module_level=True)
|
|
|
|
try:
|
|
xdp.ensure_fuse_supported()
|
|
except xdp.FuseNotSupportedException as e:
|
|
pytest.skip(f"No fuse support: {e}", allow_module_level=True)
|