summaryrefslogtreecommitdiffhomepage
path: root/libs/click
diff options
context:
space:
mode:
authormorpheus65535 <[email protected]>2024-03-03 12:15:23 -0500
committerGitHub <[email protected]>2024-03-03 12:15:23 -0500
commit03afeb347075381bcb7fd6036295c9fa4a90d2dc (patch)
tree7c5d72c973d2c8e4ade57391a1c9ad5e94903a46 /libs/click
parent9ae684240b5bdd40a870d8122f0e380f8d03a187 (diff)
downloadbazarr-03afeb347075381bcb7fd6036295c9fa4a90d2dc.tar.gz
bazarr-03afeb347075381bcb7fd6036295c9fa4a90d2dc.zip
Updated multiple Python modules (now in libs and custom_libs directories) and React libraries
Diffstat (limited to 'libs/click')
-rw-r--r--libs/click/__init__.py2
-rw-r--r--libs/click/_compat.py67
-rw-r--r--libs/click/_termui_impl.py42
-rw-r--r--libs/click/core.py126
-rw-r--r--libs/click/decorators.py178
-rw-r--r--libs/click/exceptions.py13
-rw-r--r--libs/click/parser.py10
-rw-r--r--libs/click/shell_completion.py60
-rw-r--r--libs/click/termui.py11
-rw-r--r--libs/click/testing.py20
-rw-r--r--libs/click/types.py98
-rw-r--r--libs/click/utils.py102
12 files changed, 465 insertions, 264 deletions
diff --git a/libs/click/__init__.py b/libs/click/__init__.py
index e3ef423b6..9a1dab048 100644
--- a/libs/click/__init__.py
+++ b/libs/click/__init__.py
@@ -70,4 +70,4 @@ from .utils import get_binary_stream as get_binary_stream
from .utils import get_text_stream as get_text_stream
from .utils import open_file as open_file
-__version__ = "8.1.3"
+__version__ = "8.1.7"
diff --git a/libs/click/_compat.py b/libs/click/_compat.py
index 766d286be..23f886659 100644
--- a/libs/click/_compat.py
+++ b/libs/click/_compat.py
@@ -7,20 +7,11 @@ import typing as t
from weakref import WeakKeyDictionary
CYGWIN = sys.platform.startswith("cygwin")
-MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version)
-# Determine local App Engine environment, per Google's own suggestion
-APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get(
- "SERVER_SOFTWARE", ""
-)
-WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2
+WIN = sys.platform.startswith("win")
auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
-def get_filesystem_encoding() -> str:
- return sys.getfilesystemencoding() or sys.getdefaultencoding()
-
-
def _make_text_stream(
stream: t.BinaryIO,
encoding: t.Optional[str],
@@ -50,7 +41,7 @@ def is_ascii_encoding(encoding: str) -> bool:
return False
-def get_best_encoding(stream: t.IO) -> str:
+def get_best_encoding(stream: t.IO[t.Any]) -> str:
"""Returns the default stream encoding if not found."""
rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
if is_ascii_encoding(rv):
@@ -153,7 +144,7 @@ class _FixupStream:
return True
-def _is_binary_reader(stream: t.IO, default: bool = False) -> bool:
+def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
try:
return isinstance(stream.read(0), bytes)
except Exception:
@@ -162,7 +153,7 @@ def _is_binary_reader(stream: t.IO, default: bool = False) -> bool:
# closed. In this case, we assume the default.
-def _is_binary_writer(stream: t.IO, default: bool = False) -> bool:
+def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
try:
stream.write(b"")
except Exception:
@@ -175,7 +166,7 @@ def _is_binary_writer(stream: t.IO, default: bool = False) -> bool:
return True
-def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]:
+def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
# We need to figure out if the given stream is already binary.
# This can happen because the official docs recommend detaching
# the streams to get binary streams. Some code might do this, so
@@ -193,7 +184,7 @@ def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]:
return None
-def _find_binary_writer(stream: t.IO) -> t.Optional[t.BinaryIO]:
+def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
# We need to figure out if the given stream is already binary.
# This can happen because the official docs recommend detaching
# the streams to get binary streams. Some code might do this, so
@@ -241,11 +232,11 @@ def _is_compatible_text_stream(
def _force_correct_text_stream(
- text_stream: t.IO,
+ text_stream: t.IO[t.Any],
encoding: t.Optional[str],
errors: t.Optional[str],
- is_binary: t.Callable[[t.IO, bool], bool],
- find_binary: t.Callable[[t.IO], t.Optional[t.BinaryIO]],
+ is_binary: t.Callable[[t.IO[t.Any], bool], bool],
+ find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],
force_readable: bool = False,
force_writable: bool = False,
) -> t.TextIO:
@@ -287,7 +278,7 @@ def _force_correct_text_stream(
def _force_correct_text_reader(
- text_reader: t.IO,
+ text_reader: t.IO[t.Any],
encoding: t.Optional[str],
errors: t.Optional[str],
force_readable: bool = False,
@@ -303,7 +294,7 @@ def _force_correct_text_reader(
def _force_correct_text_writer(
- text_writer: t.IO,
+ text_writer: t.IO[t.Any],
encoding: t.Optional[str],
errors: t.Optional[str],
force_writable: bool = False,
@@ -367,11 +358,11 @@ def get_text_stderr(
def _wrap_io_open(
- file: t.Union[str, os.PathLike, int],
+ file: t.Union[str, "os.PathLike[str]", int],
mode: str,
encoding: t.Optional[str],
errors: t.Optional[str],
-) -> t.IO:
+) -> t.IO[t.Any]:
"""Handles not passing ``encoding`` and ``errors`` in binary mode."""
if "b" in mode:
return open(file, mode)
@@ -380,13 +371,14 @@ def _wrap_io_open(
def open_stream(
- filename: str,
+ filename: "t.Union[str, os.PathLike[str]]",
mode: str = "r",
encoding: t.Optional[str] = None,
errors: t.Optional[str] = "strict",
atomic: bool = False,
-) -> t.Tuple[t.IO, bool]:
+) -> t.Tuple[t.IO[t.Any], bool]:
binary = "b" in mode
+ filename = os.fspath(filename)
# Standard streams first. These are simple because they ignore the
# atomic flag. Use fsdecode to handle Path("-").
@@ -456,11 +448,11 @@ def open_stream(
f = _wrap_io_open(fd, mode, encoding, errors)
af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
- return t.cast(t.IO, af), True
+ return t.cast(t.IO[t.Any], af), True
class _AtomicFile:
- def __init__(self, f: t.IO, tmp_filename: str, real_filename: str) -> None:
+ def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
self._f = f
self._tmp_filename = tmp_filename
self._real_filename = real_filename
@@ -483,7 +475,7 @@ class _AtomicFile:
def __enter__(self) -> "_AtomicFile":
return self
- def __exit__(self, exc_type, exc_value, tb): # type: ignore
+ def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:
self.close(delete=exc_type is not None)
def __repr__(self) -> str:
@@ -494,7 +486,7 @@ def strip_ansi(value: str) -> str:
return _ansi_re.sub("", value)
-def _is_jupyter_kernel_output(stream: t.IO) -> bool:
+def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
stream = stream._stream
@@ -502,7 +494,7 @@ def _is_jupyter_kernel_output(stream: t.IO) -> bool:
def should_strip_ansi(
- stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
+ stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
) -> bool:
if color is None:
if stream is None:
@@ -524,7 +516,7 @@ if sys.platform.startswith("win") and WIN:
_ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
- def auto_wrap_for_ansi(
+ def auto_wrap_for_ansi( # noqa: F811
stream: t.TextIO, color: t.Optional[bool] = None
) -> t.TextIO:
"""Support ANSI color and style codes on Windows by wrapping a
@@ -564,7 +556,7 @@ if sys.platform.startswith("win") and WIN:
else:
def _get_argv_encoding() -> str:
- return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding()
+ return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()
def _get_windows_console_stream(
f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
@@ -576,7 +568,7 @@ def term_len(x: str) -> int:
return len(strip_ansi(x))
-def isatty(stream: t.IO) -> bool:
+def isatty(stream: t.IO[t.Any]) -> bool:
try:
return stream.isatty()
except Exception:
@@ -584,12 +576,17 @@ def isatty(stream: t.IO) -> bool:
def _make_cached_stream_func(
- src_func: t.Callable[[], t.TextIO], wrapper_func: t.Callable[[], t.TextIO]
-) -> t.Callable[[], t.TextIO]:
+ src_func: t.Callable[[], t.Optional[t.TextIO]],
+ wrapper_func: t.Callable[[], t.TextIO],
+) -> t.Callable[[], t.Optional[t.TextIO]]:
cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
- def func() -> t.TextIO:
+ def func() -> t.Optional[t.TextIO]:
stream = src_func()
+
+ if stream is None:
+ return None
+
try:
rv = cache.get(stream)
except Exception:
diff --git a/libs/click/_termui_impl.py b/libs/click/_termui_impl.py
index 4b979bcc1..f74465775 100644
--- a/libs/click/_termui_impl.py
+++ b/libs/click/_termui_impl.py
@@ -10,6 +10,8 @@ import sys
import time
import typing as t
from gettext import gettext as _
+from io import StringIO
+from types import TracebackType
from ._compat import _default_text_stdout
from ._compat import CYGWIN
@@ -59,15 +61,22 @@ class ProgressBar(t.Generic[V]):
self.show_percent = show_percent
self.show_pos = show_pos
self.item_show_func = item_show_func
- self.label = label or ""
+ self.label: str = label or ""
+
if file is None:
file = _default_text_stdout()
+
+ # There are no standard streams attached to write to. For example,
+ # pythonw on Windows.
+ if file is None:
+ file = StringIO()
+
self.file = file
self.color = color
self.update_min_steps = update_min_steps
self._completed_intervals = 0
- self.width = width
- self.autowidth = width == 0
+ self.width: int = width
+ self.autowidth: bool = width == 0
if length is None:
from operator import length_hint
@@ -80,25 +89,32 @@ class ProgressBar(t.Generic[V]):
if length is None:
raise TypeError("iterable or length is required")
iterable = t.cast(t.Iterable[V], range(length))
- self.iter = iter(iterable)
+ self.iter: t.Iterable[V] = iter(iterable)
self.length = length
self.pos = 0
self.avg: t.List[float] = []
+ self.last_eta: float
+ self.start: float
self.start = self.last_eta = time.time()
- self.eta_known = False
- self.finished = False
+ self.eta_known: bool = False
+ self.finished: bool = False
self.max_width: t.Optional[int] = None
- self.entered = False
+ self.entered: bool = False
self.current_item: t.Optional[V] = None
- self.is_hidden = not isatty(self.file)
+ self.is_hidden: bool = not isatty(self.file)
self._last_line: t.Optional[str] = None
- def __enter__(self) -> "ProgressBar":
+ def __enter__(self) -> "ProgressBar[V]":
self.entered = True
self.render_progress()
return self
- def __exit__(self, exc_type, exc_value, tb): # type: ignore
+ def __exit__(
+ self,
+ exc_type: t.Optional[t.Type[BaseException]],
+ exc_value: t.Optional[BaseException],
+ tb: t.Optional[TracebackType],
+ ) -> None:
self.render_finish()
def __iter__(self) -> t.Iterator[V]:
@@ -344,6 +360,12 @@ class ProgressBar(t.Generic[V]):
def pager(generator: t.Iterable[str], color: t.Optional[bool] = None) -> None:
"""Decide what method to use for paging through text."""
stdout = _default_text_stdout()
+
+ # There are no standard streams attached to write to. For example,
+ # pythonw on Windows.
+ if stdout is None:
+ stdout = StringIO()
+
if not isatty(sys.stdin) or not isatty(stdout):
return _nullpager(stdout, generator, color)
pager_cmd = (os.environ.get("PAGER", None) or "").strip()
diff --git a/libs/click/core.py b/libs/click/core.py
index 5abfb0f3c..cc65e896b 100644
--- a/libs/click/core.py
+++ b/libs/click/core.py
@@ -7,11 +7,11 @@ import typing as t
from collections import abc
from contextlib import contextmanager
from contextlib import ExitStack
-from functools import partial
from functools import update_wrapper
from gettext import gettext as _
from gettext import ngettext
from itertools import repeat
+from types import TracebackType
from . import types
from .exceptions import Abort
@@ -264,7 +264,7 @@ class Context:
info_name: t.Optional[str] = None,
obj: t.Optional[t.Any] = None,
auto_envvar_prefix: t.Optional[str] = None,
- default_map: t.Optional[t.Dict[str, t.Any]] = None,
+ default_map: t.Optional[t.MutableMapping[str, t.Any]] = None,
terminal_width: t.Optional[int] = None,
max_content_width: t.Optional[int] = None,
resilient_parsing: bool = False,
@@ -311,7 +311,7 @@ class Context:
):
default_map = parent.default_map.get(info_name)
- self.default_map: t.Optional[t.Dict[str, t.Any]] = default_map
+ self.default_map: t.Optional[t.MutableMapping[str, t.Any]] = default_map
#: This flag indicates if a subcommand is going to be executed. A
#: group callback can use this information to figure out if it's
@@ -455,7 +455,12 @@ class Context:
push_context(self)
return self
- def __exit__(self, exc_type, exc_value, tb): # type: ignore
+ def __exit__(
+ self,
+ exc_type: t.Optional[t.Type[BaseException]],
+ exc_value: t.Optional[BaseException],
+ tb: t.Optional[TracebackType],
+ ) -> None:
self._depth -= 1
if self._depth == 0:
self.close()
@@ -706,12 +711,30 @@ class Context:
"""
return type(self)(command, info_name=command.name, parent=self)
+ @t.overload
def invoke(
__self, # noqa: B902
- __callback: t.Union["Command", t.Callable[..., t.Any]],
+ __callback: "t.Callable[..., V]",
+ *args: t.Any,
+ **kwargs: t.Any,
+ ) -> V:
+ ...
+
+ @t.overload
+ def invoke(
+ __self, # noqa: B902
+ __callback: "Command",
*args: t.Any,
**kwargs: t.Any,
) -> t.Any:
+ ...
+
+ def invoke(
+ __self, # noqa: B902
+ __callback: t.Union["Command", "t.Callable[..., V]"],
+ *args: t.Any,
+ **kwargs: t.Any,
+ ) -> t.Union[t.Any, V]:
"""Invokes a command callback in exactly the way it expects. There
are two ways to invoke this method:
@@ -739,7 +762,7 @@ class Context:
"The given command does not have a callback that can be invoked."
)
else:
- __callback = other_cmd.callback
+ __callback = t.cast("t.Callable[..., V]", other_cmd.callback)
ctx = __self._make_sub_context(other_cmd)
@@ -844,7 +867,7 @@ class BaseCommand:
def __init__(
self,
name: t.Optional[str],
- context_settings: t.Optional[t.Dict[str, t.Any]] = None,
+ context_settings: t.Optional[t.MutableMapping[str, t.Any]] = None,
) -> None:
#: the name the command thinks it has. Upon registering a command
#: on a :class:`Group` the group will default the command name
@@ -856,7 +879,7 @@ class BaseCommand:
context_settings = {}
#: an optional dictionary with defaults passed to the context.
- self.context_settings: t.Dict[str, t.Any] = context_settings
+ self.context_settings: t.MutableMapping[str, t.Any] = context_settings
def to_info_dict(self, ctx: Context) -> t.Dict[str, t.Any]:
"""Gather information that could be useful for a tool generating
@@ -898,7 +921,7 @@ class BaseCommand:
:param info_name: the info name for this invocation. Generally this
is the most descriptive name for the script or
command. For the toplevel script it's usually
- the name of the script, for commands below it it's
+ the name of the script, for commands below it's
the name of the command.
:param args: the arguments to parse as list of strings.
:param parent: the parent context if available.
@@ -931,7 +954,7 @@ class BaseCommand:
"""Given a context, this invokes the command. The default
implementation is raising a not implemented error.
"""
- raise NotImplementedError("Base commands are not invokable by default")
+ raise NotImplementedError("Base commands are not invocable by default")
def shell_complete(self, ctx: Context, incomplete: str) -> t.List["CompletionItem"]:
"""Return a list of completions for the incomplete value. Looks
@@ -1063,9 +1086,9 @@ class BaseCommand:
# even always obvious that `rv` indicates success/failure
# by its truthiness/falsiness
ctx.exit()
- except (EOFError, KeyboardInterrupt):
+ except (EOFError, KeyboardInterrupt) as e:
echo(file=sys.stderr)
- raise Abort() from None
+ raise Abort() from e
except ClickException as e:
if not standalone_mode:
raise
@@ -1099,7 +1122,7 @@ class BaseCommand:
def _main_shell_completion(
self,
- ctx_args: t.Dict[str, t.Any],
+ ctx_args: t.MutableMapping[str, t.Any],
prog_name: str,
complete_var: t.Optional[str] = None,
) -> None:
@@ -1111,9 +1134,13 @@ class BaseCommand:
:param complete_var: Name of the environment variable that holds
the completion instruction. Defaults to
``_{PROG_NAME}_COMPLETE``.
+
+ .. versionchanged:: 8.2.0
+ Dots (``.``) in ``prog_name`` are replaced with underscores (``_``).
"""
if complete_var is None:
- complete_var = f"_{prog_name}_COMPLETE".replace("-", "_").upper()
+ complete_name = prog_name.replace("-", "_").replace(".", "_")
+ complete_var = f"_{complete_name}_COMPLETE".upper()
instruction = os.environ.get(complete_var)
@@ -1175,7 +1202,7 @@ class Command(BaseCommand):
def __init__(
self,
name: t.Optional[str],
- context_settings: t.Optional[t.Dict[str, t.Any]] = None,
+ context_settings: t.Optional[t.MutableMapping[str, t.Any]] = None,
callback: t.Optional[t.Callable[..., t.Any]] = None,
params: t.Optional[t.List["Parameter"]] = None,
help: t.Optional[str] = None,
@@ -1333,13 +1360,16 @@ class Command(BaseCommand):
def format_help_text(self, ctx: Context, formatter: HelpFormatter) -> None:
"""Writes the help text to the formatter if it exists."""
- text = self.help if self.help is not None else ""
+ if self.help is not None:
+ # truncate the help text to the first form feed
+ text = inspect.cleandoc(self.help).partition("\f")[0]
+ else:
+ text = ""
if self.deprecated:
text = _("(Deprecated) {text}").format(text=text)
if text:
- text = inspect.cleandoc(text).partition("\f")[0]
formatter.write_paragraph()
with formatter.indentation():
@@ -1462,6 +1492,7 @@ class MultiCommand(Command):
:param result_callback: The result callback to attach to this multi
command. This can be set or changed later with the
:meth:`result_callback` decorator.
+ :param attrs: Other command arguments described in :class:`Command`.
"""
allow_extra_args = True
@@ -1569,7 +1600,7 @@ class MultiCommand(Command):
return f
def function(__value, *args, **kwargs): # type: ignore
- inner = old_callback(__value, *args, **kwargs) # type: ignore
+ inner = old_callback(__value, *args, **kwargs)
return f(inner, *args, **kwargs)
self._result_callback = rv = update_wrapper(t.cast(F, function), f)
@@ -1760,7 +1791,7 @@ class Group(MultiCommand):
:class:`BaseCommand`.
.. versionchanged:: 8.0
- The ``commmands`` argument can be a list of command objects.
+ The ``commands`` argument can be a list of command objects.
"""
#: If set, this is used by the group's :meth:`command` decorator
@@ -1786,7 +1817,9 @@ class Group(MultiCommand):
def __init__(
self,
name: t.Optional[str] = None,
- commands: t.Optional[t.Union[t.Dict[str, Command], t.Sequence[Command]]] = None,
+ commands: t.Optional[
+ t.Union[t.MutableMapping[str, Command], t.Sequence[Command]]
+ ] = None,
**attrs: t.Any,
) -> None:
super().__init__(name, **attrs)
@@ -1797,7 +1830,7 @@ class Group(MultiCommand):
commands = {c.name: c for c in commands if c.name is not None}
#: The registered subcommands by their exported names.
- self.commands: t.Dict[str, Command] = commands
+ self.commands: t.MutableMapping[str, Command] = commands
def add_command(self, cmd: Command, name: t.Optional[str] = None) -> None:
"""Registers another :class:`Command` with this group. If the name
@@ -1838,10 +1871,7 @@ class Group(MultiCommand):
"""
from .decorators import command
- if self.command_class and kwargs.get("cls") is None:
- kwargs["cls"] = self.command_class
-
- func: t.Optional[t.Callable] = None
+ func: t.Optional[t.Callable[..., t.Any]] = None
if args and callable(args[0]):
assert (
@@ -1850,6 +1880,9 @@ class Group(MultiCommand):
(func,) = args
args = ()
+ if self.command_class and kwargs.get("cls") is None:
+ kwargs["cls"] = self.command_class
+
def decorator(f: t.Callable[..., t.Any]) -> Command:
cmd: Command = command(*args, **kwargs)(f)
self.add_command(cmd)
@@ -1889,7 +1922,7 @@ class Group(MultiCommand):
"""
from .decorators import group
- func: t.Optional[t.Callable] = None
+ func: t.Optional[t.Callable[..., t.Any]] = None
if args and callable(args[0]):
assert (
@@ -1926,6 +1959,9 @@ class CommandCollection(MultiCommand):
commands together into one. This is a straightforward implementation
that accepts a list of different multi commands as sources and
provides all the commands for each of them.
+
+ See :class:`MultiCommand` and :class:`Command` for the description of
+ ``name`` and ``attrs``.
"""
def __init__(
@@ -1985,7 +2021,7 @@ class Parameter:
argument. This is a list of flags or argument
names.
:param type: the type that should be used. Either a :class:`ParamType`
- or a Python type. The later is converted into the former
+ or a Python type. The latter is converted into the former
automatically if supported.
:param required: controls if this is optional or not.
:param default: the default value if omitted. This can also be a callable,
@@ -2069,10 +2105,13 @@ class Parameter:
]
] = None,
) -> None:
+ self.name: t.Optional[str]
+ self.opts: t.List[str]
+ self.secondary_opts: t.List[str]
self.name, self.opts, self.secondary_opts = self._parse_decls(
param_decls or (), expose_value
)
- self.type = types.convert_type(type, default)
+ self.type: types.ParamType = types.convert_type(type, default)
# Default nargs to what the type tells us if we have that
# information available.
@@ -2260,7 +2299,7 @@ class Parameter:
if value is None:
return () if self.multiple or self.nargs == -1 else None
- def check_iter(value: t.Any) -> t.Iterator:
+ def check_iter(value: t.Any) -> t.Iterator[t.Any]:
try:
return _check_iter(value)
except TypeError:
@@ -2272,17 +2311,18 @@ class Parameter:
) from None
if self.nargs == 1 or self.type.is_composite:
- convert: t.Callable[[t.Any], t.Any] = partial(
- self.type, param=self, ctx=ctx
- )
+
+ def convert(value: t.Any) -> t.Any:
+ return self.type(value, param=self, ctx=ctx)
+
elif self.nargs == -1:
- def convert(value: t.Any) -> t.Tuple:
+ def convert(value: t.Any) -> t.Any: # t.Tuple[t.Any, ...]
return tuple(self.type(x, self, ctx) for x in check_iter(value))
else: # nargs > 1
- def convert(value: t.Any) -> t.Tuple:
+ def convert(value: t.Any) -> t.Any: # t.Tuple[t.Any, ...]
value = tuple(check_iter(value))
if len(value) != self.nargs:
@@ -2449,6 +2489,7 @@ class Option(Parameter):
context.
:param help: the help string.
:param hidden: hide this option from help outputs.
+ :param attrs: Other command arguments described in :class:`Parameter`.
.. versionchanged:: 8.1.0
Help text indentation is cleaned here instead of only in the
@@ -2529,19 +2570,25 @@ class Option(Parameter):
# flag if flag_value is set.
self._flag_needs_value = flag_value is not None
+ self.default: t.Union[t.Any, t.Callable[[], t.Any]]
+
if is_flag and default_is_missing and not self.required:
- self.default: t.Union[t.Any, t.Callable[[], t.Any]] = False
+ if multiple:
+ self.default = ()
+ else:
+ self.default = False
if flag_value is None:
flag_value = not self.default
+ self.type: types.ParamType
if is_flag and type is None:
# Re-guess the type from the flag value instead of the
# default.
self.type = types.convert_type(None, flag_value)
self.is_flag: bool = is_flag
- self.is_bool_flag = is_flag and isinstance(self.type, types.BoolParamType)
+ self.is_bool_flag: bool = is_flag and isinstance(self.type, types.BoolParamType)
self.flag_value: t.Any = flag_value
# Counting
@@ -2580,9 +2627,6 @@ class Option(Parameter):
if self.is_flag:
raise TypeError("'count' is not valid with 'is_flag'.")
- if self.multiple and self.is_flag:
- raise TypeError("'multiple' is not valid with 'is_flag', use 'count'.")
-
def to_info_dict(self) -> t.Dict[str, t.Any]:
info_dict = super().to_info_dict()
info_dict.update(
@@ -2817,7 +2861,7 @@ class Option(Parameter):
if self.is_flag and not self.is_bool_flag:
for param in ctx.command.params:
if param.name == self.name and param.default:
- return param.flag_value # type: ignore
+ return t.cast(Option, param).flag_value
return None
@@ -2927,7 +2971,7 @@ class Argument(Parameter):
provide fewer features than options but can have infinite ``nargs``
and are required by default.
- All parameters are passed onwards to the parameter constructor.
+ All parameters are passed onwards to the constructor of :class:`Parameter`.
"""
param_type_name = "argument"
diff --git a/libs/click/decorators.py b/libs/click/decorators.py
index 28618dc52..d9bba9502 100644
--- a/libs/click/decorators.py
+++ b/libs/click/decorators.py
@@ -13,36 +13,43 @@ from .core import Parameter
from .globals import get_current_context
from .utils import echo
-F = t.TypeVar("F", bound=t.Callable[..., t.Any])
-FC = t.TypeVar("FC", bound=t.Union[t.Callable[..., t.Any], Command])
+if t.TYPE_CHECKING:
+ import typing_extensions as te
+ P = te.ParamSpec("P")
-def pass_context(f: F) -> F:
+R = t.TypeVar("R")
+T = t.TypeVar("T")
+_AnyCallable = t.Callable[..., t.Any]
+FC = t.TypeVar("FC", bound=t.Union[_AnyCallable, Command])
+
+
+def pass_context(f: "t.Callable[te.Concatenate[Context, P], R]") -> "t.Callable[P, R]":
"""Marks a callback as wanting to receive the current context
object as first argument.
"""
- def new_func(*args, **kwargs): # type: ignore
+ def new_func(*args: "P.args", **kwargs: "P.kwargs") -> "R":
return f(get_current_context(), *args, **kwargs)
- return update_wrapper(t.cast(F, new_func), f)
+ return update_wrapper(new_func, f)
-def pass_obj(f: F) -> F:
+def pass_obj(f: "t.Callable[te.Concatenate[t.Any, P], R]") -> "t.Callable[P, R]":
"""Similar to :func:`pass_context`, but only pass the object on the
context onwards (:attr:`Context.obj`). This is useful if that object
represents the state of a nested system.
"""
- def new_func(*args, **kwargs): # type: ignore
+ def new_func(*args: "P.args", **kwargs: "P.kwargs") -> "R":
return f(get_current_context().obj, *args, **kwargs)
- return update_wrapper(t.cast(F, new_func), f)
+ return update_wrapper(new_func, f)
def make_pass_decorator(
- object_type: t.Type, ensure: bool = False
-) -> "t.Callable[[F], F]":
+ object_type: t.Type[T], ensure: bool = False
+) -> t.Callable[["t.Callable[te.Concatenate[T, P], R]"], "t.Callable[P, R]"]:
"""Given an object type this creates a decorator that will work
similar to :func:`pass_obj` but instead of passing the object of the
current context, it will find the innermost context of type
@@ -65,10 +72,11 @@ def make_pass_decorator(
remembered on the context if it's not there yet.
"""
- def decorator(f: F) -> F:
- def new_func(*args, **kwargs): # type: ignore
+ def decorator(f: "t.Callable[te.Concatenate[T, P], R]") -> "t.Callable[P, R]":
+ def new_func(*args: "P.args", **kwargs: "P.kwargs") -> "R":
ctx = get_current_context()
+ obj: t.Optional[T]
if ensure:
obj = ctx.ensure_object(object_type)
else:
@@ -83,14 +91,14 @@ def make_pass_decorator(
return ctx.invoke(f, obj, *args, **kwargs)
- return update_wrapper(t.cast(F, new_func), f)
+ return update_wrapper(new_func, f)
- return decorator
+ return decorator # type: ignore[return-value]
def pass_meta_key(
key: str, *, doc_description: t.Optional[str] = None
-) -> "t.Callable[[F], F]":
+) -> "t.Callable[[t.Callable[te.Concatenate[t.Any, P], R]], t.Callable[P, R]]":
"""Create a decorator that passes a key from
:attr:`click.Context.meta` as the first argument to the decorated
function.
@@ -103,13 +111,13 @@ def pass_meta_key(
.. versionadded:: 8.0
"""
- def decorator(f: F) -> F:
- def new_func(*args, **kwargs): # type: ignore
+ def decorator(f: "t.Callable[te.Concatenate[t.Any, P], R]") -> "t.Callable[P, R]":
+ def new_func(*args: "P.args", **kwargs: "P.kwargs") -> R:
ctx = get_current_context()
obj = ctx.meta[key]
return ctx.invoke(f, obj, *args, **kwargs)
- return update_wrapper(t.cast(F, new_func), f)
+ return update_wrapper(new_func, f)
if doc_description is None:
doc_description = f"the {key!r} key from :attr:`click.Context.meta`"
@@ -118,41 +126,53 @@ def pass_meta_key(
f"Decorator that passes {doc_description} as the first argument"
" to the decorated function."
)
- return decorator
+ return decorator # type: ignore[return-value]
CmdType = t.TypeVar("CmdType", bound=Command)
+# variant: no call, directly as decorator for a function.
@t.overload
-def command(
- __func: t.Callable[..., t.Any],
-) -> Command:
+def command(name: _AnyCallable) -> Command:
...
+# variant: with positional name and with positional or keyword cls argument:
+# @command(namearg, CommandCls, ...) or @command(namearg, cls=CommandCls, ...)
@t.overload
def command(
- name: t.Optional[str] = None,
+ name: t.Optional[str],
+ cls: t.Type[CmdType],
**attrs: t.Any,
-) -> t.Callable[..., Command]:
+) -> t.Callable[[_AnyCallable], CmdType]:
...
+# variant: name omitted, cls _must_ be a keyword argument, @command(cls=CommandCls, ...)
@t.overload
def command(
- name: t.Optional[str] = None,
- cls: t.Type[CmdType] = ...,
+ name: None = None,
+ *,
+ cls: t.Type[CmdType],
**attrs: t.Any,
-) -> t.Callable[..., CmdType]:
+) -> t.Callable[[_AnyCallable], CmdType]:
+ ...
+
+
+# variant: with optional string name, no cls argument provided.
+def command(
+ name: t.Optional[str] = ..., cls: None = None, **attrs: t.Any
+) -> t.Callable[[_AnyCallable], Command]:
...
def command(
- name: t.Union[str, t.Callable[..., t.Any], None] = None,
- cls: t.Optional[t.Type[Command]] = None,
+ name: t.Union[t.Optional[str], _AnyCallable] = None,
+ cls: t.Optional[t.Type[CmdType]] = None,
**attrs: t.Any,
-) -> t.Union[Command, t.Callable[..., Command]]:
+) -> t.Union[Command, t.Callable[[_AnyCallable], t.Union[Command, CmdType]]]:
r"""Creates a new :class:`Command` and uses the decorated function as
callback. This will also automatically attach all decorated
:func:`option`\s and :func:`argument`\s as parameters to the command.
@@ -182,7 +202,7 @@ def command(
appended to the end of the list.
"""
- func: t.Optional[t.Callable[..., t.Any]] = None
+ func: t.Optional[t.Callable[[_AnyCallable], t.Any]] = None
if callable(name):
func = name
@@ -191,9 +211,9 @@ def command(
assert not attrs, "Use 'command(**kwargs)(callable)' to provide arguments."
if cls is None:
- cls = Command
+ cls = t.cast(t.Type[CmdType], Command)
- def decorator(f: t.Callable[..., t.Any]) -> Command:
+ def decorator(f: _AnyCallable) -> CmdType:
if isinstance(f, Command):
raise TypeError("Attempted to convert a callback into a command twice.")
@@ -211,8 +231,12 @@ def command(
if attrs.get("help") is None:
attrs["help"] = f.__doc__
- cmd = cls( # type: ignore[misc]
- name=name or f.__name__.lower().replace("_", "-"), # type: ignore[arg-type]
+ if t.TYPE_CHECKING:
+ assert cls is not None
+ assert not callable(name)
+
+ cmd = cls(
+ name=name or f.__name__.lower().replace("_", "-"),
callback=f,
params=params,
**attrs,
@@ -226,24 +250,50 @@ def command(
return decorator
+GrpType = t.TypeVar("GrpType", bound=Group)
+
+
+# variant: no call, directly as decorator for a function.
+def group(name: _AnyCallable) -> Group:
+ ...
+
+
+# variant: with positional name and with positional or keyword cls argument:
+# @group(namearg, GroupCls, ...) or @group(namearg, cls=GroupCls, ...)
@t.overload
def group(
- __func: t.Callable[..., t.Any],
-) -> Group:
+ name: t.Optional[str],
+ cls: t.Type[GrpType],
+ **attrs: t.Any,
+) -> t.Callable[[_AnyCallable], GrpType]:
...
+# variant: name omitted, cls _must_ be a keyword argument, @group(cmd=GroupCls, ...)
@t.overload
def group(
- name: t.Optional[str] = None,
+ name: None = None,
+ *,
+ cls: t.Type[GrpType],
**attrs: t.Any,
-) -> t.Callable[[F], Group]:
+) -> t.Callable[[_AnyCallable], GrpType]:
+ ...
+
+
+# variant: with optional string name, no cls argument provided.
+def group(
+ name: t.Optional[str] = ..., cls: None = None, **attrs: t.Any
+) -> t.Callable[[_AnyCallable], Group]:
...
def group(
- name: t.Union[str, t.Callable[..., t.Any], None] = None, **attrs: t.Any
-) -> t.Union[Group, t.Callable[[F], Group]]:
+ name: t.Union[str, _AnyCallable, None] = None,
+ cls: t.Optional[t.Type[GrpType]] = None,
+ **attrs: t.Any,
+) -> t.Union[Group, t.Callable[[_AnyCallable], t.Union[Group, GrpType]]]:
"""Creates a new :class:`Group` with a function as callback. This
works otherwise the same as :func:`command` just that the `cls`
parameter is set to :class:`Group`.
@@ -251,17 +301,16 @@ def group(
.. versionchanged:: 8.1
This decorator can be applied without parentheses.
"""
- if attrs.get("cls") is None:
- attrs["cls"] = Group
+ if cls is None:
+ cls = t.cast(t.Type[GrpType], Group)
if callable(name):
- grp: t.Callable[[F], Group] = t.cast(Group, command(**attrs))
- return grp(name)
+ return command(cls=cls, **attrs)(name)
- return t.cast(Group, command(name, **attrs))
+ return command(name, cls, **attrs)
-def _param_memo(f: FC, param: Parameter) -> None:
+def _param_memo(f: t.Callable[..., t.Any], param: Parameter) -> None:
if isinstance(f, Command):
f.params.append(param)
else:
@@ -271,41 +320,57 @@ def _param_memo(f: FC, param: Parameter) -> None:
f.__click_params__.append(param) # type: ignore
-def argument(*param_decls: str, **attrs: t.Any) -> t.Callable[[FC], FC]:
+def argument(
+ *param_decls: str, cls: t.Optional[t.Type[Argument]] = None, **attrs: t.Any
+) -> t.Callable[[FC], FC]:
"""Attaches an argument to the command. All positional arguments are
passed as parameter declarations to :class:`Argument`; all keyword
arguments are forwarded unchanged (except ``cls``).
This is equivalent to creating an :class:`Argument` instance manually
and attaching it to the :attr:`Command.params` list.
+ For the default argument class, refer to :class:`Argument` and
+ :class:`Parameter` for descriptions of parameters.
+
:param cls: the argument class to instantiate. This defaults to
:class:`Argument`.
+ :param param_decls: Passed as positional arguments to the constructor of
+ ``cls``.
+ :param attrs: Passed as keyword arguments to the constructor of ``cls``.
"""
+ if cls is None:
+ cls = Argument
def decorator(f: FC) -> FC:
- ArgumentClass = attrs.pop("cls", None) or Argument
- _param_memo(f, ArgumentClass(param_decls, **attrs))
+ _param_memo(f, cls(param_decls, **attrs))
return f
return decorator
-def option(*param_decls: str, **attrs: t.Any) -> t.Callable[[FC], FC]:
+def option(
+ *param_decls: str, cls: t.Optional[t.Type[Option]] = None, **attrs: t.Any
+) -> t.Callable[[FC], FC]:
"""Attaches an option to the command. All positional arguments are
passed as parameter declarations to :class:`Option`; all keyword
arguments are forwarded unchanged (except ``cls``).
This is equivalent to creating an :class:`Option` instance manually
and attaching it to the :attr:`Command.params` list.
+ For the default option class, refer to :class:`Option` and
+ :class:`Parameter` for descriptions of parameters.
+
:param cls: the option class to instantiate. This defaults to
:class:`Option`.
+ :param param_decls: Passed as positional arguments to the constructor of
+ ``cls``.
+ :param attrs: Passed as keyword arguments to the constructor of ``cls``.
"""
+ if cls is None:
+ cls = Option
def decorator(f: FC) -> FC:
- # Issue 926, copy attrs, so pre-defined options can re-use the same cls=
- option_attrs = attrs.copy()
- OptionClass = option_attrs.pop("cls", None) or Option
- _param_memo(f, OptionClass(param_decls, **option_attrs))
+ _param_memo(f, cls(param_decls, **attrs))
return f
return decorator
@@ -449,8 +514,7 @@ def version_option(
)
echo(
- t.cast(str, message)
- % {"prog": prog_name, "package": package_name, "version": version},
+ message % {"prog": prog_name, "package": package_name, "version": version},
color=ctx.color,
)
ctx.exit()
diff --git a/libs/click/exceptions.py b/libs/click/exceptions.py
index 9e20b3eb5..fe68a3613 100644
--- a/libs/click/exceptions.py
+++ b/libs/click/exceptions.py
@@ -1,12 +1,13 @@
-import os
import typing as t
from gettext import gettext as _
from gettext import ngettext
from ._compat import get_text_stderr
from .utils import echo
+from .utils import format_filename
if t.TYPE_CHECKING:
+ from .core import Command
from .core import Context
from .core import Parameter
@@ -36,7 +37,7 @@ class ClickException(Exception):
def __str__(self) -> str:
return self.message
- def show(self, file: t.Optional[t.IO] = None) -> None:
+ def show(self, file: t.Optional[t.IO[t.Any]] = None) -> None:
if file is None:
file = get_text_stderr()
@@ -57,9 +58,9 @@ class UsageError(ClickException):
def __init__(self, message: str, ctx: t.Optional["Context"] = None) -> None:
super().__init__(message)
self.ctx = ctx
- self.cmd = self.ctx.command if self.ctx else None
+ self.cmd: t.Optional["Command"] = self.ctx.command if self.ctx else None
- def show(self, file: t.Optional[t.IO] = None) -> None:
+ def show(self, file: t.Optional[t.IO[t.Any]] = None) -> None:
if file is None:
file = get_text_stderr()
color = None
@@ -261,7 +262,7 @@ class FileError(ClickException):
hint = _("unknown error")
super().__init__(hint)
- self.ui_filename = os.fsdecode(filename)
+ self.ui_filename: str = format_filename(filename)
self.filename = filename
def format_message(self) -> str:
@@ -284,4 +285,4 @@ class Exit(RuntimeError):
__slots__ = ("exit_code",)
def __init__(self, code: int = 0) -> None:
- self.exit_code = code
+ self.exit_code: int = code
diff --git a/libs/click/parser.py b/libs/click/parser.py
index 2d5a2ed7b..5fa7adfac 100644
--- a/libs/click/parser.py
+++ b/libs/click/parser.py
@@ -168,7 +168,7 @@ class Option:
):
self._short_opts = []
self._long_opts = []
- self.prefixes = set()
+ self.prefixes: t.Set[str] = set()
for opt in opts:
prefix, value = split_opt(opt)
@@ -194,7 +194,7 @@ class Option:
def takes_value(self) -> bool:
return self.action in ("store", "append")
- def process(self, value: str, state: "ParsingState") -> None:
+ def process(self, value: t.Any, state: "ParsingState") -> None:
if self.action == "store":
state.opts[self.dest] = value # type: ignore
elif self.action == "store_const":
@@ -272,12 +272,12 @@ class OptionParser:
#: If this is set to `False`, the parser will stop on the first
#: non-option. Click uses this to implement nested subcommands
#: safely.
- self.allow_interspersed_args = True
+ self.allow_interspersed_args: bool = True
#: This tells the parser how to deal with unknown options. By
#: default it will error out (which is sensible), but there is a
#: second mode where it will ignore it and continue processing
#: after shifting all the unknown options into the resulting args.
- self.ignore_unknown_options = False
+ self.ignore_unknown_options: bool = False
if ctx is not None:
self.allow_interspersed_args = ctx.allow_interspersed_args
@@ -451,7 +451,7 @@ class OptionParser:
if stop:
break
- # If we got any unknown options we re-combinate the string of the
+ # If we got any unknown options we recombine the string of the
# remaining options and re-attach the prefix, then report that
# to the state as new larg. This way there is basic combinatorics
# that can be achieved while still ignoring unknown arguments.
diff --git a/libs/click/shell_completion.py b/libs/click/shell_completion.py
index c17a8e643..dc9e00b9b 100644
--- a/libs/click/shell_completion.py
+++ b/libs/click/shell_completion.py
@@ -16,7 +16,7 @@ from .utils import echo
def shell_complete(
cli: BaseCommand,
- ctx_args: t.Dict[str, t.Any],
+ ctx_args: t.MutableMapping[str, t.Any],
prog_name: str,
complete_var: str,
instruction: str,
@@ -80,9 +80,9 @@ class CompletionItem:
help: t.Optional[str] = None,
**kwargs: t.Any,
) -> None:
- self.value = value
- self.type = type
- self.help = help
+ self.value: t.Any = value
+ self.type: str = type
+ self.help: t.Optional[str] = help
self._info = kwargs
def __getattr__(self, name: str) -> t.Any:
@@ -157,17 +157,19 @@ _SOURCE_ZSH = """\
fi
}
-compdef %(complete_func)s %(prog_name)s;
+if [[ $zsh_eval_context[-1] == loadautofunc ]]; then
+ # autoload from fpath, call function directly
+ %(complete_func)s "$@"
+else
+ # eval/source/. command, register function for later
+ compdef %(complete_func)s %(prog_name)s
+fi
"""
_SOURCE_FISH = """\
function %(complete_func)s;
- set -l response;
-
- for value in (env %(complete_var)s=fish_complete COMP_WORDS=(commandline -cp) \
+ set -l response (env %(complete_var)s=fish_complete COMP_WORDS=(commandline -cp) \
COMP_CWORD=(commandline -t) %(prog_name)s);
- set response $response $value;
- end;
for completion in $response;
set -l metadata (string split "," $completion);
@@ -214,7 +216,7 @@ class ShellComplete:
def __init__(
self,
cli: BaseCommand,
- ctx_args: t.Dict[str, t.Any],
+ ctx_args: t.MutableMapping[str, t.Any],
prog_name: str,
complete_var: str,
) -> None:
@@ -228,7 +230,7 @@ class ShellComplete:
"""The name of the shell function defined by the completion
script.
"""
- safe_name = re.sub(r"\W*", "", self.prog_name.replace("-", "_"), re.ASCII)
+ safe_name = re.sub(r"\W*", "", self.prog_name.replace("-", "_"), flags=re.ASCII)
return f"_{safe_name}_completion"
def source_vars(self) -> t.Dict[str, t.Any]:
@@ -299,11 +301,12 @@ class BashComplete(ShellComplete):
name = "bash"
source_template = _SOURCE_BASH
- def _check_version(self) -> None:
+ @staticmethod
+ def _check_version() -> None:
import subprocess
output = subprocess.run(
- ["bash", "-c", "echo ${BASH_VERSION}"], stdout=subprocess.PIPE
+ ["bash", "-c", 'echo "${BASH_VERSION}"'], stdout=subprocess.PIPE
)
match = re.search(r"^(\d+)\.(\d+)\.\d+", output.stdout.decode())
@@ -311,15 +314,17 @@ class BashComplete(ShellComplete):
major, minor = match.groups()
if major < "4" or major == "4" and minor < "4":
- raise RuntimeError(
+ echo(
_(
"Shell completion is not supported for Bash"
" versions older than 4.4."
- )
+ ),
+ err=True,
)
else:
- raise RuntimeError(
- _("Couldn't detect Bash version, shell completion is not supported.")
+ echo(
+ _("Couldn't detect Bash version, shell completion is not supported."),
+ err=True,
)
def source(self) -> str:
@@ -389,6 +394,9 @@ class FishComplete(ShellComplete):
return f"{item.type},{item.value}"
+ShellCompleteType = t.TypeVar("ShellCompleteType", bound=t.Type[ShellComplete])
+
+
_available_shells: t.Dict[str, t.Type[ShellComplete]] = {
"bash": BashComplete,
"fish": FishComplete,
@@ -397,8 +405,8 @@ _available_shells: t.Dict[str, t.Type[ShellComplete]] = {
def add_completion_class(
- cls: t.Type[ShellComplete], name: t.Optional[str] = None
-) -> None:
+ cls: ShellCompleteType, name: t.Optional[str] = None
+) -> ShellCompleteType:
"""Register a :class:`ShellComplete` subclass under the given name.
The name will be provided by the completion instruction environment
variable during completion.
@@ -413,6 +421,8 @@ def add_completion_class(
_available_shells[name] = cls
+ return cls
+
def get_completion_class(shell: str) -> t.Optional[t.Type[ShellComplete]]:
"""Look up a registered :class:`ShellComplete` subclass by the name
@@ -436,7 +446,8 @@ def _is_incomplete_argument(ctx: Context, param: Parameter) -> bool:
return False
assert param.name is not None
- value = ctx.params[param.name]
+ # Will be None if expose_value is False.
+ value = ctx.params.get(param.name)
return (
param.nargs == -1
or ctx.get_parameter_source(param.name) is not ParameterSource.COMMANDLINE
@@ -482,7 +493,10 @@ def _is_incomplete_option(ctx: Context, args: t.List[str], param: Parameter) ->
def _resolve_context(
- cli: BaseCommand, ctx_args: t.Dict[str, t.Any], prog_name: str, args: t.List[str]
+ cli: BaseCommand,
+ ctx_args: t.MutableMapping[str, t.Any],
+ prog_name: str,
+ args: t.List[str],
) -> Context:
"""Produce the context hierarchy starting with the command and
traversing the complete arguments. This only follows the commands,
@@ -509,6 +523,8 @@ def _resolve_context(
ctx = cmd.make_context(name, args, parent=ctx, resilient_parsing=True)
args = ctx.protected_args + ctx.args
else:
+ sub_ctx = ctx
+
while args:
name, cmd, args = command.resolve_command(ctx, args)
diff --git a/libs/click/termui.py b/libs/click/termui.py
index bfb2f5ae6..db7a4b286 100644
--- a/libs/click/termui.py
+++ b/libs/click/termui.py
@@ -1,14 +1,12 @@
import inspect
import io
import itertools
-import os
import sys
import typing as t
from gettext import gettext as _
from ._compat import isatty
from ._compat import strip_ansi
-from ._compat import WIN
from .exceptions import Abort
from .exceptions import UsageError
from .globals import resolve_color_default
@@ -73,7 +71,7 @@ def _build_prompt(
def _format_default(default: t.Any) -> t.Any:
if isinstance(default, (io.IOBase, LazyFile)) and hasattr(default, "name"):
- return default.name # type: ignore
+ return default.name
return default
@@ -443,10 +441,9 @@ def clear() -> None:
"""
if not isatty(sys.stdout):
return
- if WIN:
- os.system("cls")
- else:
- sys.stdout.write("\033[2J\033[1;1H")
+
+ # ANSI escape \033[2J clears the screen, \033[1;1H moves the cursor
+ echo("\033[2J\033[1;1H", nl=False)
def _interpret_color(
diff --git a/libs/click/testing.py b/libs/click/testing.py
index e395c2edf..e0df0d2a6 100644
--- a/libs/click/testing.py
+++ b/libs/click/testing.py
@@ -79,11 +79,11 @@ class _NamedTextIOWrapper(io.TextIOWrapper):
def make_input_stream(
- input: t.Optional[t.Union[str, bytes, t.IO]], charset: str
+ input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str
) -> t.BinaryIO:
# Is already an input stream.
if hasattr(input, "read"):
- rv = _find_binary_reader(t.cast(t.IO, input))
+ rv = _find_binary_reader(t.cast(t.IO[t.Any], input))
if rv is not None:
return rv
@@ -95,7 +95,7 @@ def make_input_stream(
elif isinstance(input, str):
input = input.encode(charset)
- return io.BytesIO(t.cast(bytes, input))
+ return io.BytesIO(input)
class Result:
@@ -183,7 +183,7 @@ class CliRunner:
mix_stderr: bool = True,
) -> None:
self.charset = charset
- self.env = env or {}
+ self.env: t.Mapping[str, t.Optional[str]] = env or {}
self.echo_stdin = echo_stdin
self.mix_stderr = mix_stderr
@@ -206,7 +206,7 @@ class CliRunner:
@contextlib.contextmanager
def isolation(
self,
- input: t.Optional[t.Union[str, bytes, t.IO]] = None,
+ input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
color: bool = False,
) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
@@ -301,7 +301,7 @@ class CliRunner:
default_color = color
def should_strip_ansi(
- stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
+ stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
) -> bool:
if color is None:
return not default_color
@@ -350,7 +350,7 @@ class CliRunner:
self,
cli: "BaseCommand",
args: t.Optional[t.Union[str, t.Sequence[str]]] = None,
- input: t.Optional[t.Union[str, bytes, t.IO]] = None,
+ input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
catch_exceptions: bool = True,
color: bool = False,
@@ -449,7 +449,7 @@ class CliRunner:
@contextlib.contextmanager
def isolated_filesystem(
- self, temp_dir: t.Optional[t.Union[str, os.PathLike]] = None
+ self, temp_dir: t.Optional[t.Union[str, "os.PathLike[str]"]] = None
) -> t.Iterator[str]:
"""A context manager that creates a temporary directory and
changes the current working directory to it. This isolates tests
@@ -464,11 +464,11 @@ class CliRunner:
Added the ``temp_dir`` parameter.
"""
cwd = os.getcwd()
- dt = tempfile.mkdtemp(dir=temp_dir) # type: ignore[type-var]
+ dt = tempfile.mkdtemp(dir=temp_dir)
os.chdir(dt)
try:
- yield t.cast(str, dt)
+ yield dt
finally:
os.chdir(cwd)
diff --git a/libs/click/types.py b/libs/click/types.py
index b45ee53d0..2b1d1797f 100644
--- a/libs/click/types.py
+++ b/libs/click/types.py
@@ -1,14 +1,15 @@
import os
import stat
+import sys
import typing as t
from datetime import datetime
from gettext import gettext as _
from gettext import ngettext
from ._compat import _get_argv_encoding
-from ._compat import get_filesystem_encoding
from ._compat import open_stream
from .exceptions import BadParameter
+from .utils import format_filename
from .utils import LazyFile
from .utils import safecall
@@ -162,7 +163,7 @@ class CompositeParamType(ParamType):
class FuncParamType(ParamType):
def __init__(self, func: t.Callable[[t.Any], t.Any]) -> None:
- self.name = func.__name__
+ self.name: str = func.__name__
self.func = func
def to_info_dict(self) -> t.Dict[str, t.Any]:
@@ -207,7 +208,7 @@ class StringParamType(ParamType):
try:
value = value.decode(enc)
except UnicodeError:
- fs_enc = get_filesystem_encoding()
+ fs_enc = sys.getfilesystemencoding()
if fs_enc != enc:
try:
value = value.decode(fs_enc)
@@ -353,7 +354,11 @@ class DateTime(ParamType):
name = "datetime"
def __init__(self, formats: t.Optional[t.Sequence[str]] = None):
- self.formats = formats or ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"]
+ self.formats: t.Sequence[str] = formats or [
+ "%Y-%m-%d",
+ "%Y-%m-%dT%H:%M:%S",
+ "%Y-%m-%d %H:%M:%S",
+ ]
def to_info_dict(self) -> t.Dict[str, t.Any]:
info_dict = super().to_info_dict()
@@ -397,7 +402,7 @@ class DateTime(ParamType):
class _NumberParamTypeBase(ParamType):
- _number_class: t.ClassVar[t.Type]
+ _number_class: t.ClassVar[t.Type[t.Any]]
def convert(
self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"]
@@ -662,7 +667,7 @@ class File(ParamType):
"""
name = "filename"
- envvar_list_splitter = os.path.pathsep
+ envvar_list_splitter: t.ClassVar[str] = os.path.pathsep
def __init__(
self,
@@ -683,36 +688,38 @@ class File(ParamType):
info_dict.update(mode=self.mode, encoding=self.encoding)
return info_dict
- def resolve_lazy_flag(self, value: t.Any) -> bool:
+ def resolve_lazy_flag(self, value: "t.Union[str, os.PathLike[str]]") -> bool:
if self.lazy is not None:
return self.lazy
- if value == "-":
+ if os.fspath(value) == "-":
return False
elif "w" in self.mode:
return True
return False
def convert(
- self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"]
- ) -> t.Any:
- try:
- if hasattr(value, "read") or hasattr(value, "write"):
- return value
+ self,
+ value: t.Union[str, "os.PathLike[str]", t.IO[t.Any]],
+ param: t.Optional["Parameter"],
+ ctx: t.Optional["Context"],
+ ) -> t.IO[t.Any]:
+ if _is_file_like(value):
+ return value
+
+ value = t.cast("t.Union[str, os.PathLike[str]]", value)
+ try:
lazy = self.resolve_lazy_flag(value)
if lazy:
- f: t.IO = t.cast(
- t.IO,
- LazyFile(
- value, self.mode, self.encoding, self.errors, atomic=self.atomic
- ),
+ lf = LazyFile(
+ value, self.mode, self.encoding, self.errors, atomic=self.atomic
)
if ctx is not None:
- ctx.call_on_close(f.close_intelligently) # type: ignore
+ ctx.call_on_close(lf.close_intelligently)
- return f
+ return t.cast(t.IO[t.Any], lf)
f, should_close = open_stream(
value, self.mode, self.encoding, self.errors, atomic=self.atomic
@@ -731,7 +738,7 @@ class File(ParamType):
return f
except OSError as e: # noqa: B014
- self.fail(f"'{os.fsdecode(value)}': {e.strerror}", param, ctx)
+ self.fail(f"'{format_filename(value)}': {e.strerror}", param, ctx)
def shell_complete(
self, ctx: "Context", param: "Parameter", incomplete: str
@@ -750,6 +757,10 @@ class File(ParamType):
return [CompletionItem(incomplete, type="file")]
+def _is_file_like(value: t.Any) -> "te.TypeGuard[t.IO[t.Any]]":
+ return hasattr(value, "read") or hasattr(value, "write")
+
+
class Path(ParamType):
"""The ``Path`` type is similar to the :class:`File` type, but
returns the filename instead of an open file. Various checks can be
@@ -777,13 +788,13 @@ class Path(ParamType):
Added the ``executable`` parameter.
.. versionchanged:: 8.0
- Allow passing ``type=pathlib.Path``.
+ Allow passing ``path_type=pathlib.Path``.
.. versionchanged:: 6.0
Added the ``allow_dash`` parameter.
"""
- envvar_list_splitter = os.path.pathsep
+ envvar_list_splitter: t.ClassVar[str] = os.path.pathsep
def __init__(
self,
@@ -794,7 +805,7 @@ class Path(ParamType):
readable: bool = True,
resolve_path: bool = False,
allow_dash: bool = False,
- path_type: t.Optional[t.Type] = None,
+ path_type: t.Optional[t.Type[t.Any]] = None,
executable: bool = False,
):
self.exists = exists
@@ -808,7 +819,7 @@ class Path(ParamType):
self.type = path_type
if self.file_okay and not self.dir_okay:
- self.name = _("file")
+ self.name: str = _("file")
elif self.dir_okay and not self.file_okay:
self.name = _("directory")
else:
@@ -826,20 +837,25 @@ class Path(ParamType):
)
return info_dict
- def coerce_path_result(self, rv: t.Any) -> t.Any:
- if self.type is not None and not isinstance(rv, self.type):
+ def coerce_path_result(
+ self, value: "t.Union[str, os.PathLike[str]]"
+ ) -> "t.Union[str, bytes, os.PathLike[str]]":
+ if self.type is not None and not isinstance(value, self.type):
if self.type is str:
- rv = os.fsdecode(rv)
+ return os.fsdecode(value)
elif self.type is bytes:
- rv = os.fsencode(rv)
+ return os.fsencode(value)
else:
- rv = self.type(rv)
+ return t.cast("os.PathLike[str]", self.type(value))
- return rv
+ return value
def convert(
- self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"]
- ) -> t.Any:
+ self,
+ value: "t.Union[str, os.PathLike[str]]",
+ param: t.Optional["Parameter"],
+ ctx: t.Optional["Context"],
+ ) -> "t.Union[str, bytes, os.PathLike[str]]":
rv = value
is_dash = self.file_okay and self.allow_dash and rv in (b"-", "-")
@@ -859,7 +875,7 @@ class Path(ParamType):
return self.coerce_path_result(rv)
self.fail(
_("{name} {filename!r} does not exist.").format(
- name=self.name.title(), filename=os.fsdecode(value)
+ name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
@@ -868,7 +884,7 @@ class Path(ParamType):
if not self.file_okay and stat.S_ISREG(st.st_mode):
self.fail(
_("{name} {filename!r} is a file.").format(
- name=self.name.title(), filename=os.fsdecode(value)
+ name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
@@ -876,7 +892,7 @@ class Path(ParamType):
if not self.dir_okay and stat.S_ISDIR(st.st_mode):
self.fail(
_("{name} '{filename}' is a directory.").format(
- name=self.name.title(), filename=os.fsdecode(value)
+ name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
@@ -885,7 +901,7 @@ class Path(ParamType):
if self.readable and not os.access(rv, os.R_OK):
self.fail(
_("{name} {filename!r} is not readable.").format(
- name=self.name.title(), filename=os.fsdecode(value)
+ name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
@@ -894,7 +910,7 @@ class Path(ParamType):
if self.writable and not os.access(rv, os.W_OK):
self.fail(
_("{name} {filename!r} is not writable.").format(
- name=self.name.title(), filename=os.fsdecode(value)
+ name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
@@ -903,7 +919,7 @@ class Path(ParamType):
if self.executable and not os.access(value, os.X_OK):
self.fail(
_("{name} {filename!r} is not executable.").format(
- name=self.name.title(), filename=os.fsdecode(value)
+ name=self.name.title(), filename=format_filename(value)
),
param,
ctx,
@@ -944,8 +960,8 @@ class Tuple(CompositeParamType):
:param types: a list of types that should be used for the tuple items.
"""
- def __init__(self, types: t.Sequence[t.Union[t.Type, ParamType]]) -> None:
- self.types = [convert_type(ty) for ty in types]
+ def __init__(self, types: t.Sequence[t.Union[t.Type[t.Any], ParamType]]) -> None:
+ self.types: t.Sequence[ParamType] = [convert_type(ty) for ty in types]
def to_info_dict(self) -> t.Dict[str, t.Any]:
info_dict = super().to_info_dict()
diff --git a/libs/click/utils.py b/libs/click/utils.py
index 8283788ac..d536434f0 100644
--- a/libs/click/utils.py
+++ b/libs/click/utils.py
@@ -4,13 +4,13 @@ import sys
import typing as t
from functools import update_wrapper
from types import ModuleType
+from types import TracebackType
from ._compat import _default_text_stderr
from ._compat import _default_text_stdout
from ._compat import _find_binary_writer
from ._compat import auto_wrap_for_ansi
from ._compat import binary_streams
-from ._compat import get_filesystem_encoding
from ._compat import open_stream
from ._compat import should_strip_ansi
from ._compat import strip_ansi
@@ -21,30 +21,33 @@ from .globals import resolve_color_default
if t.TYPE_CHECKING:
import typing_extensions as te
-F = t.TypeVar("F", bound=t.Callable[..., t.Any])
+ P = te.ParamSpec("P")
+
+R = t.TypeVar("R")
def _posixify(name: str) -> str:
return "-".join(name.split()).lower()
-def safecall(func: F) -> F:
+def safecall(func: "t.Callable[P, R]") -> "t.Callable[P, t.Optional[R]]":
"""Wraps a function so that it swallows exceptions."""
- def wrapper(*args, **kwargs): # type: ignore
+ def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> t.Optional[R]:
try:
return func(*args, **kwargs)
except Exception:
pass
+ return None
- return update_wrapper(t.cast(F, wrapper), func)
+ return update_wrapper(wrapper, func)
def make_str(value: t.Any) -> str:
"""Converts a value into a valid string."""
if isinstance(value, bytes):
try:
- return value.decode(get_filesystem_encoding())
+ return value.decode(sys.getfilesystemencoding())
except UnicodeError:
return value.decode("utf-8", "replace")
return str(value)
@@ -109,20 +112,21 @@ class LazyFile:
def __init__(
self,
- filename: str,
+ filename: t.Union[str, "os.PathLike[str]"],
mode: str = "r",
encoding: t.Optional[str] = None,
errors: t.Optional[str] = "strict",
atomic: bool = False,
):
- self.name = filename
+ self.name: str = os.fspath(filename)
self.mode = mode
self.encoding = encoding
self.errors = errors
self.atomic = atomic
- self._f: t.Optional[t.IO]
+ self._f: t.Optional[t.IO[t.Any]]
+ self.should_close: bool
- if filename == "-":
+ if self.name == "-":
self._f, self.should_close = open_stream(filename, mode, encoding, errors)
else:
if "r" in mode:
@@ -139,9 +143,9 @@ class LazyFile:
def __repr__(self) -> str:
if self._f is not None:
return repr(self._f)
- return f"<unopened file '{self.name}' {self.mode}>"
+ return f"<unopened file '{format_filename(self.name)}' {self.mode}>"
- def open(self) -> t.IO:
+ def open(self) -> t.IO[t.Any]:
"""Opens the file if it's not yet open. This call might fail with
a :exc:`FileError`. Not handling this error will produce an error
that Click shows.
@@ -174,7 +178,12 @@ class LazyFile:
def __enter__(self) -> "LazyFile":
return self
- def __exit__(self, exc_type, exc_value, tb): # type: ignore
+ def __exit__(
+ self,
+ exc_type: t.Optional[t.Type[BaseException]],
+ exc_value: t.Optional[BaseException],
+ tb: t.Optional[TracebackType],
+ ) -> None:
self.close_intelligently()
def __iter__(self) -> t.Iterator[t.AnyStr]:
@@ -183,8 +192,8 @@ class LazyFile:
class KeepOpenFile:
- def __init__(self, file: t.IO) -> None:
- self._file = file
+ def __init__(self, file: t.IO[t.Any]) -> None:
+ self._file: t.IO[t.Any] = file
def __getattr__(self, name: str) -> t.Any:
return getattr(self._file, name)
@@ -192,7 +201,12 @@ class KeepOpenFile:
def __enter__(self) -> "KeepOpenFile":
return self
- def __exit__(self, exc_type, exc_value, tb): # type: ignore
+ def __exit__(
+ self,
+ exc_type: t.Optional[t.Type[BaseException]],
+ exc_value: t.Optional[BaseException],
+ tb: t.Optional[TracebackType],
+ ) -> None:
pass
def __repr__(self) -> str:
@@ -253,6 +267,11 @@ def echo(
else:
file = _default_text_stdout()
+ # There are no standard streams attached to write to. For example,
+ # pythonw on Windows.
+ if file is None:
+ return
+
# Convert non bytes/text into the native string type.
if message is not None and not isinstance(message, (str, bytes, bytearray)):
out: t.Optional[t.Union[str, bytes]] = str(message)
@@ -340,7 +359,7 @@ def open_file(
errors: t.Optional[str] = "strict",
lazy: bool = False,
atomic: bool = False,
-) -> t.IO:
+) -> t.IO[t.Any]:
"""Open a file, with extra behavior to handle ``'-'`` to indicate
a standard stream, lazy open on write, and atomic write. Similar to
the behavior of the :class:`~click.File` param type.
@@ -370,24 +389,39 @@ def open_file(
.. versionadded:: 3.0
"""
if lazy:
- return t.cast(t.IO, LazyFile(filename, mode, encoding, errors, atomic=atomic))
+ return t.cast(
+ t.IO[t.Any], LazyFile(filename, mode, encoding, errors, atomic=atomic)
+ )
f, should_close = open_stream(filename, mode, encoding, errors, atomic=atomic)
if not should_close:
- f = t.cast(t.IO, KeepOpenFile(f))
+ f = t.cast(t.IO[t.Any], KeepOpenFile(f))
return f
def format_filename(
- filename: t.Union[str, bytes, os.PathLike], shorten: bool = False
+ filename: "t.Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]",
+ shorten: bool = False,
) -> str:
- """Formats a filename for user display. The main purpose of this
- function is to ensure that the filename can be displayed at all. This
- will decode the filename to unicode if necessary in a way that it will
- not fail. Optionally, it can shorten the filename to not include the
- full path to the filename.
+ """Format a filename as a string for display. Ensures the filename can be
+ displayed by replacing any invalid bytes or surrogate escapes in the name
+ with the replacement character ``�``.
+
+ Invalid bytes or surrogate escapes will raise an error when written to a
+ stream with ``errors="strict". This will typically happen with ``stdout``
+ when the locale is something like ``en_GB.UTF-8``.
+
+ Many scenarios *are* safe to write surrogates though, due to PEP 538 and
+ PEP 540, including:
+
+ - Writing to ``stderr``, which uses ``errors="backslashreplace"``.
+ - The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens
+ stdout and stderr with ``errors="surrogateescape"``.
+ - None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``.
+ - Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``.
+ Python opens stdout and stderr with ``errors="surrogateescape"``.
:param filename: formats a filename for UI display. This will also convert
the filename into unicode without failing.
@@ -396,8 +430,17 @@ def format_filename(
"""
if shorten:
filename = os.path.basename(filename)
+ else:
+ filename = os.fspath(filename)
+
+ if isinstance(filename, bytes):
+ filename = filename.decode(sys.getfilesystemencoding(), "replace")
+ else:
+ filename = filename.encode("utf-8", "surrogateescape").decode(
+ "utf-8", "replace"
+ )
- return os.fsdecode(filename)
+ return filename
def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str:
@@ -425,7 +468,7 @@ def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False)
:param app_name: the application name. This should be properly capitalized
and can contain whitespace.
:param roaming: controls if the folder should be roaming or not on Windows.
- Has no affect otherwise.
+ Has no effect otherwise.
:param force_posix: if this is set to `True` then on any POSIX system the
folder will be stored in the home folder with a leading
dot instead of the XDG config home or darwin's
@@ -458,7 +501,7 @@ class PacifyFlushWrapper:
pipe, all calls and attributes are proxied.
"""
- def __init__(self, wrapped: t.IO) -> None:
+ def __init__(self, wrapped: t.IO[t.Any]) -> None:
self.wrapped = wrapped
def flush(self) -> None:
@@ -506,7 +549,8 @@ def _detect_program_name(
# The value of __package__ indicates how Python was called. It may
# not exist if a setuptools script is installed as an egg. It may be
# set incorrectly for entry points created with pip on Windows.
- if getattr(_main, "__package__", None) is None or (
+ # It is set to "" inside a Shiv or PEX zipapp.
+ if getattr(_main, "__package__", None) in {None, ""} or (
os.name == "nt"
and _main.__package__ == ""
and not os.path.exists(path)