Skip to content

API Reference (PathOps)

Facade

sciwork.fs.pathops.PathOps(base_dir=None, *, dry_run=False, input_func=None)

Bases: Paths, Dirs, Create, Delete, Transfer, GetContents, Open, Select, Load, TreeOps, Archives

One-stop helper combining path utilities, listing, selection, transfers, deletion, loading, archiving and simple OS openers.

Examples

fs = PathOps.from_cwd()
p = fs.resolve_path("data", "file.csv")
choice = fs.select_paths("data", path_type="files")

Source code in src/sciwork/fs/pathops.py
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
) -> None:
	"""
	Initialise the facade by delegating to ``Paths.__init__``.

	:param base_dir: Root for relative paths; a falsy value is replaced by ``Path.cwd()``.
	:param dry_run: Forwarded unchanged to ``Paths.__init__``.
	:param input_func: Prompt callable; a falsy value is replaced by builtin ``input``.
	"""
	# Fill in the fallbacks here so the delegate always receives concrete values.
	root = base_dir or Path.cwd()
	reader = input_func or input
	Paths.__init__(self, base_dir=root, dry_run=dry_run, input_func=reader)

__repr__()

Source code in src/sciwork/fs/pathops.py
52
53
def __repr__(self) -> str:
	"""
	Return a debug representation of the instance.

	The class name is taken from the instance, and the prompt callable is shown
	by its ``__name__`` (or its type name when it has none) instead of its raw
	repr.

	:return: ``ClassName(base_dir='...', dry_run=..., input_func='...')``.
	"""
	prompt_name = getattr(self.input_func, "__name__", type(self.input_func).__name__)
	return (
		f"{self.__class__.__name__}("
		f"base_dir={str(self.base_dir)!r}, dry_run={self.dry_run!r}, "
		f"input_func={prompt_name!r})"
	)

Core building blocks

PathOpsBase

sciwork.fs.base.PathOpsBase(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Shared state and core helpers for filesystem operations.

This base keeps a root directory for resolving relative paths, a global dry-run flag (log actions instead of touching the disk), and an injectable input function useful for confirmations and tests. Higher-level modules (delete, listdir, transfer, archives, ...) can subclass or accept an instance of this base to reuse the same context.

Parameters:

Name Type Description Default
base_dir Optional[PathLike]

Base directory used to resolve relative paths. Defaults to Path.cwd().

None
dry_run bool

When True, operations that would modify the filesystem only log the intended action and return early.

False
input_func Optional[Callable[[str], str]]

Function used for interactive prompts. Defaults to :func:input.

None
Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the shared filesystem context on the instance.

	:param base_dir: Root used for relative paths. A truthy value is stored as
		``Path(base_dir).resolve()``; a falsy value is replaced by ``Path.cwd()``.
	:param dry_run: Stored as :attr:`dry_run` after coercion with ``bool()``.
	:param input_func: Prompt callable; a falsy value is replaced by builtin ``input``.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()
	self.base_dir = root
	self.dry_run = bool(dry_run)
	self.input_func = input_func if input_func else input

base_dir = Path(base_dir).resolve() if base_dir else Path.cwd() instance-attribute

dry_run = bool(dry_run) instance-attribute

input_func = input_func or input instance-attribute

__repr__()

Source code in src/sciwork/fs/base.py
155
156
157
158
159
160
def __repr__(self) -> str:
	"""Return ``ClassName(base_dir=..., dry_run=..., input_func=...)`` for debugging."""
	cls_name = self.__class__.__name__
	root = str(self.base_dir)
	# Show the callable by name; fall back to its type name when it has no ``__name__``.
	prompt_name = getattr(self.input_func, '__name__', type(self.input_func).__name__)
	return f"{cls_name}(base_dir={root!r}, dry_run={self.dry_run!r}, input_func={prompt_name!r})"

__str__()

Source code in src/sciwork/fs/base.py
152
153
def __str__(self) -> str:
	"""Return a short ``ClassName[base=..., dry_run=...]`` summary."""
	return "{}[base={}, dry_run={}]".format(self.__class__.__name__, self.base_dir, self.dry_run)

_abs(p)

Resolve p relative to :attr:base_dir if it's not absolute.

Parameters:

Name Type Description Default
p PathLike

Absolute or relative file system path.

required

Returns:

Type Description
Path

Absolute path.

Source code in src/sciwork/fs/base.py
87
88
89
90
91
92
93
94
95
def _abs(self, p: PathLike) -> Path:
	"""
	Anchor *p* at :attr:`base_dir` unless it is already absolute.

	:param p: Absolute or relative file system path.
	:return: ``Path(p)`` when it is absolute, otherwise ``base_dir / p``.
	"""
	candidate = Path(p)
	if candidate.is_absolute():
		return candidate
	return self.base_dir / candidate

_apply_mode(target, mode) staticmethod

Best-effort chmod; log, do not propagate errors.

Source code in src/sciwork/fs/base.py
 97
 98
 99
100
101
102
103
104
105
@staticmethod
def _apply_mode(target: Path, mode: Optional[int]) -> None:
	"""
	Apply permission bits to *target* on a best-effort basis.

	:param target: Path whose mode should be changed.
	:param mode: Permission bits passed to ``Path.chmod``; ``None`` leaves the path untouched.
	"""
	if mode is not None:
		try:
			target.chmod(mode)
		except Exception as exc:
			# Deliberately not re-raised: a failed chmod is only reported.
			LOG.warning("Failed to chmod %o on %s: %s", mode, target, exc)

_pick_prompt(override=None, confirm=False)

Choose a (str)→str prompter. 1) explicit override 2) lazy import sciwork.console.Prompter().prompt 3) self.input_func (if present) 4) builtin input

Source code in src/sciwork/fs/base.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def _pick_prompt(self, override: Optional[PrompterProtocol] = None, confirm=False) -> PrompterProtocol:
	"""
	Select the callable used to ask the user a question.

	Candidates are tried in this order:
		1) the explicit *override*
		2) a lazily imported ``sciwork.console.Prompter()`` — its ``confirm``
		   method when *confirm* is true, otherwise its ``prompt`` method
		3) ``self.input_func`` (if present and callable)
		4) builtin ``input``

	:param override: Prompter to use as-is when given.
	:param confirm: Pick the console prompter's ``confirm`` method instead of ``prompt``.
	:return: The selected prompt callable.
	"""
	if override is None:
		try:
			from ..console.prompter import Prompter
			console = Prompter()
			return console.confirm if confirm else console.prompt
		except Exception:
			# Any failure to import or build the console prompter means "use the fallbacks".
			pass

		fallback = getattr(self, "input_func", None)
		return fallback if callable(fallback) else input  # type: ignore[return-value]

	return override

at(base, *, dry_run=False, input_func=input, **kwargs) classmethod

Create an instance rooted at a given base directory.

Parameters:

Name Type Description Default
base PathLike

Base directory to anchor relative paths.

required
dry_run bool

If True, do not modify the filesystem (log-only).

False
input_func Optional[Callable[[str], str]]

Function used for interactive confirmations.

input

Returns:

Type Description
'PathOpsBase'

PathOpsBase: A new instance with base_dir set to the given base directory.

Source code in src/sciwork/fs/base.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@classmethod
def at(
		cls,
		base: PathLike,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = input,
		**kwargs
) -> "PathOpsBase":
	"""
	Alternate constructor: build an instance rooted at *base*.

	:param base: Base directory to anchor relative paths.
	:param dry_run: If True, do not modify the filesystem (log-only).
	:param input_func: Function used for interactive confirmations.
	:param kwargs: Extra keyword arguments passed through to the class constructor.
	:return: A new instance of *cls* constructed with ``base_dir=Path(base)``.
	"""
	root = Path(base)
	return cls(base_dir=root, dry_run=dry_run, input_func=input_func, **kwargs)

coerce_file_path(path)

Return an absolute path to a file. Rejects trailing path separators that make the path look like a directory.

Source code in src/sciwork/fs/base.py
133
134
135
136
137
138
139
140
141
142
def coerce_file_path(self, path: PathLike) -> Path:
	"""
	Anchor *path* via :meth:`_abs` and return it as a *file* path.

	:param path: Absolute or relative path.
	:return: The anchored path.
	:raises IsADirectoryError: When ``str(path)`` ends with ``/`` or ``\\``.
	"""
	anchored = self._abs(path)
	text = str(path)
	looks_like_folder = text.endswith("/") or text.endswith("\\")
	if not looks_like_folder:
		return anchored
	raise IsADirectoryError(f"Path looks like a directory (trailing slash): {text!r}")

coerce_folder_path(path)

Return an absolute path to a folder.

Source code in src/sciwork/fs/base.py
144
145
146
147
148
149
def coerce_folder_path(self, path: PathLike) -> Path:
	"""
	Anchor *path* via :meth:`_abs` and return it as a *folder* path.

	:param path: Absolute or relative path.
	:return: The anchored path.
	:raises NotADirectoryError: When the anchored path is not an existing directory.
	"""
	folder = self._abs(path)
	if folder.is_dir():
		return folder
	raise NotADirectoryError(f"Path is not a directory: {folder!r}")

from_cwd(*, dry_run=False, input_func=input, **kwargs) classmethod

Create an instance rooted at the current working directory.

Parameters:

Name Type Description Default
dry_run bool

If True, do not modify the filesystem (log-only).

False
input_func Optional[Callable[[str], str]]

Function used for interactive confirmations.

input

Returns:

Type Description
'PathOpsBase'

PathOpsBase: A new instance with base_dir = Path.cwd().

Source code in src/sciwork/fs/base.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@classmethod
def from_cwd(
		cls,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = input,
		**kwargs
) -> "PathOpsBase":
	"""
	Alternate constructor: build an instance rooted at the current working directory.

	:param dry_run: If True, do not modify the filesystem (log-only).
	:param input_func: Function used for interactive confirmations.
	:param kwargs: Extra keyword arguments passed through to the class constructor.
	:return: A new instance of *cls* constructed with ``base_dir=Path.cwd()``.
	"""
	here = Path.cwd()
	return cls(base_dir=here, dry_run=dry_run, input_func=input_func, **kwargs)

Paths

sciwork.fs.paths.Paths(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Small helper layer with user-facing path utilities.

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the shared filesystem context on the instance.

	:param base_dir: Root used for relative paths. A truthy value is stored as
		``Path(base_dir).resolve()``; a falsy value is replaced by ``Path.cwd()``.
	:param dry_run: Stored as :attr:`dry_run` after coercion with ``bool()``.
	:param input_func: Prompt callable; a falsy value is replaced by builtin ``input``.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()
	self.base_dir = root
	self.dry_run = bool(dry_run)
	self.input_func = input_func if input_func else input

_default_prompt(*, kind, has_default) staticmethod

Create a readable prompt message.

Source code in src/sciwork/fs/paths.py
33
34
35
36
37
38
39
@staticmethod
def _default_prompt(*, kind: str, has_default: bool) -> str:
	"""
	Create a readable prompt message.

	:param kind: ``'file'`` or ``'folder'`` select a kind-specific message; any
		other value selects the generic message.
	:param has_default: When true, the message tells the user that an empty
		answer selects the default.
	:return: Prompt text.
	"""
	suffix = " (leave empty for default)" if has_default else ""
	if kind in {'file', 'folder'}:
		return f"Enter the path to the {kind}{suffix} (Ctrl+Shift+C to copy the {kind}path)"
	# The generic message carries the default hint as well; it used to be dropped here.
	return f"Enter the path{suffix} (Ctrl+Shift+C to copy the path)"

_looks_like_dir(raw) staticmethod

Heuristic for directory-like input (trailing slash).

Source code in src/sciwork/fs/paths.py
41
42
43
44
@staticmethod
def _looks_like_dir(raw: str) -> bool:
	"""Return True when *raw* ends with a forward slash or a backslash."""
	return raw.endswith(("/", "\\"))

_normalize_exts(exts) staticmethod

Normalize an extension list to lower-case with a leading dot.

Source code in src/sciwork/fs/paths.py
20
21
22
23
24
25
26
27
28
29
30
31
@staticmethod
def _normalize_exts(exts: Optional[Iterable[str]]) -> set[str]:
	"""
	Normalize an extension list to lower-case with a leading dot.

	:param exts: Extensions with or without a leading dot. A single bare string
		is treated as one extension. ``None`` or an empty iterable yields an empty set.
	:return: Set of normalized extensions; blank entries are skipped.
	"""
	if not exts:
		return set()
	# A bare string is one extension, not an iterable of one-character extensions.
	candidates = [exts] if isinstance(exts, str) else exts
	cleaned = (str(e).strip().lower() for e in candidates)
	return {t if t.startswith(".") else f".{t}" for t in cleaned if t}

_type_guard(path, *, kind) staticmethod

Ensure the existing path matches the requested kind, else raise.

Source code in src/sciwork/fs/paths.py
46
47
48
49
50
51
52
@staticmethod
def _type_guard(path: Path, *, kind: str) -> None:
	"""
	Ensure the existing path matches the requested kind, else raise.

	:param path: Path to check.
	:param kind: ``"file"`` rejects directories; ``"folder"`` rejects anything that
		is not a directory; other values are not checked.
	:raises IsADirectoryError: When ``kind="file"`` and *path* is a directory.
	:raises NotADirectoryError: When ``kind="folder"`` and *path* is not a directory.
	"""
	if kind == "file" and path.is_dir():
		# The path is a real directory here, so the message does not blame a trailing slash.
		raise IsADirectoryError(f"Path is a directory, expected a file: {path!r}")
	if kind == "folder" and not path.is_dir():
		raise NotADirectoryError(f"Path is not a directory: {path!r}")

prompt_path(*, kind='any', must_exist=True, allowed_exts=None, default=None, prompt=None, prompter=None)

Prompt the user for a file or folder path and validate it.

The prompter is selected in this order: 1) the explicit prompter argument if provided, 2) method self.prompt (from :class:sciwork.console.Prompter, if available), 3) attribute self.input_func (if present), 4) builtin :func:input.

Parameters:

Name Type Description Default
kind Literal['file', 'folder', 'any']

Either "file", "folder" or "any" (default).

'any'
must_exist bool

When True, the path must exist and match the requested kind.

True
allowed_exts Optional[Iterable[str]]

Optional iterable of allowed file extensions (e.g., ['.csv', '.jpg']). Only enforced when kind="file". Case-insensitive; dots are optional.

None
default Optional[PathLike]

Optional default value returned when the user submits an empty input.

None
prompt Optional[str]

Optional custom prompt text. If None, a sensible default is used.

None
prompter Optional[PrompterProtocol]

Optional callable to ask the user. If omitted, it tries to use self.prompt / self.input_func / builtin input.

None

Returns:

Type Description
Optional[Path]

Absolute, resolved path.

Raises:

Type Description
ValueError

If kind is invalid or file extension is not allowed.

NotADirectoryError

When kind="folder" but the path is not a directory (for existing paths).

IsADirectoryError

When kind="file" but the path is a directory.

Source code in src/sciwork/fs/paths.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def prompt_path(
		self,
		*,
		kind: Literal['file', 'folder', 'any'] = "any",
		must_exist: bool = True,
		allowed_exts: Optional[Iterable[str]] = None,
		default: Optional[PathLike] = None,
		prompt: Optional[str] = None,
		prompter: Optional[PrompterProtocol] = None
) -> Optional[Path]:
	"""
	Prompt the user for a file or folder path and validate it.

	The prompt callable is obtained from ``self._pick_prompt(prompter)``, so an
	explicit ``prompter`` argument takes precedence over the fallbacks chosen there.

	With ``must_exist=True`` a missing path is logged and the user is asked
	again; every other validation failure raises.

	:param kind: Either ``"file"``, ``"folder"`` or ``"any"`` (default).
	:param must_exist: When ``True``, the path must exist and match the requested ``kind``.
	:param allowed_exts: Optional iterable of allowed file extensions (e.g., ``['.csv', '.jpg']``).
		Only enforced when ``kind="file"``. Case-insensitive; dots are optional.
	:param default: Optional default value used when the user submits an empty input.
	:param prompt: Optional custom prompt text. If ``None``, a default message is generated.
	:param prompter: Optional callable to ask the user.
	:return: Absolute, resolved path.
	:raises ValueError: If ``kind`` is invalid or file extension is not allowed.
	:raises NotADirectoryError: When ``kind="folder"`` but the path is not a directory (for existing paths).
	:raises IsADirectoryError: When ``kind="file"`` but the path is a directory.
	"""
	if kind not in {"file", "folder", "any"}:
		raise ValueError(f"kind must be 'file', 'folder' or 'any': {kind!r}")

	ask = self._pick_prompt(prompter)
	norm_exts = self._normalize_exts(allowed_exts)
	message = prompt or self._default_prompt(kind=kind, has_default=default is not None)

	while True:
		# Trim whitespace and one layer of surrounding quotes (e.g. pasted from a file manager).
		raw = ask(message).strip().strip('"').strip("'")
		if not raw and default is not None:
			raw = str(default)

		# expand env/home and resolve relative to base_dir
		s = os.path.expandvars(os.path.expanduser(raw))
		candidate = self._abs(s)

		# must-exist logic and type guard
		if must_exist:
			if not candidate.exists():
				# Tell the user why the question is repeated instead of looping silently.
				LOG.warning("Path does not exist, asking again: %s", candidate)
				continue
			self._type_guard(candidate, kind=kind)
		else:
			if candidate.exists():
				self._type_guard(candidate, kind=kind)
			else:
				# for a would-be file, reject obvious directory-like input (trailing slash)
				if kind == "file" and self._looks_like_dir(raw):
					raise IsADirectoryError(f"Path looks like a directory (trailing slash): {raw!r}")

		# extension filter for files
		if kind == "file" and norm_exts:
			ext = candidate.suffix.lower()
			if ext not in norm_exts:
				raise ValueError(
					f"File must have one of the extensions: {sorted(norm_exts)} (got {ext or '<none>'})."
				)

		return candidate.resolve()

rename_path(old_path, new_path, *, overwrite=False, create_parents=True)

Rename (or move within the same filesystem) a path to a new path.

If new_path is an existing directory, the entry will be moved into it under its original name.

This prefers :py:meth:Path.rename (atomic on the same FS). If it fails with EXDEV (cross-device), it falls back to :func:shutil.move.

Parameters:

Name Type Description Default
old_path PathLike

Existing file or directory.

required
new_path PathLike

The target file or directory path or existing directory.

required
overwrite bool

If True, remove an existing target before renaming.

False
create_parents bool

Ensure the parent directory for the target exists.

True

Returns:

Type Description
Path

Absolute resolved target path.

Raises:

Type Description
FileNotFoundError

Old path does not exist.

FileExistsError

Target exists and overwrite=False.

PermissionError

On OS-level errors.

OSError

On OS-level errors.

Source code in src/sciwork/fs/paths.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def rename_path(
		self,
		old_path: PathLike,
		new_path: PathLike,
		*,
		overwrite: bool = False,
		create_parents: bool = True
) -> Path:
	"""
	Rename (or move within the same filesystem) a path to a new path.

	If *new_path* is an existing directory, the entry will be moved **into** it
	under its original name.

	This prefers :py:meth:`Path.rename` (atomic on the same FS). If it fails
	with EXDEV (cross-device), it falls back to :func:`shutil.move`.

	Source and target are prepared by ``Transfer.prepare_transfer`` on a helper
	that shares this instance's ``base_dir``, ``dry_run`` and ``input_func``.

	:param old_path: Existing file or directory.
	:param new_path: The target file or directory path or existing directory.
	:param overwrite: If True, remove an existing target before renaming.
	:param create_parents: Ensure the parent directory for the target exists.
	:return: Absolute resolved target path.
	:raises FileNotFoundError: Old path does not exist.
	:raises FileExistsError: Target exists and ``overwrite=False``.
	:raises PermissionError: On OS-level errors.
	:raises OSError: On OS-level errors.
	:raises ImportError: When ``sciwork.fs.transfer`` cannot be imported.
	"""
	import errno

	try:
		from .transfer import Transfer
	except ImportError:
		LOG.error("'rename_path' method requires the optional dependency 'sciwork.fs.Transfer' to work.")
		raise

	# Reuse this instance's context; a bare Transfer() would anchor relative
	# paths at the process cwd and ignore the dry-run flag.
	# NOTE(review): assumes Transfer accepts the same constructor keywords as the
	# other PathOpsBase-derived helpers shown in this module family — confirm.
	helper = Transfer(base_dir=self.base_dir, dry_run=self.dry_run, input_func=self.input_func)
	src, target = helper.prepare_transfer(
		old_path, new_path,
		overwrite=overwrite, create_parents=create_parents
	)

	if self.dry_run:
		LOG.info("[dry-run] rename %s -> %s", src, target)
		return target.resolve()

	# Try atomic rename first; fallback to shutil.move on EXDEV
	try:
		src.rename(target)
	except OSError as exc:
		if exc.errno != errno.EXDEV:
			LOG.exception("OS error while renaming '%s' -> '%s': %s", src, target, exc)
			raise
		try:
			shutil.move(str(src), str(target))
		except Exception:
			LOG.exception("shutil.move fallback failed: %s -> %s", src, target)
			raise

	LOG.info("Renamed %s -> %s", src, target)
	return target.resolve()

resolve_path(*parts)

Join one or more path parts relative to base_dir (unless the first part is absolute) and return an absolute, resolved path.

Parameters:

Name Type Description Default
parts PathLike

Path segments (str or :class:pathlib.Path). If empty, returns base_dir.resolve().

()

Returns:

Type Description
Path

Absolute, resolved path.

Source code in src/sciwork/fs/paths.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def resolve_path(self, *parts: PathLike) -> Path:
	"""
	Build an absolute, resolved path from *parts*.

	The first part is anchored at ``base_dir`` unless it is already absolute;
	the remaining parts are appended in order.

	:param parts: Path segments (``str`` or :class:`pathlib.Path`). If empty, returns
		``base_dir.resolve()``.
	:return: Absolute, resolved path.
	"""
	if not parts:
		return self.base_dir.resolve()

	head, *tail = (Path(seg) for seg in parts)
	anchor = head if head.is_absolute() else self.base_dir / head
	return anchor.joinpath(*tail).resolve()

Dirs

sciwork.fs.dirs.Dirs(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Directory-centric helpers built on top of :class:~sciwork.fs.base.PathOpsBase.

Provides quick checks for emptiness and a simple polling utility to wait until a directory contains at least one matching entry.

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the shared filesystem context on the instance.

	:param base_dir: Root used for relative paths. A truthy value is stored as
		``Path(base_dir).resolve()``; a falsy value is replaced by ``Path.cwd()``.
	:param dry_run: Stored as :attr:`dry_run` after coercion with ``bool()``.
	:param input_func: Prompt callable; a falsy value is replaced by builtin ``input``.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()
	self.base_dir = root
	self.dry_run = bool(dry_run)
	self.input_func = input_func if input_func else input

is_folder_empty(folder_path=None, *, include_hidden=True, pattern=None, antipattern=None, shell_pattern=None, files_only=False)

Return True if the folder contains no entries passing the filters.

If no filters are provided, all direct children are considered. Scanning is non-recursive.

Parameters:

Name Type Description Default
folder_path Optional[PathLike]

Directory to inspect (absolute or relative to base_dir). If None, the :attr:base_dir is used.

None
include_hidden bool

If False, ignore entries whose name starts with a dot (".").

True
pattern Optional[str]

Include only entries whose name contains this substring.

None
antipattern Optional[str]

Exclude entries whose name contains this substring.

None
shell_pattern Optional[str]

Shell-like pattern matched against the name (e.g., "*.csv"). Applied in addition to pattern filters.

None
files_only bool

If True, consider files only (no subdirectories).

False

Returns:

Type Description
bool

True when no entries match the filters; False otherwise.

Raises:

Type Description
FileNotFoundError

Folder does not exist.

NotADirectoryError

Path exists but is not a directory.

Source code in src/sciwork/fs/dirs.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def is_folder_empty(
		self,
		folder_path: Optional[PathLike] = None,
		*,
		include_hidden: bool = True,
		pattern: Optional[str] = None,
		antipattern: Optional[str] = None,
		shell_pattern: Optional[str] = None,
		files_only: bool = False
) -> bool:
	"""
	Tell whether a folder has **no** direct child that passes the filters.

	Only the folder's immediate children are inspected, and the scan stops at
	the first child that passes. The outcome is also logged at INFO level.

	:param folder_path: Directory to inspect (absolute or relative to ``base_dir``).
		A falsy value selects :attr:`base_dir`.
	:param include_hidden: Forwarded to ``matches_filters``.
	:param pattern: Forwarded to ``matches_filters``.
	:param antipattern: Forwarded to ``matches_filters``.
	:param shell_pattern: Forwarded to ``matches_filters``.
	:param files_only: If True, children that are not files are skipped before filtering.
	:return: True when no child passes; False otherwise.
	:raises FileNotFoundError: Folder does not exist.
	:raises NotADirectoryError: Path exists but is not a directory.
	"""
	root = self.try_get_dir(folder_path or self.base_dir)
	assert root is not None  # try_get_dir raises when missing_ok=False

	def _passes(child: Path) -> bool:
		# Non-files are discarded first when only files are of interest.
		if files_only and not child.is_file():
			return False
		return matches_filters(
			child.name,
			include_hidden=include_hidden,
			pattern=pattern,
			antipattern=antipattern,
			shell_pattern=shell_pattern
		)

	# any() short-circuits, so the directory scan ends at the first hit.
	occupied = any(_passes(child) for child in root.iterdir())
	if occupied:
		LOG.info("Folder %s is not empty", root)
	else:
		LOG.info("Folder %s is empty", root)
	return not occupied

require_dir(p, *, create=False)

Ensure p exists and is a directory; optionally create it.

Parameters:

Name Type Description Default
p PathLike

Directory path (absolute or relative).

required
create bool

If True and the directory does not exist, it will be created. Respects :attr:dry_run.

False

Returns:

Type Description
Path

Resolved absolute directory path.

Raises:

Type Description
NotADirectoryError

When the path exists and is not a directory.

FileNotFoundError

When the directory does not exist and create=False.

Source code in src/sciwork/fs/dirs.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def require_dir(self, p: PathLike, *, create: bool = False) -> Path:
	"""
	Ensure *p* exists and is a directory; optionally create it.

	:param p: Directory path (absolute or relative).
	:param create: If ``True`` and the directory does not exist, it is created through
		a ``Create`` helper that shares this instance's ``base_dir``,
		``dry_run`` and ``input_func``.
	:return: Resolved absolute directory path.
	:raises NotADirectoryError: When the path exists and is not a directory.
	:raises FileNotFoundError: When the directory does not exist and ``create=False``.
	"""
	target = self._abs(p)
	if target.exists():
		if not target.is_dir():
			raise NotADirectoryError(f"Path exists and is not a directory: {target}")
		return target.resolve()

	if create:
		from .create import Create
		# A bare Create() would default to dry_run=False and touch the disk
		# even when this instance is in dry-run mode.
		maker = Create(base_dir=self.base_dir, dry_run=self.dry_run, input_func=self.input_func)
		return maker.make_folder(target, exist_ok=True)

	raise FileNotFoundError(f"Directory not found: {target}")

temp_dir(folder_path=None, *, prefix='sciwork-', suffix='', cleanup=True)

Create a temporary directory and yield its path.

In :attr:dry_run mode, a hypothetical path is yielded and no filesystem changes are performed. When cleanup is True, the directory is removed on context exit; otherwise, it is kept on disk.

Usage

with self.temp_dir() as td: # do some stuff with td

:yields: :class:pathlib.Path pointing to the (real or hypothetical) temporary directory.

Parameters:

Name Type Description Default
folder_path Optional[PathLike]

Parent directory for the temp directory.

None
prefix str

Prefix for the temp directory name.

'sciwork-'
suffix str

Suffix for the temp directory name.

''
cleanup bool

Remove the directory when leaving the context.

True
Source code in src/sciwork/fs/dirs.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
@contextmanager
def temp_dir(
		self,
		folder_path: Optional[PathLike] = None, *,
		prefix: str = "sciwork-",
		suffix: str = "",
		cleanup: bool = True
) -> Generator[Path, Any, None]:
	"""
	Context manager that provides a temporary directory.

	With :attr:`dry_run` set, nothing is created: a placeholder path named
	``{prefix}XXXXXX{suffix}`` under the parent is yielded and the intended
	actions are only logged. Otherwise a real directory is created; it is
	removed on exit when ``cleanup`` is True and left on disk when it is False.

	Usage
	-----
	with self.temp_dir() as td:
		# do some stuff with td

	:param folder_path: Parent directory for the temp directory.
	:param prefix: Prefix for the temp directory name.
	:param suffix: Suffix for the temp directory name.
	:param cleanup: Remove the directory when leaving the context.
	:yields: :class:`pathlib.Path` pointing to the (real or hypothetical) temporary directory.
	"""
	parent = self.try_get_dir(folder_path) or self.base_dir

	if self.dry_run:
		placeholder = (parent / f"{prefix}XXXXXX{suffix}").resolve()
		LOG.info("[dry-run] would create temp dir: %s", placeholder)
		try:
			yield placeholder
		finally:
			LOG.info("[dry-run] would %s temp dir: %s", "remove" if cleanup else "keep", placeholder)
		return

	if not cleanup:
		# mkdtemp leaves removal to the caller, which is what "no cleanup" asks for.
		kept = Path(tempfile.mkdtemp(prefix=prefix, suffix=suffix, dir=parent)).resolve()
		LOG.info("Created temp dir: %s", kept)
		try:
			yield kept
		finally:
			LOG.info("Keeping temp dir (no cleanup): %s", kept)
		return

	holder = tempfile.TemporaryDirectory(prefix=prefix, suffix=suffix, dir=parent)
	managed = Path(holder.name).resolve()
	LOG.info("Created temp dir: %s", managed)
	try:
		yield managed
	finally:
		LOG.info("Removing temp dir: %s", managed)
		holder.cleanup()

try_get_dir(folder_path, *, missing_ok=False)

Resolve folder_path relative to :attr:base_dir, ensure it is a directory, and return the resolved :class:pathlib.Path.

Parameters:

Name Type Description Default
folder_path PathLike

Target folder (absolute or relative to base_dir).

required
missing_ok bool

If True, return None when the path does not exist; otherwise, raise :class:FileNotFoundError.

False

Returns:

Type Description
Optional[Path]

Resolved directory path, or None when missing and missing_ok=True.

Raises:

Type Description
FileNotFoundError

When missing and missing_ok is False.

NotADirectoryError

When the path exists but is not a directory.

Source code in src/sciwork/fs/dirs.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def try_get_dir(self, folder_path: PathLike, *, missing_ok: bool = False) -> Optional[Path]:
	"""
	Resolve *folder_path* relative to :attr:`base_dir`, ensure it is a directory,
	and return the resolved :class:`pathlib.Path`.

	An empty or ``"."``-like value, or a value that is neither a string nor an
	``os.PathLike`` object (e.g. ``None``), selects :attr:`base_dir` itself.

	:param folder_path: Target folder (absolute or relative to ``base_dir``).
	:param missing_ok: If ``True``, return ``None`` when the path does not exist;
		otherwise, raise :class:`FileNotFoundError`.
	:return: Resolved directory path, or ``None`` when missing and ``missing_ok=True``.
	:raises FileNotFoundError: When missing and ``missing_ok`` is ``False``.
	:raises NotADirectoryError: When the path exists but is not a directory.
	"""
	import os

	# Check against concrete runtime types: isinstance() on a typing alias
	# depends on how the alias is declared and on the interpreter version.
	is_path_like = isinstance(folder_path, (str, os.PathLike))
	raw = str(folder_path).strip() if is_path_like else ""
	if raw in {"", ".", "./", ".\\"}:
		root = self.base_dir
	else:
		root = self._abs(raw)

	# A missing relative path that is spelled exactly like base_dir's own name
	# is taken to mean base_dir itself.
	if not root.exists() and raw and raw == self.base_dir.name and self.base_dir.exists():
		root = self.base_dir

	if not root.exists():
		if missing_ok:
			LOG.warning("Directory not found (missing_ok=True): %s", root)
			return None
		raise FileNotFoundError(f"Directory does not exist: {root}")
	if not root.is_dir():
		raise NotADirectoryError(f"Path exists but is not a directory: {root}")
	return root.resolve()

wait_until_not_empty(folder_path=None, *, timeout=30.0, poll_interval=0.5, include_hidden=True, pattern=None, antipattern=None, shell_pattern=None, files_only=False)

Poll a folder until at least one entry matches the filters.

Parameters:

Name Type Description Default
folder_path Optional[PathLike]

Folder to watch. If None, the :attr:base_dir is used.

None
timeout float

Maximum time to wait in seconds. Use 0 for a single check.

30.0
poll_interval float

Delay between checks in seconds.

0.5
include_hidden bool

If False, ignore dot-files.

True
pattern Optional[str]

Include only names containing this substring.

None
antipattern Optional[str]

Exclude names containing this substring.

None
shell_pattern Optional[str]

Shell-like pattern for names (e.g., "*.csv").

None
files_only bool

If True, consider files only (no subdirectories).

False

Returns:

Type Description
int

Number of matching entries when the condition is met (> 0).

Raises:

Type Description
FileNotFoundError

Folder does not exist.

NotADirectoryError

Path exists but is not a directory.

TimeoutError

If the folder stays empty past timeout.

Source code in src/sciwork/fs/dirs.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def wait_until_not_empty(
		self,
		folder_path: Optional[PathLike] = None,
		*,
		timeout: float = 30.0,
		poll_interval: float = 0.5,
		include_hidden: bool = True,
		pattern: Optional[str] = None,
		antipattern: Optional[str] = None,
		shell_pattern: Optional[str] = None,
		files_only: bool = False,
) -> int:
	"""
	Poll a folder until at least one entry matches the filters.

	:param folder_path: Folder to watch.
		A falsy value selects :attr:`base_dir`.
	:param timeout: Maximum time to wait in seconds. Use 0 for a single check.
	:param poll_interval: Delay between checks in seconds (at least 0.05 s is used).
		The delay never extends past the deadline.
	:param include_hidden: Forwarded to ``iter_dir_filtered``.
	:param pattern: Forwarded to ``iter_dir_filtered``.
	:param antipattern: Forwarded to ``iter_dir_filtered``.
	:param shell_pattern: Forwarded to ``iter_dir_filtered``.
	:param files_only: Forwarded to ``iter_dir_filtered``.
	:return: Count of matches seen when the scan stopped. Scanning stops at the
		first match, so this is always ``1``.
	:raises FileNotFoundError: Folder does not exist.
	:raises NotADirectoryError: Path exists but is not a directory.
	:raises TimeoutError: If the folder stays empty past *timeout*.
	"""
	inspected = folder_path if folder_path else self.base_dir
	root = self.try_get_dir(inspected)
	assert root is not None

	deadline = time.monotonic() + max(0.0, timeout)
	pause = max(0.05, poll_interval)

	while True:
		try:
			for _ in iter_dir_filtered(
					root,
					include_hidden=include_hidden,
					pattern=pattern,
					antipattern=antipattern,
					shell_pattern=shell_pattern,
					files_only=files_only
			):
				# The first match settles the question; no need to scan further.
				count = 1
				LOG.info("Folder became non-empty (%d match%s): %s",
				         count, "" if count == 1 else "es", root)
				return count
		except PermissionError as exc:
			# A scan blocked by permissions is reported and retried on the next poll.
			LOG.warning("Permission issue while scanning %s: %s", root, exc)

		remaining = deadline - time.monotonic()
		if remaining <= 0:
			raise TimeoutError(f"Timeout waiting for folder '{root}' to become non-empty.")
		# Sleep no longer than the time left, so a large poll_interval cannot overshoot the timeout.
		time.sleep(min(pause, remaining))

Create

sciwork.fs.create.Create(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Creation helpers: folders, files, temp dirs, parents. Expects PathOpsBase fields+helpers: base_dir, dry_run, _abs(...).

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the shared filesystem context on the instance.

	:param base_dir: Root for relative paths; it is resolved to an absolute path.
					A falsy value selects ``Path.cwd()``.
	:param dry_run: Stored as a plain ``bool`` in :attr:`dry_run`.
	:param input_func: Callable used for prompts. A falsy value selects the builtin ``input``.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()

	self.base_dir = root
	self.dry_run = True if dry_run else False
	self.input_func = input_func if input_func else input

create_file(path, *, op='a', create_parents=True, permissions=None, encoding='utf-8')

Create an empty file using the given open mode.

Modes: - 'a': create the file if missing and open it for appending (an existing file is not truncated). - 'w': create the file or truncate it to zero length. - 'x': exclusive creation; fail if the file already exists.

Respects base_dir and dry_run.

Parameters:

Name Type Description Default
path PathLike

File path (absolute or relative to base_dir).

required
op str

One of {'a', 'w', 'x'}.

'a'
create_parents bool

If True, ensure the parent directory exists first.

True
permissions Optional[int]

Optional permission bits to apply after creation/update.

None
encoding str

Text encoding that is used when creating the file handle.

'utf-8'

Returns:

Type Description
Path

The resolved absolute file path.

Raises:

Type Description
ValueError

Invalid operation.

FileExistsError

With op='x' when the file exists.

PermissionError

When permission is denied.

OSError

Other OS-level errors.

Source code in src/sciwork/fs/create.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def create_file(
		self,
		path: PathLike,
		*,
		op: str = "a",
		create_parents: bool = True,
		permissions: Optional[int] = None,
		encoding: str = "utf-8"
) -> Path:
	"""
	Create an empty file by opening it in the requested mode and closing it again.

	Supported modes:
		- ``'a'``: create the file when missing; an existing file keeps its content.
		- ``'w'``: create the file, or truncate an existing one to zero length.
		- ``'x'``: create exclusively; an existing file is an error.

	With ``dry_run`` set, the intended action is only logged and no file is opened.

	:param path: File path, passed through ``self.coerce_file_path``.
	:param op: Open mode, one of {'a', 'w', 'x'}.
	:param create_parents: If True, call :meth:`ensure_parent` for the file first.
	:param permissions: Value handed to ``self._apply_mode`` once the file has been opened.
	:param encoding: Text encoding passed to ``open``.
	:return: The file path after ``Path.resolve()``.
	:raises ValueError: If *op* is not one of the supported modes.
	:raises FileExistsError: With ``op='x'`` when the file exists.
	:raises PermissionError: When permission is denied (logged, then re-raised).
	:raises OSError: Other OS-level errors (logged, then re-raised).
	"""
	supported_ops = {"a", "w", "x"}
	if op not in supported_ops:
		raise ValueError(f"Invalid op: {op}. Expected 'a', 'w', or 'x'.")

	file_path: Path = self.coerce_file_path(path)
	if create_parents:
		self.ensure_parent(file_path)

	if self.dry_run:
		LOG.info("[dry-run] create file (%s): %s", op, file_path)
	else:
		try:
			# Opening and closing is all that is needed; nothing is written.
			with open(file_path, op, encoding=encoding):
				pass
		except PermissionError:
			LOG.exception("Permission denied while creating file (%s): %s:", op, file_path)
			raise
		except OSError as exc:
			LOG.exception("OS error while creating file '%s' (%s): %s", file_path, op, exc)
			raise

		self._apply_mode(file_path, permissions)
		LOG.info("Created file (%s): %s", op, file_path)

	return file_path.resolve()

ensure_parent(path, *, mode=511)

Ensure the parent directory of a file path exists.

Resolves path relative to base_dir (if not absolute) and ensures that path.parent exists (creating it if needed). Honors dry_run.

Parameters:

Name Type Description Default
path PathLike

File path whose parent directory will be ensured.

required
mode int

Permission bits for newly created directories (ignored on Windows).

511

Returns:

Type Description
Path

The resolved absolute path of the parent directory.

Source code in src/sciwork/fs/create.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def ensure_parent(self, path: PathLike, *, mode: int = 0o777) -> Path:
	"""
	Make sure the directory that will contain *path* exists.

	The path is made absolute with ``self._abs`` and its parent is handed to
	:meth:`make_folder` with ``exist_ok=True``.

	:param path: File path whose containing directory is needed.
	:param mode: Permission bits forwarded to :meth:`make_folder`.
	:return: The parent directory after ``Path.resolve()``.
	"""
	absolute_path = self._abs(path)
	parent_dir = absolute_path.parent
	self.make_folder(parent_dir, mode=mode, exist_ok=True)
	return parent_dir.resolve()

make_folder(path, *, exist_ok=True, mode=511)

Create a directory (and parents) at path.

The path is resolved relative to self.base_dir if it is not absolute. Honors dry_run: when True, the action is only logged and no filesystem changes are performed.

Parameters:

Name Type Description Default
path PathLike

Target directory path (absolute or relative to base_dir).

required
exist_ok bool

If True, do not raise if the directory already exists. If False, raise FileExistsError when it exists.

True
mode int

Permission bits for newly created directories (ignored on Windows).

511

Returns:

Type Description
Path

The resolved absolute path to the directory (existing or created).

Raises:

Type Description
NotADirectoryError

When a non-directory exists at the given path.

FileExistsError

When the directory exists and exist_ok is False.

PermissionError

When permission is denied.

OSError

For other OS-level errors.

Source code in src/sciwork/fs/create.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def make_folder(
		self,
		path: PathLike,
		*,
		exist_ok: bool = True,
		mode: int = 0o777,
) -> Path:
	"""
	Create the directory *path*, including any missing parents.

	The path is first made absolute with ``self._abs``. With ``dry_run`` set and
	the path not yet present, the ``mkdir`` is only logged.

	:param path: Directory to create (absolute or relative to ``base_dir``).
	:param exist_ok: If True, an already existing directory is accepted.
					If False, it raises :class:`FileExistsError`.
	:param mode: Permission bits passed to ``Path.mkdir``.
	:return: The directory path after ``Path.resolve()``.
	:raises NotADirectoryError: When the path exists but is not a directory.
	:raises FileExistsError: When the directory exists and ``exist_ok`` is False.
	:raises PermissionError: When permission is denied (logged, then re-raised).
	:raises OSError: For other OS-level errors (logged, then re-raised).
	"""
	folder = self._abs(path)

	if folder.exists():
		if folder.is_dir():
			if not exist_ok:
				msg = f"Directory already exists: {folder}"
				LOG.error(msg)
				raise FileExistsError(msg)
			LOG.info("Directory already exists: %s", folder)
			return folder.resolve()
		# Something other than a directory occupies the path.
		msg = f"Path exists and is not a directory: {folder}"
		LOG.error(msg)
		raise NotADirectoryError(msg)

	if self.dry_run:
		LOG.info("[dry-run] mkdir -p %s", folder)
		return folder.resolve()

	try:
		Path(folder).mkdir(mode=mode, parents=True, exist_ok=True)
		LOG.info("Created directory: %s", folder)
		return folder.resolve()
	except PermissionError:
		LOG.exception("Permission denied while creating directory: %s", folder)
		raise
	except OSError as exc:
		LOG.exception("OS error while creating directory '%s': %s", folder, exc)
		raise

touch_file(path, *, create_parents=True, exist_ok=True, mode=None)

Create a file if it does not exist or update its modification time.

Respects base_dir and dry_run. When create_parents is True, missing parent directories are created.

Parameters:

Name Type Description Default
path PathLike

File path (absolute or relative to base_dir).

required
create_parents bool

If True, ensure parent directories exist.

True
exist_ok bool

If False and the file exists, raise FileExistsError.

True
mode Optional[int]

Optional permission bits to apply after creation/update.

None

Returns:

Type Description
Path

The resolved absolute file path.

Raises:

Type Description
IsADirectoryError

When the path is not a file.

FileExistsError

When the file exists and exist_ok is False.

PermissionError

When permission is denied.

OSError

Other OS-level errors.

Source code in src/sciwork/fs/create.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def touch_file(
		self,
		path: PathLike,
		*,
		create_parents: bool = True,
		exist_ok: bool = True,
		mode: Optional[int] = None
) -> Path:
	"""
	Touch *path*: create the file when absent, otherwise refresh its modification time.

	With ``dry_run`` set, the action is only logged. Parent directories are
	ensured only when the file does not exist yet and ``create_parents`` is True.

	:param path: File path, passed through ``self.coerce_file_path``.
	:param create_parents: If True, call :meth:`ensure_parent` before creating a new file.
	:param exist_ok: If False and the file already exists, raise :class:`FileExistsError`.
	:param mode: Optional permission bits applied with ``chmod`` after the touch.
				A failing ``chmod`` is only logged as a warning.
	:return: The file path after ``Path.resolve()``.
	:raises IsADirectoryError: When the path exists but is not a regular file.
	:raises FileExistsError: When the file exists and ``exist_ok`` is False.
	:raises PermissionError: When permission is denied.
	:raises OSError: Other OS-level errors.
	"""
	file_path = self.coerce_file_path(path)

	def _chmod_best_effort() -> None:
		# A failing chmod is reported as a warning and never aborts the touch.
		if mode is None:
			return
		try:
			file_path.chmod(mode)
		except Exception as exc:
			LOG.warning("Failed to apply mode %o to %s: %s", mode, file_path, exc)

	if not file_path.exists():
		if create_parents:
			self.ensure_parent(file_path)

		if self.dry_run:
			LOG.info("[dry-run] create file: %s", file_path)
			return file_path.resolve()

		try:
			file_path.touch(exist_ok=True)
			_chmod_best_effort()
			LOG.info("Created file: %s", file_path)
			return file_path.resolve()
		except PermissionError:
			LOG.exception("Permission denied while touching file: %s", file_path)
			raise
		except OSError as exc:
			LOG.exception("OS error while touching file '%s': %s", file_path, exc)
			raise

	# From here on the path already exists.
	if not file_path.is_file():
		msg = f"Path exists and is not a file: {file_path}"
		LOG.error(msg)
		raise IsADirectoryError(msg)  # or NotAFileError in future Python

	if not exist_ok:
		msg = f"File already exists: {file_path}"
		LOG.error(msg)
		raise FileExistsError(msg)

	if self.dry_run:
		LOG.info("[dry-run] touch (update mtime): %s", file_path)
		return file_path.resolve()

	file_path.touch(exist_ok=True)
	_chmod_best_effort()
	LOG.info("Touched file: %s", file_path)
	return file_path.resolve()

Delete

sciwork.fs.delete.Delete(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Deletion utilities: remove files/dirs, move to trash, and clear folder contents.

The class respects :attr:dry_run from :class:~sciwork.fs.base.PathOpsBase and uses :class:~sciwork.console.Prompter for interactive confirmations (when available).

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the shared filesystem context on the instance.

	:param base_dir: Root for relative paths; it is resolved to an absolute path.
					A falsy value selects ``Path.cwd()``.
	:param dry_run: Stored as a plain ``bool`` in :attr:`dry_run`.
	:param input_func: Callable used for prompts. A falsy value selects the builtin ``input``.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()

	self.base_dir = root
	self.dry_run = True if dry_run else False
	self.input_func = input_func if input_func else input

_confirm_removal(action)

Confirms the removal of a file by prompting the user with a confirmation message.

Parameters:

Name Type Description Default
action str

Description of the action, inserted into the confirmation message.

required

Returns:

Type Description
bool

True if the user confirms the action, otherwise False.

Source code in src/sciwork/fs/delete.py
32
33
34
35
36
37
38
39
40
41
def _confirm_removal(self, action: str) -> bool:
	"""
	Ask the user to approve a removal.

	The question ``"Are you sure you want to {action}?"`` is passed to the
	callable obtained from ``self._pick_prompt(None, confirm=True)``.

	:param action: Phrase describing the pending operation (e.g. ``"delete <path>"``).
	:return: The answer produced by the prompt callable.
	"""
	prompt = self._pick_prompt(None, confirm=True)
	question = f"Are you sure you want to {action}?"
	return prompt(question)

_delete_one(entry, *, trash, recursive, follow_symlinks)

Delete or trash a single entry (file/dir/symlink). Obeys :attr:dry_run.

Parameters:

Name Type Description Default
entry Path

Path to remove.

required
trash bool

If True, move to OS trash (Send2Trash).

required
recursive bool

For directories, delete contents recursively when not trashing.

required
follow_symlinks bool

Follow symlinks to directories when not trashing.

required
Source code in src/sciwork/fs/delete.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def _delete_one(
		self,
		entry: Path,
		*,
		trash: bool,
		recursive: bool,
		follow_symlinks: bool
) -> None:
	"""
	Delete or trash a single entry (file/dir/symlink).
	Obeys :attr:`dry_run` (the intended action is logged and nothing is removed).

	Exactly one removal strategy is applied per entry: symlink, regular file,
	directory, or a plain ``unlink`` for anything else.

	:param entry: Path to remove.
	:param trash: If True, hand the entry to :meth:`trash` (with ``missing_ok=True``).
	:param recursive: For directories, delete contents recursively when not trashing.
	:param follow_symlinks: Forwarded to the symlink removal helper when not trashing.
	"""
	if self.dry_run:
		action = "trash" if trash else "delete"
		kind = "symlink" if entry.is_symlink() else ("dir" if entry.is_dir() else "file")
		suffix = " (recursive)" if (not trash and entry.is_dir() and recursive) else ""
		LOG.info("[dry-run] %s %s%s: %s", action, kind, suffix, entry)
		return

	if trash:
		self.trash(entry, missing_ok=True)
		return

	# One chain, so an entry already handled as a symlink or file never
	# falls through into the generic unlink branch below.
	if entry.is_symlink():
		_delete_symlink(entry, follow_symlinks=follow_symlinks)
	elif entry.is_file():
		_delete_file(entry)
	elif entry.is_dir():
		_delete_dir(entry, recursive=recursive)
	else:
		# Rare FS entries (FIFO, sockets...). Best-effort unlink
		# (``missing_ok`` is available from Python 3.8).
		try:
			entry.unlink(missing_ok=True)  # type: ignore[arg-type]
		except TypeError:
			# fallback for older Python
			try:
				entry.unlink()
			except FileNotFoundError:
				pass

_preflight_confirm(root, *, trash, recursive, include_hidden, pattern, antipattern, shell_pattern, threshold)

Estimate the number of candidates and ask for confirmation if count ≥ threshold. Uses :meth:Prompter.confirm when available; otherwise falls back to a plain prompt.

Parameters:

Name Type Description Default
root Path

The root path.

required
trash bool

If True, moves files to trash.

required
recursive bool

If True, recursively traverses the directory tree.

required
include_hidden bool

If True, includes hidden files.

required
pattern Optional[str]

Include only names containing this substring.

required
antipattern Optional[str]

Exclude names containing this substring.

required
shell_pattern Optional[str]

Shell-like pattern for names (e.g., "*.jpg").

required
threshold int

Maximal number of items to be removed without confirmation.

required

Returns:

Type Description
bool

True if confirmed or count<threshold; False if the user declined.

Source code in src/sciwork/fs/delete.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def _preflight_confirm(
		self,
		root: Path,
		*,
		trash: bool,
		recursive: bool,
		include_hidden: bool,
		pattern: Optional[str],
		antipattern: Optional[str],
		shell_pattern: Optional[str],
		threshold: int
) -> bool:
	"""
	Estimate the number of candidates and ask for confirmation if count ≥ threshold.

	Counting stops as soon as *threshold* is reached. A permission error during
	the scan is logged and treated as if the threshold had been reached, so the
	user is asked. The question goes through ``self._pick_prompt``; if that
	fails, a plain ``[y/N]`` prompt is issued through ``input_func``.

	:param root: The root path.
	:param trash: If True, moves files to trash.
	:param recursive: If True, recursively traverses the directory tree.
	:param include_hidden: If True, includes hidden files.
	:param pattern: Include only names containing this substring.
	:param antipattern: Exclude names containing this substring.
	:param shell_pattern: Shell-like pattern for names (e.g., "*.jpg").
	:param threshold: Number of matching entries from which confirmation is required.
	:return: True if confirmed or count<threshold; False if the user declined.
	"""
	est = 0
	try:
		for entry in iter_clear_candidates(root, trash=trash, recursive=recursive):
			if not include_hidden and is_hidden_path(root, entry):
				continue
			if not matches_filters(
					entry.name,
					include_hidden=True,  # hidden paths handled above
					pattern=pattern,
					antipattern=antipattern,
					shell_pattern=shell_pattern
			):
				continue
			est += 1
			if est >= threshold:
				break
	except PermissionError as exc:
		LOG.warning("Permission error while pre-scanning '%s': %s", root, exc)
		# The scan is incomplete; err on the side of asking.
		est = threshold

	if est < threshold:
		return True

	msg = f"About to remove at least {est} entr{'y' if est == 1 else 'ies'} from: {root}. Continue?"
	try:
		ask = self._pick_prompt(None, confirm=True)
		return ask(msg)
	except Exception:
		# Very defensive fallback. Use the injected input function so that
		# non-interactive callers and tests are not blocked on real stdin.
		reader = getattr(self, "input_func", None) or input
		ans = reader(f"{msg} [y/N]: ").strip().lower()
		return ans in {"y", "yes"}

clear_folder(folder, *, recursive=True, include_hidden=True, pattern=None, antipattern=None, shell_pattern=None, trash=False, follow_symlinks=False, missing_ok=False, ignore_errors=False, confirm_if_over=None)

Remove all entries from a folder while keeping the folder itself.

This method requires the optional dependency sciwork.fs.Dirs.

By default, removes files and subdirectories recursively (like rm -rf on the folder contents). When trash is True, entries are moved to the OS recycle bin (requires the Send2Trash module) instead of being permanently deleted.

Filtering can be applied using a simple substring pattern and/or antipattern (both tested against the entry name), or a shell-like glob (e.g., "*.tmp"). Hidden files/dirs (names starting with .) can be excluded with include_hidden=False.

Safety rail: If confirm_if_over is set (e.g., 200), a pre-flight pass estimates the number of matching entries. If it meets/exceeds the threshold, the user is prompted once via input_func to confirm. On decline, no deletions are performed and the method returns 0.

Respects dry_run (logs intended actions without touching the filesystem).

Parameters:

Name Type Description Default
folder PathLike

Target folder whose contents will be removed.

required
recursive bool

If True (default), remove directory trees; when False, only direct children are considered (dirs must be empty).

True
include_hidden bool

If False, skip entries with any components starting with .

True
pattern Optional[str]

Keep only entries whose name contains this substring.

None
antipattern Optional[str]

Exclude entries whose name contains this substring.

None
shell_pattern Optional[str]

Optional shell pattern matched against the name (e.g., "*.log"). Applied in addition to pattern filters.

None
trash bool

Move entries to OS trash instead of deleting permanently.

False
follow_symlinks bool

If True and deleting permanently, follow symlinks to dirs (dangerous). When trash=True, symlinks are trashed as links.

False
missing_ok bool

If True, do not raise when folder does not exist.

False
ignore_errors bool

If True, log errors and continue; otherwise, re-raise.

False
confirm_if_over Optional[int]

Threshold for interactive confirmation before deletion. If the number of matching entries is >= this value, prompt the user via input_func.

None

Returns:

Type Description
int

Number of entries removed (or that would be if dry_run is True).

Raises:

Type Description
FileNotFoundError

If folder does not exist and missing_ok is False.

NotADirectoryError

If folder exists but is not a directory.

Source code in src/sciwork/fs/delete.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def clear_folder(
		self,
		folder: PathLike,
		*,
		recursive: bool = True,
		include_hidden: bool = True,
		pattern: Optional[str] = None,
		antipattern: Optional[str] = None,
		shell_pattern: Optional[str] = None,
		trash: bool = False,
		follow_symlinks: bool = False,
		missing_ok: bool = False,
		ignore_errors: bool = False,
		confirm_if_over: Optional[int] = None
) -> int:
	"""
	Remove all entries from a folder while **keeping the folder itself**.

	This method requires the optional dependency ``sciwork.fs.Dirs``.

	By default, removes files and subdirectories **recursively** (like ``rm -rf`` on
	the folder *contents*). When ``trash`` is True, entries are moved to the OS
	recycle bin (requires :mod:`Send2Trash`) instead of being permanently deleted.

	Filtering can be applied using a simple substring ``pattern`` and/or
	``antipattern`` (both tested against the entry **name**), or a shell-like
	``shell_pattern`` (e.g., ``"*.tmp"``). Hidden files/dirs (names starting with ``.``)
	can be excluded with ``include_hidden=False``.

	Safety rail:
		If ``confirm_if_over`` is set (e.g., 200), a pre-flight pass estimates the
		number of matching entries. If it meets/exceeds the threshold, the user
		is prompted once to confirm. On decline, no deletions are performed and
		the method returns 0.

	Respects ``dry_run`` (logs intended actions without touching the filesystem).

	:param folder: Target folder whose *contents* will be removed
					(absolute or relative to ``base_dir``).
	:param recursive: If True (default), remove directory trees; when False,
						only direct children are considered (dirs must be empty).
	:param include_hidden: If False, skip entries with any components starting with ``.``
	:param pattern: Keep only entries whose *name* contains this substring.
	:param antipattern: Exclude entries whose *name* contains this substring.
	:param shell_pattern: Optional shell pattern matched against the *name*
							(e.g., ``"*.log"``). Applied in addition to ``pattern`` filters.
	:param trash: Move entries to OS trash instead of deleting permanently.
	:param follow_symlinks: If True and deleting permanently, follow symlinks to dirs
							(dangerous). When ``trash=True``, symlinks are trashed as links.
	:param missing_ok: If True, do not raise when *folder* does not exist.
	:param ignore_errors: If True, log errors and continue; otherwise, re-raise.
	:param confirm_if_over: Threshold for interactive confirmation before deletion.
							If the number of matching entries is >= this value,
							the user is prompted. ``None`` or a non-positive value
							disables the check.
	:return: Number of entries removed (or that would be if ``dry_run`` is True).
	:raises FileNotFoundError: If *folder* does not exist and ``missing_ok`` is False.
	:raises NotADirectoryError: If *folder* exists but is not a directory.
	"""
	try:
		from .dirs import Dirs
	except Exception:
		LOG.error("clear_folder requires the optional dependency 'sciwork.fs.Dirs' to work.")
		raise

	# Anchor relative paths at this instance's base_dir before validation. A
	# fresh Dirs() only knows the current working directory, which would make
	# clear_folder disagree with delete()/trash() about what a relative path means.
	root = Dirs().try_get_dir(self._abs(folder), missing_ok=missing_ok)
	if root is None:
		return 0

	# Safety rail: preflight confirmation
	if confirm_if_over and confirm_if_over > 0:
		if not self._preflight_confirm(
			root,
			trash=trash,
			recursive=recursive,
			include_hidden=include_hidden,
			pattern=pattern,
			antipattern=antipattern,
			shell_pattern=shell_pattern,
			threshold=confirm_if_over
		):
			LOG.warning("User declined clear_folder in %s", root)
			return 0

	removed = 0
	try:
		for entry in iter_clear_candidates(root, trash=trash, recursive=recursive):
			# hidden filter
			if not include_hidden and is_hidden_path(root, entry):
				continue
			# name filters
			if not matches_filters(
					entry.name,
					include_hidden=True,  # hidden paths handled above
					pattern=pattern,
					antipattern=antipattern,
					shell_pattern=shell_pattern
			):
				continue

			try:
				self._delete_one(
					entry,
					trash=trash,
					recursive=recursive,
					follow_symlinks=follow_symlinks
				)
				removed += 1
			except Exception as exc:
				if ignore_errors:
					LOG.error("Failed to delete '%s': %s", entry, exc)
					continue
				raise
	except PermissionError as exc:
		# Raised while walking the tree (per-entry failures are handled above).
		if ignore_errors:
			LOG.warning("Permission error while clearing '%s': %s", root, exc)
		else:
			raise

	LOG.info("Cleared %d entr%s from: %s", removed, "y" if removed == 1 else "ies", root)
	return removed

delete(path, *, missing_ok=False, recursive=False, follow_symlinks=False, confirm=False)

Deletes a path (file, directory, or symlink).

  • Files and symlinks are unlinked.
  • Directories: * if recursive is True, remove the whole tree; * if False (default), only remove an empty directory (like rmdir).
  • Symlinks to directories are treated as symlinks (unlinked) unless follow_symlinks is True.

Respects dry_run (only logs actions).

Parameters:

Name Type Description Default
path PathLike

Path of the target file or directory (absolute or relative to base_dir).

required
missing_ok bool

If True, do not raise when the path does not exist.

False
recursive bool

When deleting a directory, remove contents recursively.

False
follow_symlinks bool

If True, and path is a symlink to a directory, delete the target directory tree instead of the link.

False
confirm bool

If True, prompt the user before deleting.

False

Raises:

Type Description
FileNotFoundError

When the path does not exist and missing_ok is False.

ValueError

Unsupported path type.

Source code in src/sciwork/fs/delete.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def delete(
		self,
		path: PathLike,
		*,
		missing_ok: bool = False,
		recursive: bool = False,
		follow_symlinks: bool = False,
		confirm: bool = False
) -> None:
	"""
	Deletes a *path* (file, directory, or symlink).

	- Files and symlinks are unlinked.
	- Directories:
		* if ``recursive`` is True, remove the whole tree;
		* if False (default), only remove an empty directory (like ``rmdir``).
	- Symlinks to directories are treated as symlinks (unlinked) unless
		``follow_symlinks`` is True.
	- Dangling symlinks (whose target is gone) count as existing and are
		always unlinked as links.

	Respects ``dry_run`` (only logs actions).

	:param path: Path of the target file or directory (absolute or relative to ``base_dir``).
	:param missing_ok: If True, do not raise when the path does not exist.
	:param recursive: When deleting a directory, remove contents recursively.
	:param follow_symlinks: If True, and *path* is a symlink to a directory,
							delete the target directory tree instead of the link.
	:param confirm: If True, prompt the user before deleting.
	:raises FileNotFoundError: When the path does not exist and ``missing_ok`` is False.
	:raises ValueError: Unsupported path type.
	"""
	target = self._abs(path)

	# ``Path.exists`` follows symlinks and reports False for a dangling link,
	# so the link itself has to be checked separately.
	is_link = target.is_symlink()
	if not (is_link or target.exists()):
		if missing_ok:
			LOG.warning("delete skipped (missing): %s", target)
			return
		raise FileNotFoundError(f"Path '{target}' not found.")

	if self.dry_run:
		kind = "symlink" if is_link else ("dir" if target.is_dir() else "file")
		suffix = " (recursive)" if (kind == "dir" and recursive) else ""
		LOG.info("[dry-run] delete %s%s: %s", kind, suffix, target)
		return

	if confirm and not self._confirm_removal(f"delete {target}"):
		LOG.warning("The user has not confirmed the deletion of: %s", target)
		return

	# Actual deletion
	try:
		# Symlinks (to files or dirs) - unlink by default. A dangling link has
		# nothing to follow, so it is unlinked even with follow_symlinks=True.
		if target.is_symlink() and (not follow_symlinks or not target.exists()):
			_delete_symlink(target, follow_symlinks=False)
			LOG.info("Deleted symlink: %s", target)
		elif target.is_file():
			_delete_file(target)
			LOG.info("Deleted file: %s", target)
		elif target.is_dir():
			_delete_dir(target, recursive=recursive)
			LOG.info("Deleted %s directory: %s", "recursive" if recursive else "empty", target)
		else:
			# Fallback - unusual filesystem entry
			raise ValueError(f"Unsupported path type: {target}")
	except FileNotFoundError:
		# The entry vanished between the existence check and the removal.
		if missing_ok:
			LOG.warning("delete skipped (disappeared): %s", target)
			return
		raise

trash(path, *, missing_ok=False, confirm=False)

Move a file or directory to the OS trash/recycle bin. Requires the optional dependency Send2Trash.

Parameters:

Name Type Description Default
path PathLike

Target path (absolute or relative to base_dir).

required
missing_ok bool

If True, do not raise when the path does not exist.

False
confirm bool

If True, prompt the user before moving to trash.

False

Raises:

Type Description
FileNotFoundError

When the path does not exist and missing_ok is False.

Source code in src/sciwork/fs/delete.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def trash(self, path: PathLike, *, missing_ok: bool = False, confirm: bool = False) -> None:
	"""
	Move a file or directory to the OS trash/recycle bin.
	Requires the optional dependency ``Send2Trash``.

	Respects ``dry_run`` (the move is only logged).

	:param path: Target path (absolute or relative to ``base_dir``).
	:param missing_ok: If True, do not raise when the path does not exist.
	:param confirm: If True, prompt the user before moving to trash.
	:raises FileNotFoundError: When the path does not exist and ``missing_ok`` is False.
	"""
	target = self._abs(path)
	if not target.exists():
		if missing_ok:
			LOG.warning("trash skipped (missing): %s", target)
			return
		raise FileNotFoundError(f"Path '{target}' not found.")

	if self.dry_run:
		LOG.info("[dry-run] move to trash: %s", target)
		return

	# The dry-run branch above returns before any prompt is shown.
	if confirm and not self._confirm_removal(f"move to trash {target}"):
		LOG.warning("The user has not confirmed the trashing of: %s", target)
		return

	Send2Trash.send2trash(str(target))
	LOG.info("Moved to trash: %s", target)

Transfer

sciwork.fs.transfer.Transfer(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

File/dir transfer utilities (copy/move/delete).

This class reuses: - :class:PathOpsBase for path resolution and dry-run flags - :class:Create for :meth:ensure_parent - :class:Deleter for removing existing targets (when overwrite=True)

    Parameters mirror shutil semantics but add safety rails and consistent logging.
Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the shared filesystem context on the instance.

	:param base_dir: Root for relative paths; it is resolved to an absolute path.
					A falsy value selects ``Path.cwd()``.
	:param dry_run: Stored as a plain ``bool`` in :attr:`dry_run`.
	:param input_func: Callable used for prompts. A falsy value selects the builtin ``input``.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()

	self.base_dir = root
	self.dry_run = True if dry_run else False
	self.input_func = input_func if input_func else input

_copy_dir(src, dst, *, preserve_metadata) staticmethod

Copy a directory tree.

We use the copytree with dirs_exist_ok=True (target already removed if overwrite). The copy function is copy2 when preserve_metadata; else copy.

Source code in src/sciwork/fs/transfer.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@staticmethod
def _copy_dir(
		src: Path,
		dst: Path,
		*,
		preserve_metadata: bool,
) -> None:
	"""
	Replicate the directory tree rooted at *src* under *dst*.

	``shutil.copytree`` is called with ``dirs_exist_ok=True``, so an already
	existing *dst* is accepted. Individual files are copied with
	``shutil.copy2`` when *preserve_metadata* is truthy and with
	``shutil.copy`` otherwise.
	"""
	if preserve_metadata:
		file_copier = shutil.copy2
	else:
		file_copier = shutil.copy
	shutil.copytree(src, dst, copy_function=file_copier, dirs_exist_ok=True)

_copy_file(src, dst, *, preserve_metadata, follow_symlinks) staticmethod

Copy a single file with desired semantics.

Source code in src/sciwork/fs/transfer.py
85
86
87
88
89
90
91
92
93
94
95
96
97
@staticmethod
def _copy_file(
		src: Path,
		dst: Path,
		*,
		preserve_metadata: bool,
		follow_symlinks: bool
) -> None:
	"""
	Copy the single file *src* to *dst*.

	:param src: File to copy.
	:param dst: Destination path.
	:param preserve_metadata: Use ``shutil.copy2`` when True, ``shutil.copy`` otherwise.
	:param follow_symlinks: Forwarded unchanged to the chosen shutil function.
	"""
	copier = shutil.copy2 if preserve_metadata else shutil.copy
	copier(src, dst, follow_symlinks=follow_symlinks)

_move_any(src, dst) staticmethod

Cross-device safe move.

Source code in src/sciwork/fs/transfer.py
115
116
117
118
@staticmethod
def _move_any(src: Path, dst: Path) -> None:
	"""Move *src* to *dst* via :func:`shutil.move`, passing both paths as strings."""
	origin, destination = str(src), str(dst)
	shutil.move(origin, destination)

_preflight_target(target, *, overwrite, create_parents)

Ensure parent, and optionally remove existing target if overwrite=True. Respects dry_run.

Parameters:

Name Type Description Default
target Path

The target path to create or overwrite.

required
overwrite bool

If True and the target exists, remove it.

required
create_parents bool

Ensure the parent directory exists.

required
Source code in src/sciwork/fs/transfer.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def _preflight_target(
		self,
		target: Path,
		*,
		overwrite: bool,
		create_parents: bool
) -> None:
	"""
	Ensure parent, and optionally remove existing *target* if overwrite=True.
	Respects ``dry_run``: the helper objects are created with this instance's
	``base_dir``, ``dry_run`` and ``input_func`` instead of default settings.

	:param target: The target path to create or overwrite.
	:param overwrite: If True and the target exists, remove it.
	:param create_parents: Ensure the parent directory exists.
	:raises Exception: Re-raised (after logging) when the ``Create`` or
		``Delete`` helper cannot be imported.
	"""
	if create_parents:
		try:
			from .create import Create
		except Exception:
			LOG.error("Transfer class requires the optional dependency 'sciwork.fs.Create' to work.")
			raise
		# A bare Create() would run with dry_run=False and touch the disk
		# even when this instance is in dry-run mode.
		Create(
			base_dir=self.base_dir,
			dry_run=self.dry_run,
			input_func=self.input_func,
		).ensure_parent(target)

	if target.exists() and overwrite:
		if self.dry_run:
			LOG.info("[dry-run] remove existing target: %s", target)
			return
		# Only a real directory needs a recursive delete. A symlink that
		# points at a directory is removed as a link so the directory it
		# refers to is left untouched.
		if target.is_dir() and not target.is_symlink():
			try:
				from .delete import Delete
			except Exception:
				LOG.error("Transfer class requires the optional dependency 'sciwork.fs.Delete' to work.")
				raise
			Delete(
				base_dir=self.base_dir,
				dry_run=self.dry_run,
				input_func=self.input_func,
			).delete(target, recursive=True, missing_ok=True)
		else:
			try:
				target.unlink()
			except FileNotFoundError:
				pass  # race-safe

_resolve_transfer_target(src, dst) staticmethod

Compute the effective target path for a transfer.

If dst exists and is a directory, returns dst / src.name. Otherwise, returns dst as-is.

Parameters:

Name Type Description Default
src Path

Source path (must exist).

required
dst Path

Destination path (can be a dir or file path).

required

Returns:

Type Description
Path

Effective target path.

Source code in src/sciwork/fs/transfer.py
29
30
31
32
33
34
35
36
37
38
39
40
41
@staticmethod
def _resolve_transfer_target(src: Path, dst: Path) -> Path:
	"""
	Work out where a transfer of *src* to *dst* actually lands.

	An existing directory *dst* means "put it inside", giving ``dst / src.name``;
	in every other case *dst* itself is the target.

	:param src: Source path; only its final name component is used.
	:param dst: Destination path (directory or final path).
	:return: Effective target path.
	"""
	lands_inside = dst.exists() and dst.is_dir()
	if lands_inside:
		return dst / src.name
	return dst

prepare_transfer(source, destination, *, overwrite, create_parents)

Prepare file transfer by validating the source and destination paths, handling target preparation, and resolving conflicts such as existing files based on the overwrite policy.

Parameters:

Name Type Description Default
source Path

Path object representing the source file or directory.

required
destination Path

Path object representing the destination file or directory.

required
overwrite bool

Boolean indicating whether to overwrite existing target files.

required
create_parents bool

A boolean indicating whether to create non-existent parent directories for the destination path.

required

Returns:

Type Description

A tuple containing the absolute path of the source and the resolved target destination path.

Source code in src/sciwork/fs/transfer.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def prepare_transfer(
		self,
		source: Path,
		destination: Path,
		*,
		overwrite: bool,
		create_parents: bool
) -> tuple[Path, Path]:
	"""
	Prepare a transfer by validating the source and destination paths,
	resolving the effective target and preparing it according to the
	`overwrite` policy.

	:param source: Path object representing the source file or directory.
	:param destination: Path object representing the destination file or directory.
	:param overwrite: Boolean indicating whether to overwrite existing target files.
	:param create_parents: A boolean indicating whether to create non-existent parent
	    directories for the destination path.
	:return: A tuple containing the absolute path of the source and the resolved
	    target destination path.
	:raises FileNotFoundError: When the source does not exist.
	:raises FileExistsError: When the target exists and ``overwrite`` is False.
	:raises shutil.SameFileError: When ``overwrite`` is True and the resolved
	    target is the source path itself.
	"""
	src = self._abs(source)
	if not src.exists():
		raise FileNotFoundError(f"Source path '{src}' does not exist.")

	dst_in = self._abs(destination)
	target = self._resolve_transfer_target(src, dst_in)

	# refuse clobbering when overwrite=False
	if target.exists() and not overwrite:
		raise FileExistsError(f"Target path '{target}' exists (use overwrite=True).")

	# With overwrite=True the preflight step removes an existing target. If the
	# target is the source itself (e.g. a file transferred into its own parent
	# directory), that would delete the data before anything is copied.
	if target == src:
		raise shutil.SameFileError(f"Source and target are the same path: '{src}'.")

	# prep target (parents and optionally clear)
	self._preflight_target(target, overwrite=overwrite, create_parents=create_parents)

	return src, target

transfer(source, destination, *, operation='copy', overwrite=False, preserve_metadata=True, create_parents=True, follow_symlinks=True)

Copy or move a file/directory.

Behavior
  • If destination is an existing directory, the entry is copied/moved into it.
  • If destination looks like a file path, the entry is copied/moved to that path.
  • When overwrite=True, an existing target is removed first (file/symlink or whole tree).
  • move uses :func:shutil.move (cross-device).

Parameters:

Name Type Description Default
source PathLike

Source file or directory.

required
destination PathLike

Destination directory or the final path.

required
operation Literal['copy', 'move']

"copy" (default) or "move".

'copy'
overwrite bool

Remove the existing target first (default: False).

False
preserve_metadata bool

For file copies, use :func:shutil.copy2 (default: True). For directory trees, the copytree uses copy2 when True.

True
create_parents bool

Ensure the parent directory for target exists (default: True).

True
follow_symlinks bool

For file copies, pass through to shutil to follow symlinks (default: True).

True

Returns:

Type Description
Path

The absolute resolved target path.

Raises:

Type Description
FileNotFoundError

Source does not exist.

FileExistsError

Target exists and overwrite=False.

ValueError

Invalid operation.

PermissionError

On OS-level errors.

OSError

On OS-level errors.

Source code in src/sciwork/fs/transfer.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def transfer(
		self,
		source: PathLike,
		destination: PathLike,
		*,
		operation: Literal["copy", "move"] = "copy",
		overwrite: bool = False,
		preserve_metadata: bool = True,
		create_parents: bool = True,
		follow_symlinks: bool = True
) -> Path:
	"""
	Copy or move a file/directory.

	Behavior
	--------
	* If *destination* is an existing **directory**, the entry is copied/moved *into* it.
	* If *destination* looks like a file path, the entry is copied/moved **to that path**.
	* When ``overwrite=True``, an existing target is removed first (file/symlink or whole tree).
	* ``move`` uses :func:`shutil.move` (cross-device).
	* *operation* is validated before anything else happens, so an invalid
	  value never removes an existing target and is reported in dry-run mode too.

	:param source: Source file or directory.
	:param destination: Destination directory or the final path.
	:param operation: ``"copy"`` (default) or ``"move"``.
	:param overwrite: Remove the existing *target* first (default: False).
	:param preserve_metadata: For file copies, use :func:`shutil.copy2` (default: True).
							  For directory trees, the copytree uses ``copy2`` when True.
	:param create_parents: Ensure the parent directory for *target* exists (default: True).
	:param follow_symlinks: For **file copies**, pass through to shutil to follow symlinks (default: True).
	:return: The absolute resolved *target* path.
	:raises FileNotFoundError: Source does not exist.
	:raises FileExistsError: Target exists and ``overwrite=False``.
	:raises ValueError: Invalid *operation*.
	:raises PermissionError: On OS-level errors.
	:raises OSError: On OS-level errors.
	"""
	# Must run before prepare_transfer(): with overwrite=True that call
	# already removes an existing target.
	if operation not in ("copy", "move"):
		raise ValueError(f"Unsupported operation (must be 'copy' or 'move'): {operation}")

	src, target = self.prepare_transfer(
		source, destination,
		overwrite=overwrite, create_parents=create_parents
	)

	if self.dry_run:
		LOG.info("[dry-run] %s %s -> %s", operation, src, target)
		return target.resolve()

	try:
		if operation == "copy":
			if src.is_dir():
				self._copy_dir(src, target, preserve_metadata=preserve_metadata)
			else:
				self._copy_file(
					src, target,
					preserve_metadata=preserve_metadata,
					follow_symlinks=follow_symlinks
				)
		else:
			# only "move" is left after the validation above
			self._move_any(src, target)
	except PermissionError:
		LOG.exception("Permission denied during %s: %s -> %s", operation, src, target)
		raise
	except OSError as exc:
		LOG.exception("OS error during %s '%s' -> '%s': %s", operation, src, target, exc)
		raise

	LOG.info("%s OK: %s -> %s", operation.capitalize(), src, target)
	return target.resolve()

GetContents

sciwork.fs.getcontents.GetContents(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Handles directory listing and filtering of files and folders based on various criteria.

This class provides two main APIs: get_files_and_folders, which returns the matching files and folders as separate lists, and get_contents, which returns the directory contents with detailed metadata. Both support filters such as pattern matching and time-based cutoffs; get_contents can optionally include EXIF metadata for files.

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the context shared by the filesystem helpers.

	:param base_dir: Root used for relative paths. A truthy value is resolved
		to an absolute path; a falsy one falls back to ``Path.cwd()``.
	:param dry_run: Coerced to ``bool`` and stored as ``self.dry_run``.
	:param input_func: Callable used for prompts; the builtin ``input`` is
		stored when none is given.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		self.base_dir = Path(base_dir).resolve()
	else:
		self.base_dir = Path.cwd()
	self.dry_run = bool(dry_run)
	self.input_func = input_func if input_func else input

__iter_matching_entries(root, *, recursive, follow_symlinks, include_hidden, pattern, antipattern, shell_pattern, cutoff_older, cutoff_newer, ignore_errors=True) staticmethod

Yield (entry, stat_result) for items under root that pass name+time filters. Uses lstat() to keep symlink semantics consistent with metadata. For parameters, see :meth:get_contents.

Source code in src/sciwork/fs/getcontents.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@staticmethod
def __iter_matching_entries(
		root: Path,
		*,
		recursive: bool,
		follow_symlinks: bool,
		include_hidden: bool,
		pattern: Optional[str],
		antipattern: Optional[str],
		shell_pattern: Optional[str],
		cutoff_older: Optional[float],
		cutoff_newer: Optional[float],
		ignore_errors: bool = True,
):
	"""
	Yield (entry, stat_result) for items under *root* that pass name+time filters.
	Uses lstat() to keep symlink semantics consistent with metadata.
	For the name-filter parameters, see :meth:`get_contents`.

	:param root: Directory whose entries are iterated.
	:param recursive: Iterate with ``rglob("*")`` instead of ``iterdir()``.
	:param follow_symlinks: When False (and *recursive*), entries that are
		symlinks to directories are dropped.
	:param cutoff_older: Timestamp bound for "older than": entries modified
		*after* it are skipped.
	:param cutoff_newer: Timestamp bound for "newer than": entries modified
		*before* it are skipped.
	:param ignore_errors: If True, an entry whose ``lstat()`` fails is logged
		and skipped; otherwise the error is re-raised.
	"""
	it = (root.rglob("*") if recursive else root.iterdir())
	if recursive and not follow_symlinks:
		it = (p for p in it if not (p.is_dir() and p.is_symlink()))

	for entry in it:
		if not matches_filters(
				entry.name, include_hidden=include_hidden,
				pattern=pattern, antipattern=antipattern, shell_pattern=shell_pattern
		):
			continue
		try:
			st = entry.lstat()
		except PermissionError as exc:
			if ignore_errors:
				LOG.warning("Permission denied while stat() '%s': %s", entry, exc)
				continue
			raise
		except OSError as exc:
			# e.g. the entry vanished between listing and lstat()
			if ignore_errors:
				LOG.warning("Cannot stat() '%s': %s", entry, exc)
				continue
			raise

		mtime = float(st.st_mtime)
		# "older than" keeps entries at or before the cutoff; "newer than"
		# keeps those at or after it. Together they describe a time window.
		if cutoff_older is not None and mtime > cutoff_older:
			continue
		if cutoff_newer is not None and mtime < cutoff_newer:
			continue

		yield entry, st

_confirm_threshold_once(*, threshold, root)

Ask the user for confirmation when the listing grows past a threshold.

Uses sciwork.console.Prompter.confirm() for a consistent UX.

Source code in src/sciwork/fs/getcontents.py
53
54
55
56
57
58
59
60
def _confirm_threshold_once(self, *, threshold: int, root: Path) -> bool:
	"""
	Ask whether a listing that reached *threshold* items should continue.

	The confirmation callable is obtained from ``self._pick_prompt`` and is
	called with a message naming the threshold and *root*.

	:param threshold: Item count that triggered the question.
	:param root: Folder being listed (shown in the message).
	:return: Whatever the confirmation callable returns.
	"""
	confirm_fn = self._pick_prompt(None, confirm=True)
	question = f"Listing has exceeded {threshold} items in '{root}'. Continue?"
	return confirm_fn(question)

_time_cutoff_pair(older_than, newer_than) staticmethod

Turn older/newer specs into POSIX timestamps (or None).

Source code in src/sciwork/fs/getcontents.py
36
37
38
39
40
41
42
@staticmethod
def _time_cutoff_pair(
		older_than: Optional[Union[float, int, str, datetime]],
		newer_than: Optional[Union[float, int, str, datetime]],
) -> tuple[Optional[float], Optional[float]]:
	"""
	Convert both cutoff specs with ``coerce_time_cutoff``.

	:param older_than: Spec for the "older than" cutoff.
	:param newer_than: Spec for the "newer than" cutoff.
	:return: ``(older_cutoff, newer_cutoff)`` in that order.
	"""
	older_cutoff = coerce_time_cutoff(older_than)
	newer_cutoff = coerce_time_cutoff(newer_than)
	return older_cutoff, newer_cutoff

_try_exif(entry) staticmethod

Attach best-effort EXIF (lazy import) for files.

Source code in src/sciwork/fs/getcontents.py
44
45
46
47
48
49
50
51
@staticmethod
def _try_exif(entry: Path) -> Optional[Dict[str, Any]]:
	"""
	Best-effort EXIF lookup for *entry*.

	:param entry: File to inspect.
	:return: The result of ``extract_exif(entry, file_metadata=False)``, or
		``None`` when that call raises (the failure is logged at debug level).
	"""
	try:
		exif_data = extract_exif(entry, file_metadata=False)
	except Exception as exc:
		# deliberately broad: EXIF is optional and must never break a listing
		LOG.debug("EXIF unavailable for %s: %s", entry, exc)
		exif_data = None
	return exif_data

get_contents(folder_path=None, *, recursive=False, include_hidden=True, pattern=None, antipattern=None, shell_pattern=None, follow_symlinks=False, exif=False, max_items=None, confirm_if_over=None, ignore_errors=True, return_absolute_paths=False, older_than=None, newer_than=None)

List directory contents and return a mapping of path → metadata.

Metadata per entry includes: - type: 'file' | 'dir' | 'symlink' - size (bytes, files only) - created / modified / accessed (ISO 8601, UTC) - mime (best-effort via :mod:mimetypes) - ext (file extension, lowercase, including dot) - optional exif (if exif=True and Pillow is available)

Parameters:

Name Type Description Default
folder_path Optional[PathLike]

Folder to list. If None, the inspected folder is set to self.base_dir.

None
recursive bool

Recurse into subdirectories.

False
include_hidden bool

If False, skip dot-entries.

True
pattern Optional[str]

Include only names containing this substring.

None
antipattern Optional[str]

Exclude names containing this substring.

None
shell_pattern Optional[str]

Shell-like pattern for names (e.g., "*.jpg").

None
follow_symlinks bool

If True, follow directory symlinks during recursion.

False
exif bool

If True, try to attach a small EXIF dict for images (best-effort).

False
max_items Optional[int]

Stop after collecting this many items.

None
confirm_if_over Optional[int]

If set (e.g., 500), prompt once via sciwork.console.Prompter when the number of results meets/exceeds this threshold. Abort listing if the user declines.

None
ignore_errors bool

If True, log and continue on errors; else re-raise.

True
return_absolute_paths bool

If True, keys are absolute paths; otherwise, keys are relative to folder.

False
older_than Optional[Union[float, int, str, datetime]]

Only include entries older than this cutoff (mtime). Accepts seconds-ago (number), ISO 8601 string, duration like '2h', '7d', or a datetime.datetime instance.

None
newer_than Optional[Union[float, int, str, datetime]]

Only include entries newer than this cutoff (mtime). Same accepted formats as older_than.

None

Returns:

Type Description
Dict[str, Dict[str, Any]]

Dict[path_str, dict] mapping to metadata.

Raises:

Type Description
FileNotFoundError

If folder does not exist.

NotADirectoryError

If the path is not a directory.

Source code in src/sciwork/fs/getcontents.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def get_contents(
		self,
		folder_path: Optional[PathLike] = None,
		*,
		recursive: bool = False,
		include_hidden: bool = True,
		pattern: Optional[str] = None,
		antipattern: Optional[str] = None,
		shell_pattern: Optional[str] = None,
		follow_symlinks: bool = False,
		exif: bool = False,
		max_items: Optional[int] = None,
		confirm_if_over: Optional[int] = None,
		ignore_errors: bool = True,
		return_absolute_paths: bool = False,
		older_than: Optional[Union[float, int, str, datetime]] = None,
		newer_than: Optional[Union[float, int, str, datetime]] = None
) -> Dict[str, Dict[str, Any]]:
	"""
	List directory contents and return a mapping of *path* → metadata.

	Metadata per entry includes:
		- ``type``: 'file' | 'dir' | 'symlink'
		- ``size`` (bytes, files only)
		- ``created`` / ``modified`` / ``accessed`` (ISO 8601, UTC)
		- ``mime`` (best-effort via :mod:`mimetypes`)
		- ``ext`` (file extension, lowercase, including dot)
		- optional ``exif`` (if ``exif=True`` and Pillow is available)

	:param folder_path: Folder to list.
						If ``None``, the inspected folder is set to ``self.base_dir``.
	:param recursive: Recurse into subdirectories.
	:param include_hidden: If False, skip dot-entries.
	:param pattern: Include only names containing this substring.
	:param antipattern: Exclude names containing this substring.
	:param shell_pattern: Shell-like pattern for names (e.g., ``"*.jpg"``).
	:param follow_symlinks: If True, follow directory symlinks during recursion.
	:param exif: If True, try to attach a small EXIF dict for images (best-effort).
	:param max_items: Stop after collecting this many items.
	:param confirm_if_over: If set (e.g., 500), prompt once via ``sciwork.console.Prompter``
							when the number of results meets/exceeds this threshold.
							Abort listing if the user declines.
	:param ignore_errors: If True, log and continue on errors; else re-raise.
	:param return_absolute_paths: If True, keys are absolute paths; otherwise,
								  keys are relative to the listed folder.
	:param older_than: Only include entries **older** than this cutoff (mtime).
					   Accepts seconds-ago (number), ISO 8601 string, duration
					   like '2h', '7d', or a `datetime.datetime` instance.
	:param newer_than: Only include entries **newer** than this cutoff (mtime).
					   Same accepted formats as ``older_than``.
	:return: Dict[path_str, dict] mapping to metadata.
	:raises FileNotFoundError: If the folder does not exist.
	:raises NotADirectoryError: If the path is not a directory.
	"""
	# Use the effective folder everywhere below, so that a missing
	# folder_path never reaches the helpers (or the log) as None.
	inspected = folder_path if folder_path else self.base_dir
	dirs = Dirs(inspected)

	if dirs.is_folder_empty(inspected):
		LOG.info("Folder %s is empty", inspected)
		return {}

	root = dirs.try_get_dir(inspected)

	cutoff_older, cutoff_newer = self._time_cutoff_pair(older_than, newer_than)

	result: Dict[str, Dict[str, Any]] = {}
	# Without a threshold there is nothing to ask, so we start out confirmed.
	confirmed = (confirm_if_over is None)

	for entry, st in self.__iter_matching_entries(
			root, recursive=recursive, follow_symlinks=follow_symlinks,
			include_hidden=include_hidden, pattern=pattern, antipattern=antipattern,
			shell_pattern=shell_pattern, cutoff_older=cutoff_older, cutoff_newer=cutoff_newer,
			ignore_errors=ignore_errors
	):
		info = build_metadata(entry, st=st)
		if exif and info.get("type") == "file":
			ex = self._try_exif(entry)
			if ex:
				info["exif"] = ex
		key = str(entry if return_absolute_paths else entry.relative_to(root))
		result[key] = info

		# threshold confirmation (asked once; the entry that reached the
		# threshold stays in the result even when the user declines)
		if not confirmed and len(result) >= confirm_if_over:
			if not self._confirm_threshold_once(threshold=confirm_if_over, root=root):
				LOG.warning("User declined listing beyond %d items in %s", confirm_if_over, root)
				break
			LOG.info("User confirmed listing beyond %d items in %s", confirm_if_over, root)
			confirmed = True

		if max_items is not None and len(result) >= max_items:
			LOG.info("Hit max_items=%d; stopping listing in %s", max_items, root)
			break

	LOG.info("Listed %d entr%s from: %s", len(result), "y" if len(result) == 1 else "ies", root)
	return result

get_files_and_folders(folder_path=None, *, recursive=False, include_hidden=True, pattern=None, antipattern=None, shell_pattern=None, follow_symlinks=False, return_absolute_paths=False, max_items=None, older_than=None, newer_than=None)

Return a dict with two lists, under the keys "files" and "folders", holding the paths that match the filters.

The method mirrors the filtering knobs of :meth:get_contents but collects only paths (no metadata), which is both faster and simpler to consume when you just need the file/directory lists.

Parameters:

Name Type Description Default
folder_path Optional[PathLike]

Folder to list. If None, the inspected folder is set to self.base_dir.

None
recursive bool

Recurse into subdirectories.

False
include_hidden bool

If False, skip dot-entries.

True
pattern Optional[str]

Include only names containing this substring.

None
antipattern Optional[str]

Exclude names containing this substring.

None
shell_pattern Optional[str]

Shell-like pattern for names (e.g., "*.jpg").

None
follow_symlinks bool

If True, follow directory symlinks during recursion.

False
return_absolute_paths bool

If True, keys are absolute paths; otherwise, keys are relative to folder.

False
max_items Optional[int]

Stop after collecting this many items.

None
older_than Optional[Union[float, int, str, datetime]]

Only include entries older than this cutoff (mtime). Accepts seconds-ago (number), ISO 8601 string, duration like '2h', '7d', or a datetime.datetime instance.

None
newer_than Optional[Union[float, int, str, datetime]]

Only include entries newer than this cutoff (mtime). Same accepted formats as older_than.

None

Returns:

Type Description
Dict[str, list[Path]] | None
Source code in src/sciwork/fs/getcontents.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_files_and_folders(
		self,
		folder_path: Optional[PathLike] = None,
		*,
		recursive: bool = False,
		include_hidden: bool = True,
		pattern: Optional[str] = None,
		antipattern: Optional[str] = None,
		shell_pattern: Optional[str] = None,
		follow_symlinks: bool = False,
		return_absolute_paths: bool = False,
		max_items: Optional[int] = None,
		older_than: Optional[Union[float, int, str, datetime]] = None,
		newer_than: Optional[Union[float, int, str, datetime]] = None
) -> Dict[str, list[Path]] | None:
	"""
	Return the paths matching the filters, split into files and folders.

	The method mirrors the filtering knobs of :meth:`get_contents` but
	collects *only* paths (no metadata), which is both faster and simpler
	to consume when you just need the file/directory lists.

	:param folder_path: Folder to list.
						If ``None``, the inspected folder is set to ``self.base_dir``.
	:param recursive: Recurse into subdirectories.
	:param include_hidden: If False, skip dot-entries.
	:param pattern: Include only names containing this substring.
	:param antipattern: Exclude names containing this substring.
	:param shell_pattern: Shell-like pattern for names (e.g., ``"*.jpg"``).
	:param follow_symlinks: If True, follow directory symlinks during recursion.
	:param return_absolute_paths: If True, paths are resolved absolute paths;
								  otherwise, they are relative to the listed folder.
	:param max_items: Stop after collecting this many items (files and folders together).
	:param older_than: Only include entries **older** than this cutoff (mtime).
					   Accepts seconds-ago (number), ISO 8601 string, duration
					   like '2h', '7d', or a `datetime.datetime` instance.
	:param newer_than: Only include entries **newer** than this cutoff (mtime).
					   Same accepted formats as ``older_than``.
	:return: ``{"files": [...], "folders": [...]}``; both lists are empty
			 when the folder is empty.
	"""
	# Use the effective folder everywhere below, so that a missing
	# folder_path never reaches the helpers (or the log) as None.
	inspected = folder_path if folder_path else self.base_dir
	dirs = Dirs(inspected)

	if dirs.is_folder_empty(inspected):
		LOG.info("Folder %s is empty", inspected)
		return {"files": [], "folders": []}

	root = dirs.try_get_dir(inspected)

	cutoff_older, cutoff_newer = self._time_cutoff_pair(older_than, newer_than)

	files: list[Path] = []
	folders: list[Path] = []

	for entry, _st in self.__iter_matching_entries(
			root, recursive=recursive, follow_symlinks=follow_symlinks,
			include_hidden=include_hidden, pattern=pattern, antipattern=antipattern,
			shell_pattern=shell_pattern, cutoff_older=cutoff_older, cutoff_newer=cutoff_newer,
			ignore_errors=True
	):
		key = entry.resolve() if return_absolute_paths else entry.relative_to(root)
		(folders if entry.is_dir() else files).append(key)

		if max_items is not None and len(files) + len(folders) >= max_items:
			LOG.info("Hit max_items=%d; stopping listing in %s", max_items, root)
			break

	LOG.info("Collected %d files and %d folders from: %s", len(files), len(folders), root)
	return {"files": files, "folders": folders}

Select

sciwork.fs.select.Select(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

High-level selector built on top of class:PathOpsBase.

Reuses a listing/filtering pipeline (get_files_and_folders and get_contents) and adds sorting and single/multiple selection. Programmatic via indices or interactive via :class:sciwork.console.Prompter.

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the context shared by the filesystem helpers.

	:param base_dir: Root used for relative paths. A truthy value is resolved
		to an absolute path; a falsy one falls back to ``Path.cwd()``.
	:param dry_run: Coerced to ``bool`` and stored as ``self.dry_run``.
	:param input_func: Callable used for prompts; the builtin ``input`` is
		stored when none is given.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		self.base_dir = Path(base_dir).resolve()
	else:
		self.base_dir = Path.cwd()
	self.dry_run = bool(dry_run)
	self.input_func = input_func if input_func else input

_collect_fast(root, *, recursive, include_hidden, follow_symlinks, pattern, antipattern, shell_pattern, path_type, allowed_exts) staticmethod

Collect only paths (no metadata) via :meth:get_files_and_folders.

Source code in src/sciwork/fs/select.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@staticmethod
def _collect_fast(
		root: Path,
		*,
		recursive: bool,
		include_hidden: bool,
		follow_symlinks: bool,
		pattern: Optional[str],
		antipattern: Optional[str],
		shell_pattern: Optional[str],
		path_type: str,
		allowed_exts: Optional[Iterable[str]],
) -> List[Path]:
	"""
	Gather bare paths (no metadata) through ``GetContents.get_files_and_folders``.

	:param root: Folder to list.
	:param path_type: ``"files"`` or ``"folders"`` to keep only that kind;
		any other value keeps files followed by folders.
	:param allowed_exts: When truthy, files are limited to these extensions
		(normalised with ``norm_exts``); folders are not affected.
	:return: The selected paths as ``Path`` objects.

	The remaining keyword arguments are forwarded unchanged to the listing call.
	"""
	listing = GetContents().get_files_and_folders(
		root,
		recursive=recursive,
		include_hidden=include_hidden,
		follow_symlinks=follow_symlinks,
		pattern=pattern,
		antipattern=antipattern,
		shell_pattern=shell_pattern,
		return_absolute_paths=True
	)
	files = listing["files"]
	folders = listing["folders"]

	if allowed_exts:
		wanted = norm_exts(allowed_exts)
		files = [candidate for candidate in files if candidate.suffix.lower() in wanted]

	if path_type == "files":
		picked = files
	elif path_type == "folders":
		picked = folders
	else:
		picked = files + folders
	return [item if isinstance(item, Path) else Path(item) for item in picked]

_collect_with_meta(root, *, recursive, include_hidden, follow_symlinks, pattern, antipattern, shell_pattern, path_type, allowed_exts) staticmethod

Collect (path, metadata) pairs via :meth:get_contents and filter by type/extensions.

Source code in src/sciwork/fs/select.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@staticmethod
def _collect_with_meta(
		root: Path,
		*,
		recursive: bool,
		include_hidden: bool,
		follow_symlinks: bool,
		pattern: Optional[str],
		antipattern: Optional[str],
		shell_pattern: Optional[str],
		path_type: str,
		allowed_exts: Optional[Iterable[str]],
) -> List[Tuple[Path, dict]]:
	"""
	Gather ``(path, metadata)`` pairs through ``GetContents.get_contents``.

	:param root: Folder to list.
	:param path_type: ``"files"`` keeps entries whose metadata ``type`` is
		``"file"``, ``"folders"`` keeps ``"dir"``; any other value keeps all.
	:param allowed_exts: When truthy, file entries must have one of these
		extensions (normalised with ``norm_exts``).
	:return: Pairs in listing order; an empty list when the listing is empty.

	The remaining keyword arguments are forwarded unchanged to the listing call.
	"""
	listing = GetContents().get_contents(
		root,
		recursive=recursive,
		include_hidden=include_hidden,
		follow_symlinks=follow_symlinks,
		pattern=pattern,
		antipattern=antipattern,
		shell_pattern=shell_pattern,
		exif=False,
		max_items=None,
		ignore_errors=True,
		return_absolute_paths=True
	)
	if not listing:
		return []

	wanted_exts = norm_exts(allowed_exts or [])

	def _keep(path: Path, kind) -> bool:
		"""Tell whether an entry passes the type and extension filters."""
		if path_type == "files" and kind != "file":
			return False
		if path_type == "folders" and kind != "dir":
			return False
		if allowed_exts and kind == "file" and path.suffix.lower() not in wanted_exts:
			return False
		return True

	pairs = ((Path(raw), meta) for raw, meta in listing.items())
	return [(path, meta) for path, meta in pairs if _keep(path, meta.get("type"))]

_prompt_texts(root, candidates, selection_type) staticmethod

Build a list of lines and a hint for the prompt.

Source code in src/sciwork/fs/select.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@staticmethod
def _prompt_texts(root: Path, candidates: list[Path], selection_type: Literal["one", "many"]) -> Tuple[list[str], str]:
	"""
	Build the numbered candidate listing and the input hint.

	:param root: Folder named in the header line.
	:param candidates: Paths to enumerate (1-based); directories get a
		``" (dir)"`` suffix.
	:param selection_type: ``"one"`` for single selection; any other value
		produces the multi-selection wording.
	:return: ``(lines, hint)``: header, one line per candidate and a trailing
		empty line, plus the hint text. Colour codes come from
		``prompter.palette`` when a prompter is available, else they are empty.
	"""
	def _colour(key: str) -> str:
		return prompter.palette.get(key, "") if prompter else ""

	single = selection_type == "one"
	instruction = "Please select one" if single else "Please select many"
	prompt_c = _colour("prompt")
	reset_c = _colour("reset")
	hint_c = _colour("hint")

	total = len(candidates)
	width = len(str(total))
	header = f"{prompt_c}\nMultiple entries found in '{root}'. {instruction}:{reset_c}\n"
	rows = [
		f"{position:>{width}}: {candidate.name}{' (dir)' if candidate.is_dir() else ''}"
		for position, candidate in enumerate(candidates, 1)
	]
	lines = [header, *rows, ""]

	if single:
		hint = f"Enter a number {hint_c}[1...{total}]"
	else:
		hint = f"Enter indices {hint_c}(comma/range, e.g. 1,3,5-7) within 1..{total}"
	return lines, hint

_select_many(candidates, *, root, default_indices, prompt_text)

Pick multiple paths.

Accepts comma-separated indices and ranges in the prompt, e.g.: 1,3,5-7. Indices are 1-based.

Source code in src/sciwork/fs/select.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def _select_many(
		self,
		candidates: List[Path],
		*,
		root: Path,
		default_indices: Optional[Iterable[int]],
		prompt_text: Optional[str]
) -> List[Path]:
	"""
	Pick multiple paths.

	Accepts comma-separated indices and ranges in the prompt, e.g.:
	``1,3,5-7``. Indices are 1-based.

	:param candidates: Paths the user can choose from.
	:param root: Directory the candidates were collected from (used in the prompt header).
	:param default_indices: 1-based indices selected without prompting; a falsy value triggers the prompt.
	:param prompt_text: Custom prompt text; the generated hint is used when omitted.
	:return: The selected paths, resolved.
	:raises ValueError: When the entered selection is empty or out of range
		(in the no-prompter fallback the error is raised directly, without retries).
	"""
	if default_indices:
		# Materialise once: the indices are iterated for the selection and again for logging.
		indices = list(normalize_indices(default_indices, len(candidates)))
		selected = [candidates[i - 1].resolve() for i in indices]
		LOG.info("Selected by default_indices %s (%d items)", indices, len(selected))
		return selected

	lines, hint = self._prompt_texts(root, candidates, "many")

	if prompter is not None:
		prompter.print_lines(lines)
	else:
		# Print the lines themselves, not the repr of a list.
		print("\n".join(lines))

	ptxt = prompt_text or hint

	def _transform_list(s: str):
		return parse_index_list(s, len(candidates))

	def _validate_list(lst: list[int]):
		if not lst:
			raise ValueError("No indices selected")
		n = len(candidates)
		if any((i < 1 or i > n) for i in lst):
			raise ValueError(f"Enter indices within 1...{n}.")

	if prompter is not None:
		indices: list[int] = prompter.prompt(
			ptxt,
			transform=_transform_list,
			validate=_validate_list,
			allow_empty=False,
			retries=3
		)
	else:
		# Fallback without a prompter: honour the injectable input function and apply
		# the same parsing (ranges included) and validation as the prompter path.
		ask = getattr(self, "input_func", None) or input
		indices = _transform_list(ask(ptxt))
		_validate_list(indices)

	selected = [candidates[int(i) - 1].resolve() for i in indices]
	LOG.info("Selected by prompt %s (%d items)", indices, len(selected))
	return selected

_select_one(candidates, *, root, default_index, prompt_text)

Pick a single path from the candidates (index or interactive).

Source code in src/sciwork/fs/select.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def _select_one(
		self,
		candidates: List[Path],
		*,
		root: Path,
		default_index: Optional[int],
		prompt_text: Optional[str]
) -> Path:
	"""
	Pick a single path from the candidates (index or interactive).

	:param candidates: Paths the user can choose from.
	:param root: Directory the candidates were collected from (used in logs and the prompt header).
	:param default_index: 1-based index selected without prompting; ``None`` enables the prompt.
	:param prompt_text: Custom prompt text; the generated hint is used when omitted.
	:return: The selected path, resolved.
	:raises ValueError: When ``default_index`` or the entered number is out of range or not an integer.
	"""
	if len(candidates) == 1 and default_index is None:
		LOG.info("Single candidate in '%s': %s", root, candidates[0])
		return candidates[0].resolve()

	if default_index is not None:
		if not (1 <= default_index <= len(candidates)):
			raise ValueError(f"default_index out of range (1..{len(candidates)}), got {default_index}")
		sel = candidates[default_index - 1]
		LOG.info("Selected by default_index %d: %s", default_index, sel)
		return sel.resolve()

	lines, hint = self._prompt_texts(root, candidates, "one")
	# The prompter may be unavailable (the code below already allows for that),
	# so fall back to plain printing instead of failing on ``None``.
	if prompter is not None:
		prompter.print_lines(lines)
	else:
		print("\n".join(lines))
	ptxt = prompt_text or hint

	def _validate_num(s: str):
		number = int(s)
		if not (1 <= number <= len(candidates)):
			raise ValueError(f"Enter 1...{len(candidates)}")

	if prompter is not None:
		n = prompter.prompt(ptxt, validate=_validate_num, allow_empty=False, retries=3)
	else:
		# Fallback: use the injectable input function and validate the answer, so that
		# e.g. "0" cannot silently wrap around to the last candidate.
		ask = getattr(self, "input_func", None) or input
		n = ask(ptxt)
		_validate_num(n)

	sel = candidates[int(n) - 1]
	LOG.info("Selected by prompt: %s", sel)
	return sel.resolve()

select_paths(folder_path=None, *, recursive=False, include_hidden=True, follow_symlinks=False, pattern=None, antipattern=None, shell_pattern=None, sort_by='name', path_type='files', allowed_exts=None, descending=False, multiple=False, default_index=None, default_indices=None, prompt_text=None, return_absolute_paths=False)

Select one or many paths from a folder using the existing listing/filtering pipeline.

Strategy
  • If sorting is name/ext only → use a fast path via :meth:get_files_and_folders.
  • If sorting is ctime/mtime/size → use :meth:get_contents. (needs metadata)
  • A single candidate returns immediately; otherwise select by index/indices, or interactively via :class:sciwork.console.Prompter.

Parameters:

Name Type Description Default
folder_path Optional[PathLike]

Root directory (absolute or relative to base_dir). If None, folder_path defaults to self.base_dir.

None
recursive bool

Recurse into subdirectories.

False
include_hidden bool

Include dot-entries.

True
follow_symlinks bool

Follow directory symlinks during recursion.

False
pattern Optional[str]

Text pattern to be included in the file names.

None
antipattern Optional[str]

Text pattern to be excluded from the file names.

None
shell_pattern Optional[str]

Shell-like pattern to be included in the file names.

None
sort_by Literal['name', 'ctime', 'mtime', 'size', 'ext', 'exts']

'name' (default), 'ctime', 'mtime', 'size', 'ext', ('exts' is an alias).

'name'
path_type Literal['files', 'folders', 'any']

'files' (default), 'folders', 'any'.

'files'
allowed_exts Optional[Iterable[str]]

Allowed extensions for files (e.g., ['.csv', '.jpg']).

None
descending bool

Sort descending.

False
multiple bool

If True, allow selecting multiple entries (returns a list).

False
default_index Optional[int]

Single selection (1-based). If set, bypasses the prompt.

None
default_indices Optional[Iterable[int]]

Multiple selection (1-based). If set, bypasses the prompt.

None
prompt_text Optional[str]

Custom prompt text (when prompting is needed).

None
return_absolute_paths bool

If False, results are returned relative to the chosen root.

False

Returns:

Type Description
Union[Path, List[Path]]

Either a single selected path (multiple=False), or a list of paths (multiple=True).

Raises:

Type Description
FileNotFoundError

When no matching entries are found.

ValueError

On invalid parameters, selection or index is out of range.

Source code in src/sciwork/fs/select.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def select_paths(
		self,
		folder_path: Optional[PathLike] = None,
		*,
		recursive: bool = False,
		include_hidden: bool = True,
		follow_symlinks: bool = False,
		pattern: Optional[str] = None,
		antipattern: Optional[str] = None,
		shell_pattern: Optional[str] = None,
		sort_by: Literal['name', 'ctime', 'mtime', 'size', 'ext', 'exts'] = "name",
		path_type: Literal['files', 'folders', 'any'] = "files",
		allowed_exts: Optional[Iterable[str]] = None,
		descending: bool = False,
		multiple: bool = False,  # allow selecting multiple entries
		default_index: Optional[int] = None,  # single selection (1-based)
		default_indices: Optional[Iterable[int]] = None,  # multiple selection (1-based)
		prompt_text: Optional[str] = None,
		return_absolute_paths: bool = False
) -> Union[Path, List[Path]]:
	"""
	Select one or many paths from a folder using the existing listing/filtering pipeline.

	Strategy
	--------
	* Sorting by ``ctime``/``mtime``/``size`` collects candidates together with their
	  metadata (``_collect_with_meta``).
	* Any other sort key takes the fast path (``_collect_fast``).
	* The final choice is delegated to ``_select_one`` or ``_select_many``, which pick
	  by index/indices or prompt interactively.

	:param folder_path: Root directory (absolute or relative to ``base_dir``).
						If ``None``, `folder_path` defaults to ``self.base_dir``.
	:param recursive: Recurse into subdirectories.
	:param include_hidden: Include dot-entries.
	:param follow_symlinks: Follow directory symlinks during recursion.
	:param pattern: Text pattern to be included in the file names.
	:param antipattern: Text pattern to be excluded from the file names.
	:param shell_pattern: Shell-like pattern to be included in the file names.
	:param sort_by: ``'name'`` (default), ``'ctime'``, ``'mtime'``, ``'size'``, ``'ext'``, (``'exts'`` is an alias).
	:param path_type: ``'files'`` (default), ``'folders'``, ``'any'``.
	:param allowed_exts: Allowed extensions for files (e.g., ``['.csv', '.jpg']``).
	:param descending: Sort descending.
	:param multiple: If ``True``, allow selecting **multiple** entries (returns a list).
	:param default_index: Single selection (1-based). If set, bypasses the prompt.
	:param default_indices: Multiple selection (1-based). If set, bypasses the prompt.
	:param prompt_text: Custom prompt text (when prompting is needed).
	:param return_absolute_paths: If ``False``, results are returned relative to the chosen root.
	:return: Either a single selected path (``multiple=False``), or a list of paths (``multiple=True``).
	:raises FileNotFoundError: When no matching entries are found.
	:raises ValueError: On invalid parameters, or when a selection/index is out of range.
	"""
	try:
		from .dirs import Dirs
	except ImportError:
		LOG.error("'Select.select_paths' method requires the optional dependency 'sciwork.fs.dirs.Dirs' to work.")
		raise

	# NOTE(review): ``Dirs()`` is built without arguments, i.e. without this instance's
	# base_dir/dry_run - confirm relative ``folder_path`` values resolve as intended.
	if folder_path:
		root = Dirs().try_get_dir(folder_path)
	else:
		root = self.base_dir

	# Normalise the sort key ('exts' is an alias of 'ext').
	sort_key = sort_by.lower()
	if sort_key == "exts":
		sort_key = "ext"
	if path_type not in {"files", "folders", "any"}:
		raise ValueError(f"path_type must be one of: 'files', 'folders', 'any': {path_type}")

	# Both collectors take the same filtering options.
	filters = dict(
		recursive=recursive,
		include_hidden=include_hidden,
		follow_symlinks=follow_symlinks,
		pattern=pattern,
		antipattern=antipattern,
		shell_pattern=shell_pattern,
		path_type=path_type,
		allowed_exts=allowed_exts
	)

	# 1) Collect and sort candidates
	if sort_key in {"ctime", "mtime", "size"}:
		ranked = sort_with_meta(self._collect_with_meta(root, **filters), sort_key, descending)
		cand_paths = [path for path, _meta in ranked]
	else:
		cand_paths = sort_fast(self._collect_fast(root, **filters), sort_key, descending)

	if not cand_paths:
		raise FileNotFoundError(f"No matching entries found in '{root}'")

	# 2) Selection
	if not multiple:
		chosen = self._select_one(
			cand_paths,
			root=root,
			default_index=default_index,
			prompt_text=prompt_text
		)
		return maybe_rel_one(chosen, root, return_absolute_paths)

	selected = self._select_many(
		cand_paths,
		root=root,
		default_indices=default_indices,
		prompt_text=prompt_text
	)
	return maybe_rel_list(selected, root, return_absolute_paths)

Open

sciwork.fs.open.Open(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Cross-platform helpers to open files/folders in the system explorer/viewer.

Expects the host class to provide: - self._abs(path) from :class:PathOpsBase - self.dry_run from :class:PathOpsBase - Prompter().confirm(...) from :class:sciwork.console.Prompter

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""Store the shared context: base directory, dry-run flag and input function."""
	# A falsy base_dir means "current working directory"; otherwise resolve the given path.
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()
	self.base_dir = root
	self.dry_run = bool(dry_run)
	# Fall back to the builtin ``input`` when no function is supplied.
	self.input_func = input_func if input_func else input
	# Extra keyword arguments are accepted and not used here.

_system_open(target) staticmethod

Open target (folder/file) in the system's file explorer/viewer.

Returns a Popen if the opener is a regular process that we can wait on. On Windows, os.startfile is preferred and returns None (detached).

Parameters:

Name Type Description Default
target Path

The resolved path to open.

required

Returns:

Type Description
Optional[Popen]

subprocess.Popen.

Raises:

Type Description
RuntimeError

Unsupported OS or no opener found.

OSError

spawn/permission errors.

Source code in src/sciwork/fs/open.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@staticmethod
def _system_open(target: Path) -> Optional[subprocess.Popen]:
	"""
	Open *target* (folder/file) in the system's file explorer/viewer.

	On Windows, ``os.startfile`` is tried first and ``None`` is returned; ``explorer``
	is spawned only when ``startfile`` raises ``AttributeError``. On macOS ``open`` is
	spawned. On Linux the first opener found on ``PATH`` that can be spawned is used.

	:param target: The resolved path to open.
	:return: The spawned ``subprocess.Popen``, or ``None`` when ``os.startfile`` was used.
	:raises RuntimeError: Unsupported OS or no opener found.
	:raises OSError: spawn/permission errors.
	"""
	target = Path(target)
	sysname = platform.system()
	path_arg = str(target)

	try:
		if sysname == "Windows":
			try:
				os.startfile(path_arg)  # type: ignore[attr-defined]
			except AttributeError:
				# No ``startfile`` available: fall back to spawning explorer.
				return subprocess.Popen(["explorer", path_arg])
			return None

		if sysname == "Darwin":
			return subprocess.Popen(["open", path_arg])

		if sysname != "Linux":
			raise RuntimeError(f"Unsupported operating system: {sysname}")

		# Known Linux openers in order of preference; ``gio`` needs its ``open`` subcommand.
		openers = (["xdg-open"], ["gio", "open"], ["kde-open"], ["gnome-open"])
		commands = [argv + [path_arg] for argv in openers if shutil.which(argv[0])]

		for cmd in commands:
			try:
				return subprocess.Popen(cmd)
			except Exception:
				# This opener could not be spawned; try the next one.
				continue

		raise RuntimeError("No system opener found (tried xdg-open/gio/kde-open/gnome-open).")
	except Exception as exc:
		LOG.exception("Failed to open system explorer for %s: %s", target, exc)
		raise

open_folder_and_wait(folder_path, *, confirm_manual=True, wait=False, timeout=None)

Open a folder in the system file explorer and optionally wait and/or ask for confirmation.

Behavior: - Validates the path exists and is a directory. - Respects base_dir and dry_run. - If wait=True, blocks until the spawned opener process exits (unusual: most openers detach, and on Windows/macOS the opener returns immediately). - If confirm_manual=True, asks the user to press Enter to continue.

Parameters:

Name Type Description Default
folder_path PathLike

Folder to open (absolute or relative to base_dir).

required
confirm_manual bool

Prompt the user to continue after opening.

True
wait bool

Try to wait for the opener to exit (best-effort; many GUIs detach).

False
timeout Optional[float]

Optional timeout for waiting (seconds).

None

Returns:

Type Description
Path

The resolved absolute folder path.

Raises:

Type Description
FileNotFoundError

Folder does not exist.

NotADirectoryError

Path exists but is not a directory.

RuntimeError

Unsupported OS or no opener available.

OSError

OS-level failures (permissions, spawn errors).

Source code in src/sciwork/fs/open.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def open_folder_and_wait(
		self,
		folder_path: PathLike,
		*,
		confirm_manual: bool = True,
		wait: bool = False,
		timeout: Optional[float] = None
) -> Path:
	"""
	Open a folder in the system file explorer, optionally waiting and/or asking for confirmation.

	Steps:
		- The path is made absolute via ``self._abs`` and must be an existing directory.
		- With ``dry_run`` set, the action is only logged and nothing is opened.
		- With ``wait=True`` and a spawned opener process, the call blocks until that
			process exits or *timeout* elapses (a timeout is logged, not raised).
		- With ``confirm_manual=True``, the user is asked to confirm before returning.

	:param folder_path: Folder to open (absolute or relative to ``base_dir``).
	:param confirm_manual: Prompt the user to continue after opening.
	:param wait: Try to wait for the opener to exit (best-effort; many GUIs detach).
	:param timeout: Optional timeout for waiting (seconds).
	:return: The resolved absolute folder path.
	:raises FileNotFoundError: Folder does not exist.
	:raises NotADirectoryError: Path exists but is not a directory.
	:raises RuntimeError: Unsupported OS or no opener available.
	:raises OSError: OS-level failures (permissions, spawn errors).
	"""
	target = self._abs(folder_path)

	# Validate before doing anything else.
	if not target.exists():
		raise FileNotFoundError(f"Folder does not exist: {target}")
	if not target.is_dir():
		raise NotADirectoryError(f"Path is not a directory: {target}")

	if self.dry_run:
		LOG.info("[dry-run] open folder: %s", target)
		return target.resolve()

	LOG.info("Opening folder: %s", target)
	opener = self._system_open(target)

	# Waiting is only possible when the opener returned a process handle.
	if opener is not None and wait:
		try:
			opener.wait(timeout=timeout)
		except subprocess.TimeoutExpired:
			LOG.warning("Opener did not exit within %.1fs for %s", timeout or 0.0, target)

	if confirm_manual:
		confirm = self._pick_prompt(None, confirm=True)
		confirm("Folder is open. Press Enter (Y) to continue.")
		LOG.info("User confirmed continuation.")

	return target.resolve()

TreeOps

sciwork.fs.trees.TreeOps(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Filesystem tree builder.

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""Store the shared context: base directory, dry-run flag and input function."""
	# A falsy base_dir means "current working directory"; otherwise resolve the given path.
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()
	self.base_dir = root
	self.dry_run = bool(dry_run)
	# Fall back to the builtin ``input`` when no function is supplied.
	self.input_func = input_func if input_func else input
	# Extra keyword arguments are accepted and not used here.

build_tree(tree_dict, *, file_mode='a')

Create a filesystem tree based on an adjacency dictionary.

Input shape - tree_dict["parents"]: list of root components (strings) - For each internal node: parent_name: [child1, child2, ...] - Each leaf is the last component of the branch. If it looks like a file (it has known suffix), creates an empty file, otherwise creates a directory.

Example:: { "parents": ["data"], "data": ["raw", "processed"], "raw": ["in.csv", "out.txt"], "processed": ["table.xlsx"] }

Parameters:

Name Type Description Default
tree_dict Dict[str, List[str]]

Adjacency mapping with a required 'parents' key.

required
file_mode str

Mode passed to :meth:create_file for leaf files ('a'|'w'|'x').

'a'

Returns:

Type Description
Dict[str, Path]

Mapping leaf name -> absolute Path created (dir or file).

Raises:

Type Description
KeyError

If 'parents' key is missing.

ValueError

On invalid topology (cycles, multiple parents, orphan nodes).

Source code in src/sciwork/fs/trees.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def build_tree(
		self,
		tree_dict: Dict[str, List[str]],
		*,
		file_mode: str = "a"
) -> Dict[str, Path]:
	"""
	Create a filesystem tree based on an adjacency dictionary.

	**Input shape**
		- ``tree_dict["parents"]``: list of root components (strings)
		- For each internal node: ``parent_name: [child1, child2, ...]``
		- Each *leaf* is a child that never appears as a key. If ``looks_like_file``
			accepts it, a file is created; otherwise a directory is created.

	Example::
		{
			"parents": ["data"],
			"data": ["raw", "processed"],
			"raw": ["in.csv", "out.txt"],
			"processed": ["table.xlsx"]
		}

	:param tree_dict: Adjacency mapping with a required 'parents' key.
	:param file_mode: Mode passed to :meth:`create_file` for leaf files ('a'|'w'|'x').
	:return: Mapping leaf name -> absolute Path created (dir or file).
	:raises KeyError: If 'parents' key is missing.
	:raises ValueError: If 'parents' is empty, ``None`` or a bare string; also on invalid
		topology (cycles, multiple parents, orphan nodes).
	"""
	if "parents" not in tree_dict:
		raise KeyError("tree_dict must contain a 'parents' key with root nodes.")

	raw_roots = tree_dict["parents"]
	# Validate BEFORE converting: ``list("data")`` would silently turn a string
	# into single-character roots.
	if raw_roots is None or isinstance(raw_roots, (str, bytes)):
		raise ValueError("'parents' must be a non-empty list of root components.")
	roots = list(raw_roots)
	if not roots:
		raise ValueError("'parents' must be a non-empty list of root components.")

	# Determine leaves: children that never appear as keys (i.e., not parents)
	all_children = {c for k, vals in tree_dict.items() if k != "parents" for c in vals}
	leaves = sorted(all_children.difference(tree_dict))

	created: Dict[str, Path] = {}
	for leaf in leaves:
		components = build_branch(leaf, tree_dict, roots)
		target = self.base_dir.joinpath(*components)

		# NOTE(review): ``Create()`` is built without arguments, so this instance's
		# ``dry_run`` is not passed along - confirm whether that is intended.
		if looks_like_file(leaf):
			# ensure parent and create a file
			Create().create_file(target, op=file_mode, create_parents=True)
			kind = "file"
		else:
			Create().make_folder(target, exist_ok=True)
			kind = "folder"

		abs_path = target.resolve()
		created[leaf] = abs_path
		LOG.info("Created %s: %s", kind, abs_path)

	return created

Archives

sciwork.fs.archives.Archives(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase

Archive utilities: safe extraction and creation of archives.

  • Heavy modules are imported lazily in helpers (zipfile, pyzipper, tarfile, rarfile).
  • Safe extraction with traversal protection via :func:assert_within_dir.
  • Respects :attr:dry_run.
Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""Store the shared context: base directory, dry-run flag and input function."""
	# A falsy base_dir means "current working directory"; otherwise resolve the given path.
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()
	self.base_dir = root
	self.dry_run = bool(dry_run)
	# Fall back to the builtin ``input`` when no function is supplied.
	self.input_func = input_func if input_func else input
	# Extra keyword arguments are accepted and not used here.

_default_ext_for(format_name) staticmethod

Return the default extension for a given format name.

Source code in src/sciwork/fs/archives.py
107
108
109
110
@staticmethod
def _default_ext_for(format_name: Literal["zip", "tar", "gztar", "bztar", "xztar"]) -> str:
	"""
	Return the default extension for a given format name.

	:param format_name: Key looked up in ``ARCHIVE_SUFFIXES``.
	:return: The suffix registered for *format_name*.
	:raises ValueError: If *format_name* has no entry in ``ARCHIVE_SUFFIXES``.
	"""
	try:
		return ARCHIVE_SUFFIXES[format_name]
	except KeyError:
		# Fail with a clear message here instead of returning None, which callers
		# would concatenate to a string and hit an unrelated TypeError.
		raise ValueError(f"Unsupported archive format: {format_name!r}.") from None

_destination_dir(src, extract_to, *, overwrite)

Figure out (and prepare) the output directory for extraction.

Source code in src/sciwork/fs/archives.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def _destination_dir(
		self, src: Path, extract_to: Optional[PathLike], *, overwrite: bool
) -> Path:
	"""
	Figure out (and prepare) the output directory for extraction.

	:param src: Archive path; used to derive the default destination.
	:param extract_to: Explicit destination (made absolute via ``self._abs``); when falsy,
		the destination is *src* without its archive suffix (``x.tar.gz`` -> ``x``).
	:param overwrite: Allow an existing, non-empty destination.
	:return: The destination directory. It is created when missing, except in dry-run.
	:raises FileExistsError: Destination exists, is not (known to be) empty and ``overwrite`` is false.
	"""
	if extract_to:
		dest = self._abs(extract_to)
	else:
		dest = src.with_suffix("")
		# Double suffixes such as ".tar.gz" leave a trailing ".tar"; strip it up front so the
		# existence/emptiness check and the dry-run result refer to the real target directory.
		if dest.suffix == ".tar":
			dest = dest.with_suffix("")

	if not dest.exists():
		if not self.dry_run:
			dest.mkdir(parents=True, exist_ok=True)
		return dest

	# use Dirs.is_folder_empty for consistency
	try:
		from .dirs import Dirs
		empty = Dirs().is_folder_empty(dest)
	except ImportError:
		LOG.error("extract_archive requires the optional dependency 'sciwork.fs.Dirs' to work.")
		empty = False
	except Exception:
		# Best effort: if emptiness cannot be determined, treat the folder as non-empty.
		LOG.warning("Could not check whether '%s' is empty; treating it as non-empty.", dest, exc_info=True)
		empty = False

	if not empty and not overwrite:
		raise FileExistsError(f"Destination not empty: '{dest}' (use overwrite=True).")
	return dest

_ensure_archive_suffix(path, arch_format)

Ensure the archive path has the correct suffix.

Source code in src/sciwork/fs/archives.py
112
113
114
115
116
117
118
def _ensure_archive_suffix(self, path: Path, arch_format: Literal["zip", "tar", "gztar", "bztar", "xztar"]) -> Path:
	"""
	Make sure *path* carries an archive suffix.

	A path that already ends (case-insensitively) with any suffix listed in
	``ARCHIVE_SUFFIXES`` is returned unchanged - not only the suffix belonging to
	*arch_format*. Otherwise the default suffix for *arch_format* is appended.

	:param path: Candidate archive path.
	:param arch_format: Format whose default suffix is appended when needed.
	:return: *path* itself, or a new path with the suffix appended.
	"""
	default_ext = self._default_ext_for(arch_format)
	text = str(path)
	known_suffixes = tuple(ARCHIVE_SUFFIXES.values())
	already_suffixed = text.lower().endswith(known_suffixes)
	return path if already_suffixed else Path(text + default_ext)

_extract_rar(src, dest, *, password, safe) staticmethod

Extract RAR archive.

Source code in src/sciwork/fs/archives.py
 97
 98
 99
100
101
102
103
104
@staticmethod
def _extract_rar(src: Path, dest: Path, *, password: Optional[str], safe: bool) -> None:
	"""
	Extract the RAR archive *src* into *dest*.

	:param src: Archive path.
	:param dest: Output directory.
	:param password: Password handed to the extraction call (may be ``None``).
	:param safe: Use ``safe_extract_rar`` instead of a plain ``extractall``.
	"""
	with RARFILE.RarFile(str(src), "r") as archive:  # type: ignore[attr-defined]
		if not safe:
			archive.extractall(path=str(dest), pwd=password)
			return
		safe_extract_rar(archive, dest, password=password)

_extract_tar(src, dest, *, safe) staticmethod

Extract TAR archive.

Source code in src/sciwork/fs/archives.py
88
89
90
91
92
93
94
95
@staticmethod
def _extract_tar(src: Path, dest: Path, *, safe: bool) -> None:
	"""
	Extract the TAR archive *src* into *dest*.

	The archive is opened with mode ``"r:*"``, so compression is detected by ``tarfile``.

	:param src: Archive path.
	:param dest: Output directory.
	:param safe: Use ``safe_extract_tar`` instead of a plain ``extractall``.
	"""
	with tarfile.open(src, "r:*") as archive:
		if not safe:
			archive.extractall(dest)
			return
		safe_extract_tar(archive, dest)

_extract_zip(src, dest, *, password, safe) staticmethod

Extract ZIP from src to dest. If password is provided, use pyzipper (AES/ZipCrypto); otherwise use the standard zipfile. Safe extraction (anti-traversal) via safe_extract_zip().

Source code in src/sciwork/fs/archives.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@staticmethod
def _extract_zip(src: Path, dest: Path, *, password: Optional[str], safe: bool) -> None:
	"""
	Extract the ZIP archive *src* into *dest*.

	With a password the archive is opened through ``pyzipper.AESZipFile``; without one
	the standard ``zipfile.ZipFile`` is used. ``safe=True`` delegates the extraction to
	``safe_extract_zip()``.

	:param src: Archive path.
	:param dest: Output directory.
	:param password: Optional password; UTF-8 encoded before being set on the archive.
	:param safe: Use ``safe_extract_zip`` instead of a plain ``extractall``.
	"""
	# Only reach for pyzipper when a password is actually supplied.
	opener = pyzipper.AESZipFile if password else zipfile.ZipFile

	with opener(src, "r") as archive:
		if safe:
			safe_extract_zip(archive, dest, password=password)
			return
		if password:
			archive.setpassword(password.encode("utf-8"))
		archive.extractall(dest)

_resolve_output_path(src_dir, output_archive_path, arch_format, *, overwrite)

Source code in src/sciwork/fs/archives.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def _resolve_output_path(
		self,
		src_dir: Path,
		output_archive_path: Optional[PathLike],
		arch_format: Literal["zip", "tar", "gztar", "bztar", "xztar"],
		*,
		overwrite: bool
) -> Path:
	"""
	Work out where the archive will be written.

	:param src_dir: Directory being archived; the default archive is placed next to it.
	:param output_archive_path: Explicit target; ``None`` selects ``<src_dir><default ext>``.
	:param arch_format: Archive format, used to pick/append the suffix.
	:param overwrite: Allow an already existing target.
	:return: The archive path.
	:raises FileExistsError: Target exists and ``overwrite`` is false.
	"""
	if output_archive_path is not None:
		dest = self._ensure_archive_suffix(self._abs(output_archive_path), arch_format)
		# Prepare the parent folder of an explicit target (skipped in dry-run).
		if not self.dry_run:
			dest.parent.mkdir(parents=True, exist_ok=True)
	else:
		archive_name = src_dir.name + self._default_ext_for(arch_format)
		dest = src_dir.parent / archive_name

	if not overwrite and dest.exists():
		raise FileExistsError(f"Archive already exists: '{dest}'.")

	return dest

Unlink the destination if it exists and overwrite=True.

Source code in src/sciwork/fs/archives.py
140
141
142
143
144
145
146
def _unlink_before_compression(self, dest: Path, overwrite: bool) -> None:
	"""
	Remove an existing *dest* ahead of compression.

	Nothing happens unless *dest* exists, ``overwrite`` is true and this is not a dry run.

	:param dest: Archive path that is about to be written.
	:param overwrite: Whether an existing archive may be replaced.
	"""
	if not dest.exists() or not overwrite or self.dry_run:
		return
	try:
		dest.unlink()
	except FileNotFoundError:
		# The file vanished between the check and the unlink; nothing left to do.
		pass

compress_to_archive(source, output_archive_path=None, *, arch_format='zip', overwrite=False, password=None, include_hidden=True, compresslevel=None)

Compress a directory into an archive (ZIP/TAR*). Supports password only for ZIP via pyzipper.

Parameters:

Name Type Description Default
source PathLike

Directory to compress.

required
output_archive_path Optional[PathLike]

Output archive path. If None, create alongside source with the appropriate extension.

None
arch_format Literal['zip', 'tar', 'gztar', 'bztar', 'xztar']

'zip' | 'tar' | 'gztar' | 'bztar' | 'xztar'

'zip'
overwrite bool

If True and the output already exists, remove it first.

False
password Optional[str]

Optional password (ZIP only; AES-256 via pyzipper).

None
include_hidden bool

Include dot-files/directories.

True
compresslevel Optional[int]

Compression level (None for default).

None

Returns:

Type Description
Path

Absolute path to the created archive.

Raises:

Type Description
FileNotFoundError

Source not found.

NotADirectoryError

Source is not a directory.

FileExistsError

Output exists and overwrite=False.

ValueError

Invalid format or password used with TAR.

Source code in src/sciwork/fs/archives.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def compress_to_archive(
		self,
		source: PathLike,
		output_archive_path: Optional[PathLike] = None,
		*,
		arch_format: Literal['zip', 'tar', 'gztar', 'bztar', 'xztar'] = "zip",
		overwrite: bool = False,
		password: Optional[str] = None,  # only for ZIP
		include_hidden: bool = True,
		compresslevel: Optional[int] = None  # 0..9 for zip / bz2
) -> Path:
	"""
	Pack a directory into an archive (ZIP/TAR*). A password is accepted for ZIP only.

	:param source: Directory to compress.
	:param output_archive_path: Output archive path. If None, create alongside *source*
								with the appropriate extension.
	:param arch_format: 'zip' | 'tar' | 'gztar' | 'bztar' | 'xztar'
	:param overwrite: If True and the output already exists, remove it first.
	:param password: Optional password (ZIP only).
	:param include_hidden: Include dot-files/directories.
	:param compresslevel: Compression level (None for default).
	:return: Absolute path to the created archive (in dry-run: the path that would be created).
	:raises FileNotFoundError: Source not found, or no files to archive under it.
	:raises NotADirectoryError: Source is not a directory.
	:raises FileExistsError: Output exists and ``overwrite=False``.
	:raises ValueError: Password used with a non-ZIP format (not checked in dry-run).
	"""
	src_dir = self._abs(source)
	if not src_dir.exists():
		raise FileNotFoundError(f"Source directory not found: '{src_dir}'.")
	if not src_dir.is_dir():
		raise NotADirectoryError(f"Source is not a directory: '{src_dir}'.")

	dest = self._resolve_output_path(src_dir, output_archive_path, arch_format, overwrite=overwrite)

	# Gather the members first so an empty source is reported before anything is written.
	files = list(iter_archive_files(src_dir, include_hidden=include_hidden))
	if not files:
		raise FileNotFoundError(f"No files to archive under '{src_dir}'.")

	if self.dry_run:
		LOG.info("[dry-run] make %s archive (%d files): %s -> %s", arch_format, len(files), src_dir, dest)
		return dest.resolve()

	is_zip = arch_format == "zip"
	if password and not is_zip:
		raise ValueError("Passwords are supported only for ZIP archives.")

	# Remove a stale archive (only when overwrite is set) before writing the new one.
	self._unlink_before_compression(dest, overwrite=overwrite)
	if is_zip:
		make_zip_archive(src_dir, files, dest, password=password, compresslevel=compresslevel)
	else:
		make_tar_archive(src_dir, files, dest, tar_format=arch_format, compresslevel=compresslevel)

	LOG.info("Archive created (%s): %s -> %s", arch_format, src_dir, dest)
	return dest.resolve()

extract_archive(archive_path, extract_to=None, *, overwrite=False, password=None, safe=True)

Extract an archive into a directory.

Supported: - ZIP (password optional; legacy ZipCrypto) - TAR / TAR.GZ / TAR.BZ2 / TAR.XZ (no password) - RAR (password optional, requires mod:rarfile and an external backend)

Parameters:

Name Type Description Default
archive_path PathLike

Path to the archive file.

required
extract_to Optional[PathLike]

Target directory; default is <archive_dir>/<archive_stem>.

None
overwrite bool

Allow extraction into a non-empty target dir.

False
password Optional[str]

Optional password for ZIP/RAR. Ignored for TAR*.

None
safe bool

If True, protect against path traversal (recommended).

True

Returns:

Type Description
Path

Absolute path to the extraction directory.

Raises:

Type Description
FileNotFoundError

Archive not found.

FileExistsError

Destination exists and is not empty and overwrite=False.

ValueError

Unsupported type or password provided for unsupported format.

PermissionError

On OS errors.

OSError

On OS errors.

Source code in src/sciwork/fs/archives.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def extract_archive(
		self,
		archive_path: PathLike,
		extract_to: Optional[PathLike] = None,
		*,
		overwrite: bool = False,
		password: Optional[str] = None,
		safe: bool = True
) -> Path:
	"""
	Unpack an archive file into a target directory.

	The archive kind reported by ``detect_archive_type`` selects the extractor:
		- ``"zip"`` -> ``_extract_zip`` (password optional; legacy ZipCrypto)
		- ``"tar"`` -> ``_extract_tar`` (TAR / TAR.GZ / TAR.BZ2 / TAR.XZ; a password is rejected)
		- ``"rar"`` -> ``_extract_rar`` (password optional; requires :mod:`rarfile`
		  and an external backend)

	In dry-run mode the intended action is only logged and no extractor runs;
	the TAR password check is part of the real extraction path, so it is not
	performed in dry-run mode either.

	:param archive_path: Path to the archive file.
	:param extract_to: Target directory; default is ``<archive_dir>/<archive_stem>``.
	:param overwrite: Allow extraction into a non-empty target dir.
	:param password: Optional password for ZIP/RAR. Not supported for TAR*
		(a ``ValueError`` is raised when one is given).
	:param safe: If True, protect against path traversal (recommended).
	:return: Absolute path to the extraction directory.
	:raises FileNotFoundError: Archive not found.
	:raises FileExistsError: Destination exists and is not empty and ``overwrite=False``.
	:raises ValueError: Unsupported/unrecognized archive type, or a password given for TAR.
	:raises PermissionError: Permission denied while extracting (logged, then re-raised).
	:raises OSError: Any other OS-level failure while extracting (logged, then re-raised).
	"""
	src = self._abs(archive_path)
	if not src.exists():
		raise FileNotFoundError(f"Archive path does not exist: '{src}'.")

	# The destination is prepared before type detection, exactly as callers expect today.
	dest = self._destination_dir(src, extract_to, overwrite=overwrite)

	kind = detect_archive_type(src)
	if kind == "unknown":
		raise ValueError(f"Unsupported or unrecognized archive type: '{src}'.")

	if self.dry_run:
		LOG.info("[dry-run] extract (%s): %s -> %s", kind, src, dest)
		return dest.resolve()

	try:
		if kind == "tar":
			# TAR has no encryption layer, so a password is a caller error.
			if password:
				raise ValueError("Passwords are not supported for TAR archives.")
			self._extract_tar(src, dest, safe=safe)
		elif kind in ("zip", "rar"):
			# ZIP and RAR extractors share the same keyword interface.
			extractor = self._extract_zip if kind == "zip" else self._extract_rar
			extractor(src, dest, password=password, safe=safe)
		else:
			raise ValueError(f"Unsupported archive type: '{kind}'.")
	except OSError as exc:
		# PermissionError is an OSError subclass; it gets its own log message.
		if isinstance(exc, PermissionError):
			LOG.exception("Permission denied while extracting '%s' -> '%s'", src, dest)
		else:
			LOG.exception("OS error while extracting '%s' -> '%s': %s", src, dest, exc)
		raise

	LOG.info("Extracted (%s): %s -> %s", kind, src, dest)
	return dest.resolve()

Load

sciwork.fs.load.Load(base_dir=None, *, dry_run=False, input_func=None, **kwargs)

Bases: PathOpsBase, Classify, BaseLoaders

Facade that provides :meth:any_data_loader on top of concrete loaders.

This class relies on other mixins present in your project:

  • PathOpsBase for path resolution (_abs) and base_dir handling
  • a classifier method classify_path(path) -> str (e.g., 'comma_separated_values', 'text_only', ...)
  • encoding/delimiter helpers: detect_encoding and detect_delimiter

If your current PathOps already mixes these in, you can inherit:

Source code in src/sciwork/fs/base.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(
		self,
		base_dir: Optional[PathLike] = None,
		*,
		dry_run: bool = False,
		input_func: Optional[Callable[[str], str]] = None,
		**kwargs
) -> None:
	"""
	Store the shared filesystem context on the instance.

	:param base_dir: Base directory for relative paths; it is resolved to an
		absolute path. A falsy value (``None``, ``""``) selects ``Path.cwd()``.
	:param dry_run: Coerced to ``bool`` and stored as ``self.dry_run``.
	:param input_func: Prompt function; falls back to the built-in ``input``.
	:param kwargs: Accepted but not used by this initializer.
	"""
	if base_dir:
		root = Path(base_dir).resolve()
	else:
		root = Path.cwd()

	self.base_dir = root
	self.dry_run = bool(dry_run)
	self.input_func = input_func if input_func else input

any_data_loader(path, *, sheet_name=None, encoding=None, delimiter=None, header=None, dtype=None, include_hidden_rows=False, force_type=None)

Open a variety of data files and return either a DataFrame or Python data structures.

Supported formats: Excel (.xlsx, .xlsm, .xls) → pandas.DataFrame (hidden rows optionally skipped); CSV/TSV/Text (.csv/.tsv/.txt) → pandas.DataFrame (delimiter detection when missing); JSON (.json) → dict/list (returned as loaded Python objects); XML (.xml) → pandas.DataFrame (flat dict per element); SIF (.sif) → pandas.DataFrame (via :meth:_load_sif); UV/VIS SPC (.spc) → result of the load_uvvis_spc parser.

Parameters:

Name Type Description Default
path PathLike

Input file path.

required
sheet_name Optional[str]

For Excel: sheet name to load. If 'choice', the user is prompted to pick. If None, the first sheet is used.

None
encoding Optional[str]

Encoding for text files; auto-detected if None.

None
delimiter Optional[str]

Delimiter for CSV/TXT; auto-detected if None.

None
header Optional[int]

Header row index (0-based) for tabular readers.

None
dtype Optional[dict]

Optional dtype mapping passed to the pandas readers.

None
include_hidden_rows bool

If True, Excel reader includes rows even if hidden. If False, hidden rows are skipped (via openpyxl).

False
force_type Optional[str]

Force a specific type_label (bypasses classification).

None

Returns:

Type Description
Any

DataFrame or Python object depending on format.

Raises:

Type Description
FileNotFoundError

Source not found.

ValueError

For unsupported/unknown types.

Source code in src/sciwork/fs/load.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def any_data_loader(
		self,
		path: PathLike,
		*,
		# common options; loaders pick what they need
		sheet_name: Optional[str] = None,  # for Excel; 'choice' → prompt
		encoding: Optional[str] = None,  # for text files
		delimiter: Optional[str] = None,  # for CSV/TXT
		header: Optional[int] = None,
		dtype: Optional[dict] = None,
		include_hidden_rows: bool = False,  # for Excel via openpyxl
		force_type: Optional[str] = None
) -> Any:
	"""
	Open a variety of data files and return either a DataFrame or Python data structures.

	The file is classified with ``classify_path`` (unless ``force_type`` is given)
	and handed to the matching loader.

	Supported:
		- Excel (.xlsx, .xlsm, .xls)    → pandas.DataFrame (hidden rows optionally skipped)
		- CSV/TSV/Text (.csv/.tsv/.txt) → pandas.DataFrame (delimiter detection when missing)
		- JSON (.json)                  → dict/list (returned as loaded Python objects)
		- XML (.xml)                    → pandas.DataFrame (flat dict per element)
		- SIF (.sif)                    → pandas.DataFrame (via :meth:`_load_sif`)
		- UV/VIS SPC (.spc)             → result of ``parsers.uvvis_spc.load_uvvis_spc``

	:param path: Input file path.
	:param sheet_name: For Excel: sheet name to load. If 'choice', the user is prompted to pick.
						If None, the first sheet is used.
	:param encoding: Encoding for text files; auto-detected if None.
	:param delimiter: Delimiter for CSV/TXT; auto-detected if None.
	:param header: Header row index (0-based) for tabular readers.
	:param dtype: Optional dtype mapping passed to the pandas readers.
	:param include_hidden_rows: If True, Excel reader includes rows even if hidden.
								If False, hidden rows are skipped (via openpyxl).
	:param force_type: Force a specific type_label (bypasses classification).
						Matched case-insensitively, ignoring surrounding whitespace.
	:return: DataFrame or Python object depending on format.
	:raises FileNotFoundError: Source not found.
	:raises ValueError: For unsupported/unknown types, an unsupported ``force_type``,
						or when the path is classified as a folder.
	"""
	p = self._abs(path)
	if not p.exists():
		raise FileNotFoundError(f"File '{p}' not found")

	# Normalize a forced label so "CSV" or " xlsx " match the lowercase aliases below.
	forced = force_type.strip().lower() if isinstance(force_type, str) else force_type
	kind = forced or self.classify_path(p)

	# Excel
	if kind in {"ms_excel_spreadsheet", "excel", "xlsx", "xls", "xlsm"}:
		return self._load_ms_excel_spreadsheet(
			p, sheet_name=sheet_name, header=header, dtype=dtype, include_hidden_rows=include_hidden_rows
		)

	# CSV/TSV
	if kind in {"comma_separated_values", "csv", "tsv"}:
		return self._load_csv(
			p, encoding=encoding, delimiter=delimiter, header=header, dtype=dtype,
		)

	# Generic delimited text
	if kind in {"text_only", "txt", "log"}:
		return self._load_text_only(
			p, encoding=encoding, delimiter=delimiter, header=header, dtype=dtype
		)

	# JSON
	if kind in {"javascript_object_notation", "json"}:
		return self._load_json(p)

	# XML
	if kind in {"extensible_markup_language", "xml"}:
		return self._load_xml(p)

	# SIF
	if kind in {"andor_scientific_image_format", "sif"}:
		return self._load_sif(p)

	# UV/VIS SPC
	if kind in {"uv_vis_spectrum_spc", "spc"}:
		# Imported lazily so the parser is only loaded when an SPC file is requested.
		from .parsers.uvvis_spc import load_uvvis_spc
		return load_uvvis_spc(p)

	if kind == "folder":
		raise ValueError(f"Expected a file, got folder: {p}")

	if forced:
		# The caller's label was rejected, not the file suffix; name the label.
		raise ValueError(f"Unsupported force_type: '{force_type}' (file: {p})")

	raise ValueError(f"Unsupported/unknown file type: {p.suffix.lower() or '<no extension>'}")