diff options
author | morpheus65535 <[email protected]> | 2024-03-03 12:15:23 -0500 |
---|---|---|
committer | GitHub <[email protected]> | 2024-03-03 12:15:23 -0500 |
commit | 03afeb347075381bcb7fd6036295c9fa4a90d2dc (patch) | |
tree | 7c5d72c973d2c8e4ade57391a1c9ad5e94903a46 /libs/click | |
parent | 9ae684240b5bdd40a870d8122f0e380f8d03a187 (diff) | |
download | bazarr-03afeb347075381bcb7fd6036295c9fa4a90d2dc.tar.gz bazarr-03afeb347075381bcb7fd6036295c9fa4a90d2dc.zip |
Updated multiple Python modules (now in libs and custom_libs directories) and React libraries
Diffstat (limited to 'libs/click')
-rw-r--r-- | libs/click/__init__.py | 2 | ||||
-rw-r--r-- | libs/click/_compat.py | 67 | ||||
-rw-r--r-- | libs/click/_termui_impl.py | 42 | ||||
-rw-r--r-- | libs/click/core.py | 126 | ||||
-rw-r--r-- | libs/click/decorators.py | 178 | ||||
-rw-r--r-- | libs/click/exceptions.py | 13 | ||||
-rw-r--r-- | libs/click/parser.py | 10 | ||||
-rw-r--r-- | libs/click/shell_completion.py | 60 | ||||
-rw-r--r-- | libs/click/termui.py | 11 | ||||
-rw-r--r-- | libs/click/testing.py | 20 | ||||
-rw-r--r-- | libs/click/types.py | 98 | ||||
-rw-r--r-- | libs/click/utils.py | 102 |
12 files changed, 465 insertions, 264 deletions
diff --git a/libs/click/__init__.py b/libs/click/__init__.py index e3ef423b6..9a1dab048 100644 --- a/libs/click/__init__.py +++ b/libs/click/__init__.py @@ -70,4 +70,4 @@ from .utils import get_binary_stream as get_binary_stream from .utils import get_text_stream as get_text_stream from .utils import open_file as open_file -__version__ = "8.1.3" +__version__ = "8.1.7" diff --git a/libs/click/_compat.py b/libs/click/_compat.py index 766d286be..23f886659 100644 --- a/libs/click/_compat.py +++ b/libs/click/_compat.py @@ -7,20 +7,11 @@ import typing as t from weakref import WeakKeyDictionary CYGWIN = sys.platform.startswith("cygwin") -MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version) -# Determine local App Engine environment, per Google's own suggestion -APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get( - "SERVER_SOFTWARE", "" -) -WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2 +WIN = sys.platform.startswith("win") auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None _ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]") -def get_filesystem_encoding() -> str: - return sys.getfilesystemencoding() or sys.getdefaultencoding() - - def _make_text_stream( stream: t.BinaryIO, encoding: t.Optional[str], @@ -50,7 +41,7 @@ def is_ascii_encoding(encoding: str) -> bool: return False -def get_best_encoding(stream: t.IO) -> str: +def get_best_encoding(stream: t.IO[t.Any]) -> str: """Returns the default stream encoding if not found.""" rv = getattr(stream, "encoding", None) or sys.getdefaultencoding() if is_ascii_encoding(rv): @@ -153,7 +144,7 @@ class _FixupStream: return True -def _is_binary_reader(stream: t.IO, default: bool = False) -> bool: +def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool: try: return isinstance(stream.read(0), bytes) except Exception: @@ -162,7 +153,7 @@ def _is_binary_reader(stream: t.IO, default: bool = False) -> bool: # closed. In this case, we assume the default. -def _is_binary_writer(stream: t.IO, default: bool = False) -> bool: +def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool: try: stream.write(b"") except Exception: @@ -175,7 +166,7 @@ def _is_binary_writer(stream: t.IO, default: bool = False) -> bool: return True -def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]: +def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]: # We need to figure out if the given stream is already binary. # This can happen because the official docs recommend detaching # the streams to get binary streams. Some code might do this, so @@ -193,7 +184,7 @@ def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]: return None -def _find_binary_writer(stream: t.IO) -> t.Optional[t.BinaryIO]: +def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]: # We need to figure out if the given stream is already binary. # This can happen because the official docs recommend detaching # the streams to get binary streams. Some code might do this, so @@ -241,11 +232,11 @@ def _is_compatible_text_stream( def _force_correct_text_stream( - text_stream: t.IO, + text_stream: t.IO[t.Any], encoding: t.Optional[str], errors: t.Optional[str], - is_binary: t.Callable[[t.IO, bool], bool], - find_binary: t.Callable[[t.IO], t.Optional[t.BinaryIO]], + is_binary: t.Callable[[t.IO[t.Any], bool], bool], + find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]], force_readable: bool = False, force_writable: bool = False, ) -> t.TextIO: @@ -287,7 +278,7 @@ def _force_correct_text_stream( def _force_correct_text_reader( - text_reader: t.IO, + text_reader: t.IO[t.Any], encoding: t.Optional[str], errors: t.Optional[str], force_readable: bool = False, @@ -303,7 +294,7 @@ def _force_correct_text_reader( def _force_correct_text_writer( - text_writer: t.IO, + text_writer: t.IO[t.Any], encoding: t.Optional[str], errors: t.Optional[str], force_writable: bool = False, @@ -367,11 +358,11 @@ def get_text_stderr( def _wrap_io_open( - file: t.Union[str, os.PathLike, int], + file: t.Union[str, "os.PathLike[str]", int], mode: str, encoding: t.Optional[str], errors: t.Optional[str], -) -> t.IO: +) -> t.IO[t.Any]: """Handles not passing ``encoding`` and ``errors`` in binary mode.""" if "b" in mode: return open(file, mode) @@ -380,13 +371,14 @@ def _wrap_io_open( def open_stream( - filename: str, + filename: "t.Union[str, os.PathLike[str]]", mode: str = "r", encoding: t.Optional[str] = None, errors: t.Optional[str] = "strict", atomic: bool = False, -) -> t.Tuple[t.IO, bool]: +) -> t.Tuple[t.IO[t.Any], bool]: binary = "b" in mode + filename = os.fspath(filename) # Standard streams first. These are simple because they ignore the # atomic flag. Use fsdecode to handle Path("-"). @@ -456,11 +448,11 @@ def open_stream( f = _wrap_io_open(fd, mode, encoding, errors) af = _AtomicFile(f, tmp_filename, os.path.realpath(filename)) - return t.cast(t.IO, af), True + return t.cast(t.IO[t.Any], af), True class _AtomicFile: - def __init__(self, f: t.IO, tmp_filename: str, real_filename: str) -> None: + def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None: self._f = f self._tmp_filename = tmp_filename self._real_filename = real_filename @@ -483,7 +475,7 @@ class _AtomicFile: def __enter__(self) -> "_AtomicFile": return self - def __exit__(self, exc_type, exc_value, tb): # type: ignore + def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None: self.close(delete=exc_type is not None) def __repr__(self) -> str: @@ -494,7 +486,7 @@ def strip_ansi(value: str) -> str: return _ansi_re.sub("", value) -def _is_jupyter_kernel_output(stream: t.IO) -> bool: +def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool: while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)): stream = stream._stream @@ -502,7 +494,7 @@ def _is_jupyter_kernel_output(stream: t.IO) -> bool: def should_strip_ansi( - stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None + stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None ) -> bool: if color is None: if stream is None: @@ -524,7 +516,7 @@ if sys.platform.startswith("win") and WIN: _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary() - def auto_wrap_for_ansi( + def auto_wrap_for_ansi( # noqa: F811 stream: t.TextIO, color: t.Optional[bool] = None ) -> t.TextIO: """Support ANSI color and style codes on Windows by wrapping a @@ -564,7 +556,7 @@ if sys.platform.startswith("win") and WIN: else: def _get_argv_encoding() -> str: - return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding() + return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding() def _get_windows_console_stream( f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str] @@ -576,7 +568,7 @@ def term_len(x: str) -> int: return len(strip_ansi(x)) -def isatty(stream: t.IO) -> bool: +def isatty(stream: t.IO[t.Any]) -> bool: try: return stream.isatty() except Exception: @@ -584,12 +576,17 @@ def isatty(stream: t.IO) -> bool: def _make_cached_stream_func( - src_func: t.Callable[[], t.TextIO], wrapper_func: t.Callable[[], t.TextIO] -) -> t.Callable[[], t.TextIO]: + src_func: t.Callable[[], t.Optional[t.TextIO]], + wrapper_func: t.Callable[[], t.TextIO], +) -> t.Callable[[], t.Optional[t.TextIO]]: cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary() - def func() -> t.TextIO: + def func() -> t.Optional[t.TextIO]: stream = src_func() + + if stream is None: + return None + try: rv = cache.get(stream) except Exception: diff --git a/libs/click/_termui_impl.py b/libs/click/_termui_impl.py index 4b979bcc1..f74465775 100644 --- a/libs/click/_termui_impl.py +++ b/libs/click/_termui_impl.py @@ -10,6 +10,8 @@ import sys import time import typing as t from gettext import gettext as _ +from io import StringIO +from types import TracebackType from ._compat import _default_text_stdout from ._compat import CYGWIN @@ -59,15 +61,22 @@ class ProgressBar(t.Generic[V]): self.show_percent = show_percent self.show_pos = show_pos self.item_show_func = item_show_func - self.label = label or "" + self.label: str = label or "" + if file is None: file = _default_text_stdout() + + # There are no standard streams attached to write to. For example, + # pythonw on Windows. + if file is None: + file = StringIO() + self.file = file self.color = color self.update_min_steps = update_min_steps self._completed_intervals = 0 - self.width = width - self.autowidth = width == 0 + self.width: int = width + self.autowidth: bool = width == 0 if length is None: from operator import length_hint @@ -80,25 +89,32 @@ class ProgressBar(t.Generic[V]): if length is None: raise TypeError("iterable or length is required") iterable = t.cast(t.Iterable[V], range(length)) - self.iter = iter(iterable) + self.iter: t.Iterable[V] = iter(iterable) self.length = length self.pos = 0 self.avg: t.List[float] = [] + self.last_eta: float + self.start: float self.start = self.last_eta = time.time() - self.eta_known = False - self.finished = False + self.eta_known: bool = False + self.finished: bool = False self.max_width: t.Optional[int] = None - self.entered = False + self.entered: bool = False self.current_item: t.Optional[V] = None - self.is_hidden = not isatty(self.file) + self.is_hidden: bool = not isatty(self.file) self._last_line: t.Optional[str] = None - def __enter__(self) -> "ProgressBar": + def __enter__(self) -> "ProgressBar[V]": self.entered = True self.render_progress() return self - def __exit__(self, exc_type, exc_value, tb): # type: ignore + def __exit__( + self, + exc_type: t.Optional[t.Type[BaseException]], + exc_value: t.Optional[BaseException], + tb: t.Optional[TracebackType], + ) -> None: self.render_finish() def __iter__(self) -> t.Iterator[V]: @@ -344,6 +360,12 @@ class ProgressBar(t.Generic[V]): def pager(generator: t.Iterable[str], color: t.Optional[bool] = None) -> None: """Decide what method to use for paging through text.""" stdout = _default_text_stdout() + + # There are no standard streams attached to write to. For example, + # pythonw on Windows. + if stdout is None: + stdout = StringIO() + if not isatty(sys.stdin) or not isatty(stdout): return _nullpager(stdout, generator, color) pager_cmd = (os.environ.get("PAGER", None) or "").strip() diff --git a/libs/click/core.py b/libs/click/core.py index 5abfb0f3c..cc65e896b 100644 --- a/libs/click/core.py +++ b/libs/click/core.py @@ -7,11 +7,11 @@ import typing as t from collections import abc from contextlib import contextmanager from contextlib import ExitStack -from functools import partial from functools import update_wrapper from gettext import gettext as _ from gettext import ngettext from itertools import repeat +from types import TracebackType from . import types from .exceptions import Abort @@ -264,7 +264,7 @@ class Context: info_name: t.Optional[str] = None, obj: t.Optional[t.Any] = None, auto_envvar_prefix: t.Optional[str] = None, - default_map: t.Optional[t.Dict[str, t.Any]] = None, + default_map: t.Optional[t.MutableMapping[str, t.Any]] = None, terminal_width: t.Optional[int] = None, max_content_width: t.Optional[int] = None, resilient_parsing: bool = False, @@ -311,7 +311,7 @@ class Context: ): default_map = parent.default_map.get(info_name) - self.default_map: t.Optional[t.Dict[str, t.Any]] = default_map + self.default_map: t.Optional[t.MutableMapping[str, t.Any]] = default_map #: This flag indicates if a subcommand is going to be executed. A #: group callback can use this information to figure out if it's @@ -455,7 +455,12 @@ class Context: push_context(self) return self - def __exit__(self, exc_type, exc_value, tb): # type: ignore + def __exit__( + self, + exc_type: t.Optional[t.Type[BaseException]], + exc_value: t.Optional[BaseException], + tb: t.Optional[TracebackType], + ) -> None: self._depth -= 1 if self._depth == 0: self.close() @@ -706,12 +711,30 @@ class Context: """ return type(self)(command, info_name=command.name, parent=self) + @t.overload def invoke( __self, # noqa: B902 - __callback: t.Union["Command", t.Callable[..., t.Any]], + __callback: "t.Callable[..., V]", + *args: t.Any, + **kwargs: t.Any, + ) -> V: + ... + + @t.overload + def invoke( + __self, # noqa: B902 + __callback: "Command", *args: t.Any, **kwargs: t.Any, ) -> t.Any: + ... + + def invoke( + __self, # noqa: B902 + __callback: t.Union["Command", "t.Callable[..., V]"], + *args: t.Any, + **kwargs: t.Any, + ) -> t.Union[t.Any, V]: """Invokes a command callback in exactly the way it expects. There are two ways to invoke this method: @@ -739,7 +762,7 @@ class Context: "The given command does not have a callback that can be invoked." ) else: - __callback = other_cmd.callback + __callback = t.cast("t.Callable[..., V]", other_cmd.callback) ctx = __self._make_sub_context(other_cmd) @@ -844,7 +867,7 @@ class BaseCommand: def __init__( self, name: t.Optional[str], - context_settings: t.Optional[t.Dict[str, t.Any]] = None, + context_settings: t.Optional[t.MutableMapping[str, t.Any]] = None, ) -> None: #: the name the command thinks it has. Upon registering a command #: on a :class:`Group` the group will default the command name @@ -856,7 +879,7 @@ class BaseCommand: context_settings = {} #: an optional dictionary with defaults passed to the context. - self.context_settings: t.Dict[str, t.Any] = context_settings + self.context_settings: t.MutableMapping[str, t.Any] = context_settings def to_info_dict(self, ctx: Context) -> t.Dict[str, t.Any]: """Gather information that could be useful for a tool generating @@ -898,7 +921,7 @@ class BaseCommand: :param info_name: the info name for this invocation. Generally this is the most descriptive name for the script or command. For the toplevel script it's usually - the name of the script, for commands below it it's + the name of the script, for commands below it's the name of the command. :param args: the arguments to parse as list of strings. :param parent: the parent context if available. @@ -931,7 +954,7 @@ class BaseCommand: """Given a context, this invokes the command. The default implementation is raising a not implemented error. """ - raise NotImplementedError("Base commands are not invokable by default") + raise NotImplementedError("Base commands are not invocable by default") def shell_complete(self, ctx: Context, incomplete: str) -> t.List["CompletionItem"]: """Return a list of completions for the incomplete value. Looks @@ -1063,9 +1086,9 @@ class BaseCommand: # even always obvious that `rv` indicates success/failure # by its truthiness/falsiness ctx.exit() - except (EOFError, KeyboardInterrupt): + except (EOFError, KeyboardInterrupt) as e: echo(file=sys.stderr) - raise Abort() from None + raise Abort() from e except ClickException as e: if not standalone_mode: raise @@ -1099,7 +1122,7 @@ class BaseCommand: def _main_shell_completion( self, - ctx_args: t.Dict[str, t.Any], + ctx_args: t.MutableMapping[str, t.Any], prog_name: str, complete_var: t.Optional[str] = None, ) -> None: @@ -1111,9 +1134,13 @@ class BaseCommand: :param complete_var: Name of the environment variable that holds the completion instruction. Defaults to ``_{PROG_NAME}_COMPLETE``. + + .. versionchanged:: 8.2.0 + Dots (``.``) in ``prog_name`` are replaced with underscores (``_``). """ if complete_var is None: - complete_var = f"_{prog_name}_COMPLETE".replace("-", "_").upper() + complete_name = prog_name.replace("-", "_").replace(".", "_") + complete_var = f"_{complete_name}_COMPLETE".upper() instruction = os.environ.get(complete_var) @@ -1175,7 +1202,7 @@ class Command(BaseCommand): def __init__( self, name: t.Optional[str], - context_settings: t.Optional[t.Dict[str, t.Any]] = None, + context_settings: t.Optional[t.MutableMapping[str, t.Any]] = None, callback: t.Optional[t.Callable[..., t.Any]] = None, params: t.Optional[t.List["Parameter"]] = None, help: t.Optional[str] = None, @@ -1333,13 +1360,16 @@ class Command(BaseCommand): def format_help_text(self, ctx: Context, formatter: HelpFormatter) -> None: """Writes the help text to the formatter if it exists.""" - text = self.help if self.help is not None else "" + if self.help is not None: + # truncate the help text to the first form feed + text = inspect.cleandoc(self.help).partition("\f")[0] + else: + text = "" if self.deprecated: text = _("(Deprecated) {text}").format(text=text) if text: - text = inspect.cleandoc(text).partition("\f")[0] formatter.write_paragraph() with formatter.indentation(): @@ -1462,6 +1492,7 @@ class MultiCommand(Command): :param result_callback: The result callback to attach to this multi command. This can be set or changed later with the :meth:`result_callback` decorator. + :param attrs: Other command arguments described in :class:`Command`. """ allow_extra_args = True @@ -1569,7 +1600,7 @@ class MultiCommand(Command): return f def function(__value, *args, **kwargs): # type: ignore - inner = old_callback(__value, *args, **kwargs) # type: ignore + inner = old_callback(__value, *args, **kwargs) return f(inner, *args, **kwargs) self._result_callback = rv = update_wrapper(t.cast(F, function), f) @@ -1760,7 +1791,7 @@ class Group(MultiCommand): :class:`BaseCommand`. .. versionchanged:: 8.0 - The ``commmands`` argument can be a list of command objects. + The ``commands`` argument can be a list of command objects. """ #: If set, this is used by the group's :meth:`command` decorator @@ -1786,7 +1817,9 @@ class Group(MultiCommand): def __init__( self, name: t.Optional[str] = None, - commands: t.Optional[t.Union[t.Dict[str, Command], t.Sequence[Command]]] = None, + commands: t.Optional[ + t.Union[t.MutableMapping[str, Command], t.Sequence[Command]] + ] = None, **attrs: t.Any, ) -> None: super().__init__(name, **attrs) @@ -1797,7 +1830,7 @@ class Group(MultiCommand): commands = {c.name: c for c in commands if c.name is not None} #: The registered subcommands by their exported names. - self.commands: t.Dict[str, Command] = commands + self.commands: t.MutableMapping[str, Command] = commands def add_command(self, cmd: Command, name: t.Optional[str] = None) -> None: """Registers another :class:`Command` with this group. If the name @@ -1838,10 +1871,7 @@ class Group(MultiCommand): """ from .decorators import command - if self.command_class and kwargs.get("cls") is None: - kwargs["cls"] = self.command_class - - func: t.Optional[t.Callable] = None + func: t.Optional[t.Callable[..., t.Any]] = None if args and callable(args[0]): assert ( @@ -1850,6 +1880,9 @@ class Group(MultiCommand): (func,) = args args = () + if self.command_class and kwargs.get("cls") is None: + kwargs["cls"] = self.command_class + def decorator(f: t.Callable[..., t.Any]) -> Command: cmd: Command = command(*args, **kwargs)(f) self.add_command(cmd) @@ -1889,7 +1922,7 @@ class Group(MultiCommand): """ from .decorators import group - func: t.Optional[t.Callable] = None + func: t.Optional[t.Callable[..., t.Any]] = None if args and callable(args[0]): assert ( @@ -1926,6 +1959,9 @@ class CommandCollection(MultiCommand): commands together into one. This is a straightforward implementation that accepts a list of different multi commands as sources and provides all the commands for each of them. + + See :class:`MultiCommand` and :class:`Command` for the description of + ``name`` and ``attrs``. """ def __init__( @@ -1985,7 +2021,7 @@ class Parameter: argument. This is a list of flags or argument names. :param type: the type that should be used. Either a :class:`ParamType` - or a Python type. The later is converted into the former + or a Python type. The latter is converted into the former automatically if supported. :param required: controls if this is optional or not. :param default: the default value if omitted. This can also be a callable, @@ -2069,10 +2105,13 @@ class Parameter: ] ] = None, ) -> None: + self.name: t.Optional[str] + self.opts: t.List[str] + self.secondary_opts: t.List[str] self.name, self.opts, self.secondary_opts = self._parse_decls( param_decls or (), expose_value ) - self.type = types.convert_type(type, default) + self.type: types.ParamType = types.convert_type(type, default) # Default nargs to what the type tells us if we have that # information available. @@ -2260,7 +2299,7 @@ class Parameter: if value is None: return () if self.multiple or self.nargs == -1 else None - def check_iter(value: t.Any) -> t.Iterator: + def check_iter(value: t.Any) -> t.Iterator[t.Any]: try: return _check_iter(value) except TypeError: @@ -2272,17 +2311,18 @@ class Parameter: ) from None if self.nargs == 1 or self.type.is_composite: - convert: t.Callable[[t.Any], t.Any] = partial( - self.type, param=self, ctx=ctx - ) + + def convert(value: t.Any) -> t.Any: + return self.type(value, param=self, ctx=ctx) + elif self.nargs == -1: - def convert(value: t.Any) -> t.Tuple: + def convert(value: t.Any) -> t.Any: # t.Tuple[t.Any, ...] return tuple(self.type(x, self, ctx) for x in check_iter(value)) else: # nargs > 1 - def convert(value: t.Any) -> t.Tuple: + def convert(value: t.Any) -> t.Any: # t.Tuple[t.Any, ...] value = tuple(check_iter(value)) if len(value) != self.nargs: @@ -2449,6 +2489,7 @@ class Option(Parameter): context. :param help: the help string. :param hidden: hide this option from help outputs. + :param attrs: Other command arguments described in :class:`Parameter`. .. versionchanged:: 8.1.0 Help text indentation is cleaned here instead of only in the @@ -2529,19 +2570,25 @@ class Option(Parameter): # flag if flag_value is set. self._flag_needs_value = flag_value is not None + self.default: t.Union[t.Any, t.Callable[[], t.Any]] + if is_flag and default_is_missing and not self.required: - self.default: t.Union[t.Any, t.Callable[[], t.Any]] = False + if multiple: + self.default = () + else: + self.default = False if flag_value is None: flag_value = not self.default + self.type: types.ParamType if is_flag and type is None: # Re-guess the type from the flag value instead of the # default. self.type = types.convert_type(None, flag_value) self.is_flag: bool = is_flag - self.is_bool_flag = is_flag and isinstance(self.type, types.BoolParamType) + self.is_bool_flag: bool = is_flag and isinstance(self.type, types.BoolParamType) self.flag_value: t.Any = flag_value # Counting @@ -2580,9 +2627,6 @@ class Option(Parameter): if self.is_flag: raise TypeError("'count' is not valid with 'is_flag'.") - if self.multiple and self.is_flag: - raise TypeError("'multiple' is not valid with 'is_flag', use 'count'.") - def to_info_dict(self) -> t.Dict[str, t.Any]: info_dict = super().to_info_dict() info_dict.update( @@ -2817,7 +2861,7 @@ class Option(Parameter): if self.is_flag and not self.is_bool_flag: for param in ctx.command.params: if param.name == self.name and param.default: - return param.flag_value # type: ignore + return t.cast(Option, param).flag_value return None @@ -2927,7 +2971,7 @@ class Argument(Parameter): provide fewer features than options but can have infinite ``nargs`` and are required by default. - All parameters are passed onwards to the parameter constructor. + All parameters are passed onwards to the constructor of :class:`Parameter`. """ param_type_name = "argument" diff --git a/libs/click/decorators.py b/libs/click/decorators.py index 28618dc52..d9bba9502 100644 --- a/libs/click/decorators.py +++ b/libs/click/decorators.py @@ -13,36 +13,43 @@ from .core import Parameter from .globals import get_current_context from .utils import echo -F = t.TypeVar("F", bound=t.Callable[..., t.Any]) -FC = t.TypeVar("FC", bound=t.Union[t.Callable[..., t.Any], Command]) +if t.TYPE_CHECKING: + import typing_extensions as te + P = te.ParamSpec("P") -def pass_context(f: F) -> F: +R = t.TypeVar("R") +T = t.TypeVar("T") +_AnyCallable = t.Callable[..., t.Any] +FC = t.TypeVar("FC", bound=t.Union[_AnyCallable, Command]) + + +def pass_context(f: "t.Callable[te.Concatenate[Context, P], R]") -> "t.Callable[P, R]": """Marks a callback as wanting to receive the current context object as first argument. """ - def new_func(*args, **kwargs): # type: ignore + def new_func(*args: "P.args", **kwargs: "P.kwargs") -> "R": return f(get_current_context(), *args, **kwargs) - return update_wrapper(t.cast(F, new_func), f) + return update_wrapper(new_func, f) -def pass_obj(f: F) -> F: +def pass_obj(f: "t.Callable[te.Concatenate[t.Any, P], R]") -> "t.Callable[P, R]": """Similar to :func:`pass_context`, but only pass the object on the context onwards (:attr:`Context.obj`). This is useful if that object represents the state of a nested system. """ - def new_func(*args, **kwargs): # type: ignore + def new_func(*args: "P.args", **kwargs: "P.kwargs") -> "R": return f(get_current_context().obj, *args, **kwargs) - return update_wrapper(t.cast(F, new_func), f) + return update_wrapper(new_func, f) def make_pass_decorator( - object_type: t.Type, ensure: bool = False -) -> "t.Callable[[F], F]": + object_type: t.Type[T], ensure: bool = False +) -> t.Callable[["t.Callable[te.Concatenate[T, P], R]"], "t.Callable[P, R]"]: """Given an object type this creates a decorator that will work similar to :func:`pass_obj` but instead of passing the object of the current context, it will find the innermost context of type @@ -65,10 +72,11 @@ def make_pass_decorator( remembered on the context if it's not there yet. """ - def decorator(f: F) -> F: - def new_func(*args, **kwargs): # type: ignore + def decorator(f: "t.Callable[te.Concatenate[T, P], R]") -> "t.Callable[P, R]": + def new_func(*args: "P.args", **kwargs: "P.kwargs") -> "R": ctx = get_current_context() + obj: t.Optional[T] if ensure: obj = ctx.ensure_object(object_type) else: @@ -83,14 +91,14 @@ def make_pass_decorator( return ctx.invoke(f, obj, *args, **kwargs) - return update_wrapper(t.cast(F, new_func), f) + return update_wrapper(new_func, f) - return decorator + return decorator # type: ignore[return-value] def pass_meta_key( key: str, *, doc_description: t.Optional[str] = None -) -> "t.Callable[[F], F]": +) -> "t.Callable[[t.Callable[te.Concatenate[t.Any, P], R]], t.Callable[P, R]]": """Create a decorator that passes a key from :attr:`click.Context.meta` as the first argument to the decorated function. @@ -103,13 +111,13 @@ def pass_meta_key( .. versionadded:: 8.0 """ - def decorator(f: F) -> F: - def new_func(*args, **kwargs): # type: ignore + def decorator(f: "t.Callable[te.Concatenate[t.Any, P], R]") -> "t.Callable[P, R]": + def new_func(*args: "P.args", **kwargs: "P.kwargs") -> R: ctx = get_current_context() obj = ctx.meta[key] return ctx.invoke(f, obj, *args, **kwargs) - return update_wrapper(t.cast(F, new_func), f) + return update_wrapper(new_func, f) if doc_description is None: doc_description = f"the {key!r} key from :attr:`click.Context.meta`" @@ -118,41 +126,53 @@ def pass_meta_key( f"Decorator that passes {doc_description} as the first argument" " to the decorated function." ) - return decorator + return decorator # type: ignore[return-value] CmdType = t.TypeVar("CmdType", bound=Command) +# variant: no call, directly as decorator for a function. @t.overload -def command( - __func: t.Callable[..., t.Any], -) -> Command: +def command(name: _AnyCallable) -> Command: ... +# variant: with positional name and with positional or keyword cls argument: +# @command(namearg, CommandCls, ...) or @command(namearg, cls=CommandCls, ...) @t.overload def command( - name: t.Optional[str] = None, + name: t.Optional[str], + cls: t.Type[CmdType], **attrs: t.Any, -) -> t.Callable[..., Command]: +) -> t.Callable[[_AnyCallable], CmdType]: ... +# variant: name omitted, cls _must_ be a keyword argument, @command(cls=CommandCls, ...) @t.overload def command( - name: t.Optional[str] = None, - cls: t.Type[CmdType] = ..., + name: None = None, + *, + cls: t.Type[CmdType], **attrs: t.Any, -) -> t.Callable[..., CmdType]: +) -> t.Callable[[_AnyCallable], CmdType]: + ... + + +# variant: with optional string name, no cls argument provided. +def command( + name: t.Optional[str] = ..., cls: None = None, **attrs: t.Any +) -> t.Callable[[_AnyCallable], Command]: ... def command( - name: t.Union[str, t.Callable[..., t.Any], None] = None, - cls: t.Optional[t.Type[Command]] = None, + name: t.Union[t.Optional[str], _AnyCallable] = None, + cls: t.Optional[t.Type[CmdType]] = None, **attrs: t.Any, -) -> t.Union[Command, t.Callable[..., Command]]: +) -> t.Union[Command, t.Callable[[_AnyCallable], t.Union[Command, CmdType]]]: r"""Creates a new :class:`Command` and uses the decorated function as callback. This will also automatically attach all decorated :func:`option`\s and :func:`argument`\s as parameters to the command. @@ -182,7 +202,7 @@ def command( appended to the end of the list. """ - func: t.Optional[t.Callable[..., t.Any]] = None + func: t.Optional[t.Callable[[_AnyCallable], t.Any]] = None if callable(name): func = name @@ -191,9 +211,9 @@ def command( assert not attrs, "Use 'command(**kwargs)(callable)' to provide arguments." if cls is None: - cls = Command + cls = t.cast(t.Type[CmdType], Command) - def decorator(f: t.Callable[..., t.Any]) -> Command: + def decorator(f: _AnyCallable) -> CmdType: if isinstance(f, Command): raise TypeError("Attempted to convert a callback into a command twice.") @@ -211,8 +231,12 @@ def command( if attrs.get("help") is None: attrs["help"] = f.__doc__ - cmd = cls( # type: ignore[misc] - name=name or f.__name__.lower().replace("_", "-"), # type: ignore[arg-type] + if t.TYPE_CHECKING: + assert cls is not None + assert not callable(name) + + cmd = cls( + name=name or f.__name__.lower().replace("_", "-"), callback=f, params=params, **attrs, @@ -226,24 +250,50 @@ def command( return decorator +GrpType = t.TypeVar("GrpType", bound=Group) + + +# variant: no call, directly as decorator for a function. +def group(name: _AnyCallable) -> Group: + ... + + +# variant: with positional name and with positional or keyword cls argument: +# @group(namearg, GroupCls, ...) or @group(namearg, cls=GroupCls, ...) @t.overload def group( - __func: t.Callable[..., t.Any], -) -> Group: + name: t.Optional[str], + cls: t.Type[GrpType], + **attrs: t.Any, +) -> t.Callable[[_AnyCallable], GrpType]: ... +# variant: name omitted, cls _must_ be a keyword argument, @group(cmd=GroupCls, ...) @t.overload def group( - name: t.Optional[str] = None, + name: None = None, + *, + cls: t.Type[GrpType], **attrs: t.Any, -) -> t.Callable[[F], Group]: +) -> t.Callable[[_AnyCallable], GrpType]: + ... + + +# variant: with optional string name, no cls argument provided. +def group( + name: t.Optional[str] = ..., cls: None = None, **attrs: t.Any +) -> t.Callable[[_AnyCallable], Group]: ... def group( - name: t.Union[str, t.Callable[..., t.Any], None] = None, **attrs: t.Any -) -> t.Union[Group, t.Callable[[F], Group]]: + name: t.Union[str, _AnyCallable, None] = None, + cls: t.Optional[t.Type[GrpType]] = None, + **attrs: t.Any, +) -> t.Union[Group, t.Callable[[_AnyCallable], t.Union[Group, GrpType]]]: """Creates a new :class:`Group` with a function as callback. This works otherwise the same as :func:`command` just that the `cls` parameter is set to :class:`Group`. @@ -251,17 +301,16 @@ def group( .. versionchanged:: 8.1 This decorator can be applied without parentheses. """ - if attrs.get("cls") is None: - attrs["cls"] = Group + if cls is None: + cls = t.cast(t.Type[GrpType], Group) if callable(name): - grp: t.Callable[[F], Group] = t.cast(Group, command(**attrs)) - return grp(name) + return command(cls=cls, **attrs)(name) - return t.cast(Group, command(name, **attrs)) + return command(name, cls, **attrs) -def _param_memo(f: FC, param: Parameter) -> None: +def _param_memo(f: t.Callable[..., t.Any], param: Parameter) -> None: if isinstance(f, Command): f.params.append(param) else: @@ -271,41 +320,57 @@ def _param_memo(f: FC, param: Parameter) -> None: f.__click_params__.append(param) # type: ignore -def argument(*param_decls: str, **attrs: t.Any) -> t.Callable[[FC], FC]: +def argument( + *param_decls: str, cls: t.Optional[t.Type[Argument]] = None, **attrs: t.Any +) -> t.Callable[[FC], FC]: """Attaches an argument to the command. All positional arguments are passed as parameter declarations to :class:`Argument`; all keyword arguments are forwarded unchanged (except ``cls``). This is equivalent to creating an :class:`Argument` instance manually and attaching it to the :attr:`Command.params` list. + For the default argument class, refer to :class:`Argument` and + :class:`Parameter` for descriptions of parameters. + :param cls: the argument class to instantiate. This defaults to :class:`Argument`. + :param param_decls: Passed as positional arguments to the constructor of + ``cls``. + :param attrs: Passed as keyword arguments to the constructor of ``cls``. """ + if cls is None: + cls = Argument def decorator(f: FC) -> FC: - ArgumentClass = attrs.pop("cls", None) or Argument - _param_memo(f, ArgumentClass(param_decls, **attrs)) + _param_memo(f, cls(param_decls, **attrs)) return f return decorator -def option(*param_decls: str, **attrs: t.Any) -> t.Callable[[FC], FC]: +def option( + *param_decls: str, cls: t.Optional[t.Type[Option]] = None, **attrs: t.Any +) -> t.Callable[[FC], FC]: """Attaches an option to the command. All positional arguments are passed as parameter declarations to :class:`Option`; all keyword arguments are forwarded unchanged (except ``cls``). This is equivalent to creating an :class:`Option` instance manually and attaching it to the :attr:`Command.params` list. + For the default option class, refer to :class:`Option` and + :class:`Parameter` for descriptions of parameters. + :param cls: the option class to instantiate. This defaults to :class:`Option`. + :param param_decls: Passed as positional arguments to the constructor of + ``cls``. + :param attrs: Passed as keyword arguments to the constructor of ``cls``. """ + if cls is None: + cls = Option def decorator(f: FC) -> FC: - # Issue 926, copy attrs, so pre-defined options can re-use the same cls= - option_attrs = attrs.copy() - OptionClass = option_attrs.pop("cls", None) or Option - _param_memo(f, OptionClass(param_decls, **option_attrs)) + _param_memo(f, cls(param_decls, **attrs)) return f return decorator @@ -449,8 +514,7 @@ def version_option( ) echo( - t.cast(str, message) - % {"prog": prog_name, "package": package_name, "version": version}, + message % {"prog": prog_name, "package": package_name, "version": version}, color=ctx.color, ) ctx.exit() diff --git a/libs/click/exceptions.py b/libs/click/exceptions.py index 9e20b3eb5..fe68a3613 100644 --- a/libs/click/exceptions.py +++ b/libs/click/exceptions.py @@ -1,12 +1,13 @@ -import os import typing as t from gettext import gettext as _ from gettext import ngettext from ._compat import get_text_stderr from .utils import echo +from .utils import format_filename if t.TYPE_CHECKING: + from .core import Command from .core import Context from .core import Parameter @@ -36,7 +37,7 @@ class ClickException(Exception): def __str__(self) -> str: return self.message - def show(self, file: t.Optional[t.IO] = None) -> None: + def show(self, file: t.Optional[t.IO[t.Any]] = None) -> None: if file is None: file = get_text_stderr() @@ -57,9 +58,9 @@ class UsageError(ClickException): def __init__(self, message: str, ctx: t.Optional["Context"] = None) -> None: super().__init__(message) self.ctx = ctx - self.cmd = self.ctx.command if self.ctx else None + self.cmd: t.Optional["Command"] = self.ctx.command if self.ctx else None - def show(self, file: t.Optional[t.IO] = None) -> None: + def show(self, file: t.Optional[t.IO[t.Any]] = None) -> None: if file is None: file = get_text_stderr() color = None @@ -261,7 +262,7 @@ class FileError(ClickException): hint = _("unknown error") super().__init__(hint) - self.ui_filename = os.fsdecode(filename) + self.ui_filename: str = format_filename(filename) self.filename = filename def format_message(self) -> str: @@ -284,4 +285,4 @@ class Exit(RuntimeError): __slots__ = ("exit_code",) def __init__(self, code: int = 0) -> None: - self.exit_code = code + self.exit_code: int = code diff --git a/libs/click/parser.py b/libs/click/parser.py index 2d5a2ed7b..5fa7adfac 100644 --- a/libs/click/parser.py +++ b/libs/click/parser.py @@ -168,7 +168,7 @@ class Option: ): self._short_opts = [] self._long_opts = [] - self.prefixes = set() + self.prefixes: t.Set[str] = set() for opt in opts: prefix, value = split_opt(opt) @@ -194,7 +194,7 @@ class Option: def takes_value(self) -> bool: return self.action in ("store", "append") - def process(self, value: str, state: "ParsingState") -> None: + def process(self, value: t.Any, state: "ParsingState") -> None: if self.action == "store": state.opts[self.dest] = value # type: ignore elif self.action == "store_const": @@ -272,12 +272,12 @@ class OptionParser: #: If this is set to `False`, the parser will stop on the first #: non-option. Click uses this to implement nested subcommands #: safely. - self.allow_interspersed_args = True + self.allow_interspersed_args: bool = True #: This tells the parser how to deal with unknown options. By #: default it will error out (which is sensible), but there is a #: second mode where it will ignore it and continue processing #: after shifting all the unknown options into the resulting args. - self.ignore_unknown_options = False + self.ignore_unknown_options: bool = False if ctx is not None: self.allow_interspersed_args = ctx.allow_interspersed_args @@ -451,7 +451,7 @@ class OptionParser: if stop: break - # If we got any unknown options we re-combinate the string of the + # If we got any unknown options we recombine the string of the # remaining options and re-attach the prefix, then report that # to the state as new larg. This way there is basic combinatorics # that can be achieved while still ignoring unknown arguments. diff --git a/libs/click/shell_completion.py b/libs/click/shell_completion.py index c17a8e643..dc9e00b9b 100644 --- a/libs/click/shell_completion.py +++ b/libs/click/shell_completion.py @@ -16,7 +16,7 @@ from .utils import echo def shell_complete( cli: BaseCommand, - ctx_args: t.Dict[str, t.Any], + ctx_args: t.MutableMapping[str, t.Any], prog_name: str, complete_var: str, instruction: str, @@ -80,9 +80,9 @@ class CompletionItem: help: t.Optional[str] = None, **kwargs: t.Any, ) -> None: - self.value = value - self.type = type - self.help = help + self.value: t.Any = value + self.type: str = type + self.help: t.Optional[str] = help self._info = kwargs def __getattr__(self, name: str) -> t.Any: @@ -157,17 +157,19 @@ _SOURCE_ZSH = """\ fi } -compdef %(complete_func)s %(prog_name)s; +if [[ $zsh_eval_context[-1] == loadautofunc ]]; then + # autoload from fpath, call function directly + %(complete_func)s "$@" +else + # eval/source/. command, register function for later + compdef %(complete_func)s %(prog_name)s +fi """ _SOURCE_FISH = """\ function %(complete_func)s; - set -l response; - - for value in (env %(complete_var)s=fish_complete COMP_WORDS=(commandline -cp) \ + set -l response (env %(complete_var)s=fish_complete COMP_WORDS=(commandline -cp) \ COMP_CWORD=(commandline -t) %(prog_name)s); - set response $response $value; - end; for completion in $response; set -l metadata (string split "," $completion); @@ -214,7 +216,7 @@ class ShellComplete: def __init__( self, cli: BaseCommand, - ctx_args: t.Dict[str, t.Any], + ctx_args: t.MutableMapping[str, t.Any], prog_name: str, complete_var: str, ) -> None: @@ -228,7 +230,7 @@ class ShellComplete: """The name of the shell function defined by the completion script. """ - safe_name = re.sub(r"\W*", "", self.prog_name.replace("-", "_"), re.ASCII) + safe_name = re.sub(r"\W*", "", self.prog_name.replace("-", "_"), flags=re.ASCII) return f"_{safe_name}_completion" def source_vars(self) -> t.Dict[str, t.Any]: @@ -299,11 +301,12 @@ class BashComplete(ShellComplete): name = "bash" source_template = _SOURCE_BASH - def _check_version(self) -> None: + @staticmethod + def _check_version() -> None: import subprocess output = subprocess.run( - ["bash", "-c", "echo ${BASH_VERSION}"], stdout=subprocess.PIPE + ["bash", "-c", 'echo "${BASH_VERSION}"'], stdout=subprocess.PIPE ) match = re.search(r"^(\d+)\.(\d+)\.\d+", output.stdout.decode()) @@ -311,15 +314,17 @@ class BashComplete(ShellComplete): major, minor = match.groups() if major < "4" or major == "4" and minor < "4": - raise RuntimeError( + echo( _( "Shell completion is not supported for Bash" " versions older than 4.4." - ) + ), + err=True, ) else: - raise RuntimeError( - _("Couldn't detect Bash version, shell completion is not supported.") + echo( + _("Couldn't detect Bash version, shell completion is not supported."), + err=True, ) def source(self) -> str: @@ -389,6 +394,9 @@ class FishComplete(ShellComplete): return f"{item.type},{item.value}" +ShellCompleteType = t.TypeVar("ShellCompleteType", bound=t.Type[ShellComplete]) + + _available_shells: t.Dict[str, t.Type[ShellComplete]] = { "bash": BashComplete, "fish": FishComplete, @@ -397,8 +405,8 @@ _available_shells: t.Dict[str, t.Type[ShellComplete]] = { def add_completion_class( - cls: t.Type[ShellComplete], name: t.Optional[str] = None -) -> None: + cls: ShellCompleteType, name: t.Optional[str] = None +) -> ShellCompleteType: """Register a :class:`ShellComplete` subclass under the given name. The name will be provided by the completion instruction environment variable during completion. @@ -413,6 +421,8 @@ def add_completion_class( _available_shells[name] = cls + return cls + def get_completion_class(shell: str) -> t.Optional[t.Type[ShellComplete]]: """Look up a registered :class:`ShellComplete` subclass by the name @@ -436,7 +446,8 @@ def _is_incomplete_argument(ctx: Context, param: Parameter) -> bool: return False assert param.name is not None - value = ctx.params[param.name] + # Will be None if expose_value is False. + value = ctx.params.get(param.name) return ( param.nargs == -1 or ctx.get_parameter_source(param.name) is not ParameterSource.COMMANDLINE @@ -482,7 +493,10 @@ def _is_incomplete_option(ctx: Context, args: t.List[str], param: Parameter) -> def _resolve_context( - cli: BaseCommand, ctx_args: t.Dict[str, t.Any], prog_name: str, args: t.List[str] + cli: BaseCommand, + ctx_args: t.MutableMapping[str, t.Any], + prog_name: str, + args: t.List[str], ) -> Context: """Produce the context hierarchy starting with the command and traversing the complete arguments. This only follows the commands, @@ -509,6 +523,8 @@ def _resolve_context( ctx = cmd.make_context(name, args, parent=ctx, resilient_parsing=True) args = ctx.protected_args + ctx.args else: + sub_ctx = ctx + while args: name, cmd, args = command.resolve_command(ctx, args) diff --git a/libs/click/termui.py b/libs/click/termui.py index bfb2f5ae6..db7a4b286 100644 --- a/libs/click/termui.py +++ b/libs/click/termui.py @@ -1,14 +1,12 @@ import inspect import io import itertools -import os import sys import typing as t from gettext import gettext as _ from ._compat import isatty from ._compat import strip_ansi -from ._compat import WIN from .exceptions import Abort from .exceptions import UsageError from .globals import resolve_color_default @@ -73,7 +71,7 @@ def _build_prompt( def _format_default(default: t.Any) -> t.Any: if isinstance(default, (io.IOBase, LazyFile)) and hasattr(default, "name"): - return default.name # type: ignore + return default.name return default @@ -443,10 +441,9 @@ def clear() -> None: """ if not isatty(sys.stdout): return - if WIN: - os.system("cls") - else: - sys.stdout.write("\033[2J\033[1;1H") + + # ANSI escape \033[2J clears the screen, \033[1;1H moves the cursor + echo("\033[2J\033[1;1H", nl=False) def _interpret_color( diff --git a/libs/click/testing.py b/libs/click/testing.py index e395c2edf..e0df0d2a6 100644 --- a/libs/click/testing.py +++ b/libs/click/testing.py @@ -79,11 +79,11 @@ class _NamedTextIOWrapper(io.TextIOWrapper): def make_input_stream( - input: t.Optional[t.Union[str, bytes, t.IO]], charset: str + input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str ) -> t.BinaryIO: # Is already an input stream. if hasattr(input, "read"): - rv = _find_binary_reader(t.cast(t.IO, input)) + rv = _find_binary_reader(t.cast(t.IO[t.Any], input)) if rv is not None: return rv @@ -95,7 +95,7 @@ def make_input_stream( elif isinstance(input, str): input = input.encode(charset) - return io.BytesIO(t.cast(bytes, input)) + return io.BytesIO(input) class Result: @@ -183,7 +183,7 @@ class CliRunner: mix_stderr: bool = True, ) -> None: self.charset = charset - self.env = env or {} + self.env: t.Mapping[str, t.Optional[str]] = env or {} self.echo_stdin = echo_stdin self.mix_stderr = mix_stderr @@ -206,7 +206,7 @@ class CliRunner: @contextlib.contextmanager def isolation( self, - input: t.Optional[t.Union[str, bytes, t.IO]] = None, + input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None, env: t.Optional[t.Mapping[str, t.Optional[str]]] = None, color: bool = False, ) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]: @@ -301,7 +301,7 @@ class CliRunner: default_color = color def should_strip_ansi( - stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None + stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None ) -> bool: if color is None: return not default_color @@ -350,7 +350,7 @@ class CliRunner: self, cli: "BaseCommand", args: t.Optional[t.Union[str, t.Sequence[str]]] = None, - input: t.Optional[t.Union[str, bytes, t.IO]] = None, + input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None, env: t.Optional[t.Mapping[str, t.Optional[str]]] = None, catch_exceptions: bool = True, color: bool = False, @@ -449,7 +449,7 @@ class CliRunner: @contextlib.contextmanager def isolated_filesystem( - self, temp_dir: t.Optional[t.Union[str, os.PathLike]] = None + self, temp_dir: t.Optional[t.Union[str, "os.PathLike[str]"]] = None ) -> t.Iterator[str]: """A context manager that creates a temporary directory and changes the current working directory to it. This isolates tests @@ -464,11 +464,11 @@ class CliRunner: Added the ``temp_dir`` parameter. """ cwd = os.getcwd() - dt = tempfile.mkdtemp(dir=temp_dir) # type: ignore[type-var] + dt = tempfile.mkdtemp(dir=temp_dir) os.chdir(dt) try: - yield t.cast(str, dt) + yield dt finally: os.chdir(cwd) diff --git a/libs/click/types.py b/libs/click/types.py index b45ee53d0..2b1d1797f 100644 --- a/libs/click/types.py +++ b/libs/click/types.py @@ -1,14 +1,15 @@ import os import stat +import sys import typing as t from datetime import datetime from gettext import gettext as _ from gettext import ngettext from ._compat import _get_argv_encoding -from ._compat import get_filesystem_encoding from ._compat import open_stream from .exceptions import BadParameter +from .utils import format_filename from .utils import LazyFile from .utils import safecall @@ -162,7 +163,7 @@ class CompositeParamType(ParamType): class FuncParamType(ParamType): def __init__(self, func: t.Callable[[t.Any], t.Any]) -> None: - self.name = func.__name__ + self.name: str = func.__name__ self.func = func def to_info_dict(self) -> t.Dict[str, t.Any]: @@ -207,7 +208,7 @@ class StringParamType(ParamType): try: value = value.decode(enc) except UnicodeError: - fs_enc = get_filesystem_encoding() + fs_enc = sys.getfilesystemencoding() if fs_enc != enc: try: value = value.decode(fs_enc) @@ -353,7 +354,11 @@ class DateTime(ParamType): name = "datetime" def __init__(self, formats: t.Optional[t.Sequence[str]] = None): - self.formats = formats or ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"] + self.formats: t.Sequence[str] = formats or [ + "%Y-%m-%d", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d %H:%M:%S", + ] def to_info_dict(self) -> t.Dict[str, t.Any]: info_dict = super().to_info_dict() @@ -397,7 +402,7 @@ class DateTime(ParamType): class _NumberParamTypeBase(ParamType): - _number_class: t.ClassVar[t.Type] + _number_class: t.ClassVar[t.Type[t.Any]] def convert( self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"] @@ -662,7 +667,7 @@ class File(ParamType): """ name = "filename" - envvar_list_splitter = os.path.pathsep + envvar_list_splitter: t.ClassVar[str] = os.path.pathsep def __init__( self, @@ -683,36 +688,38 @@ class File(ParamType): info_dict.update(mode=self.mode, encoding=self.encoding) return info_dict - def resolve_lazy_flag(self, value: t.Any) -> bool: + def resolve_lazy_flag(self, value: "t.Union[str, os.PathLike[str]]") -> bool: if self.lazy is not None: return self.lazy - if value == "-": + if os.fspath(value) == "-": return False elif "w" in self.mode: return True return False def convert( - self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"] - ) -> t.Any: - try: - if hasattr(value, "read") or hasattr(value, "write"): - return value + self, + value: t.Union[str, "os.PathLike[str]", t.IO[t.Any]], + param: t.Optional["Parameter"], + ctx: t.Optional["Context"], + ) -> t.IO[t.Any]: + if _is_file_like(value): + return value + + value = t.cast("t.Union[str, os.PathLike[str]]", value) + try: lazy = self.resolve_lazy_flag(value) if lazy: - f: t.IO = t.cast( - t.IO, - LazyFile( - value, self.mode, self.encoding, self.errors, atomic=self.atomic - ), + lf = LazyFile( + value, self.mode, self.encoding, self.errors, atomic=self.atomic ) if ctx is not None: - ctx.call_on_close(f.close_intelligently) # type: ignore + ctx.call_on_close(lf.close_intelligently) - return f + return t.cast(t.IO[t.Any], lf) f, should_close = open_stream( value, self.mode, self.encoding, self.errors, atomic=self.atomic @@ -731,7 +738,7 @@ class File(ParamType): return f except OSError as e: # noqa: B014 - self.fail(f"'{os.fsdecode(value)}': {e.strerror}", param, ctx) + self.fail(f"'{format_filename(value)}': {e.strerror}", param, ctx) def shell_complete( self, ctx: "Context", param: "Parameter", incomplete: str @@ -750,6 +757,10 @@ class File(ParamType): return [CompletionItem(incomplete, type="file")] +def _is_file_like(value: t.Any) -> "te.TypeGuard[t.IO[t.Any]]": + return hasattr(value, "read") or hasattr(value, "write") + + class Path(ParamType): """The ``Path`` type is similar to the :class:`File` type, but returns the filename instead of an open file. Various checks can be @@ -777,13 +788,13 @@ class Path(ParamType): Added the ``executable`` parameter. .. versionchanged:: 8.0 - Allow passing ``type=pathlib.Path``. + Allow passing ``path_type=pathlib.Path``. .. versionchanged:: 6.0 Added the ``allow_dash`` parameter. """ - envvar_list_splitter = os.path.pathsep + envvar_list_splitter: t.ClassVar[str] = os.path.pathsep def __init__( self, @@ -794,7 +805,7 @@ class Path(ParamType): readable: bool = True, resolve_path: bool = False, allow_dash: bool = False, - path_type: t.Optional[t.Type] = None, + path_type: t.Optional[t.Type[t.Any]] = None, executable: bool = False, ): self.exists = exists @@ -808,7 +819,7 @@ class Path(ParamType): self.type = path_type if self.file_okay and not self.dir_okay: - self.name = _("file") + self.name: str = _("file") elif self.dir_okay and not self.file_okay: self.name = _("directory") else: @@ -826,20 +837,25 @@ class Path(ParamType): ) return info_dict - def coerce_path_result(self, rv: t.Any) -> t.Any: - if self.type is not None and not isinstance(rv, self.type): + def coerce_path_result( + self, value: "t.Union[str, os.PathLike[str]]" + ) -> "t.Union[str, bytes, os.PathLike[str]]": + if self.type is not None and not isinstance(value, self.type): if self.type is str: - rv = os.fsdecode(rv) + return os.fsdecode(value) elif self.type is bytes: - rv = os.fsencode(rv) + return os.fsencode(value) else: - rv = self.type(rv) + return t.cast("os.PathLike[str]", self.type(value)) - return rv + return value def convert( - self, value: t.Any, param: t.Optional["Parameter"], ctx: t.Optional["Context"] - ) -> t.Any: + self, + value: "t.Union[str, os.PathLike[str]]", + param: t.Optional["Parameter"], + ctx: t.Optional["Context"], + ) -> "t.Union[str, bytes, os.PathLike[str]]": rv = value is_dash = self.file_okay and self.allow_dash and rv in (b"-", "-") @@ -859,7 +875,7 @@ class Path(ParamType): return self.coerce_path_result(rv) self.fail( _("{name} {filename!r} does not exist.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -868,7 +884,7 @@ class Path(ParamType): if not self.file_okay and stat.S_ISREG(st.st_mode): self.fail( _("{name} {filename!r} is a file.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -876,7 +892,7 @@ class Path(ParamType): if not self.dir_okay and stat.S_ISDIR(st.st_mode): self.fail( _("{name} '{filename}' is a directory.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -885,7 +901,7 @@ class Path(ParamType): if self.readable and not os.access(rv, os.R_OK): self.fail( _("{name} {filename!r} is not readable.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -894,7 +910,7 @@ class Path(ParamType): if self.writable and not os.access(rv, os.W_OK): self.fail( _("{name} {filename!r} is not writable.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -903,7 +919,7 @@ class Path(ParamType): if self.executable and not os.access(value, os.X_OK): self.fail( _("{name} {filename!r} is not executable.").format( - name=self.name.title(), filename=os.fsdecode(value) + name=self.name.title(), filename=format_filename(value) ), param, ctx, @@ -944,8 +960,8 @@ class Tuple(CompositeParamType): :param types: a list of types that should be used for the tuple items. """ - def __init__(self, types: t.Sequence[t.Union[t.Type, ParamType]]) -> None: - self.types = [convert_type(ty) for ty in types] + def __init__(self, types: t.Sequence[t.Union[t.Type[t.Any], ParamType]]) -> None: + self.types: t.Sequence[ParamType] = [convert_type(ty) for ty in types] def to_info_dict(self) -> t.Dict[str, t.Any]: info_dict = super().to_info_dict() diff --git a/libs/click/utils.py b/libs/click/utils.py index 8283788ac..d536434f0 100644 --- a/libs/click/utils.py +++ b/libs/click/utils.py @@ -4,13 +4,13 @@ import sys import typing as t from functools import update_wrapper from types import ModuleType +from types import TracebackType from ._compat import _default_text_stderr from ._compat import _default_text_stdout from ._compat import _find_binary_writer from ._compat import auto_wrap_for_ansi from ._compat import binary_streams -from ._compat import get_filesystem_encoding from ._compat import open_stream from ._compat import should_strip_ansi from ._compat import strip_ansi @@ -21,30 +21,33 @@ from .globals import resolve_color_default if t.TYPE_CHECKING: import typing_extensions as te -F = t.TypeVar("F", bound=t.Callable[..., t.Any]) + P = te.ParamSpec("P") + +R = t.TypeVar("R") def _posixify(name: str) -> str: return "-".join(name.split()).lower() -def safecall(func: F) -> F: +def safecall(func: "t.Callable[P, R]") -> "t.Callable[P, t.Optional[R]]": """Wraps a function so that it swallows exceptions.""" - def wrapper(*args, **kwargs): # type: ignore + def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> t.Optional[R]: try: return func(*args, **kwargs) except Exception: pass + return None - return update_wrapper(t.cast(F, wrapper), func) + return update_wrapper(wrapper, func) def make_str(value: t.Any) -> str: """Converts a value into a valid string.""" if isinstance(value, bytes): try: - return value.decode(get_filesystem_encoding()) + return value.decode(sys.getfilesystemencoding()) except UnicodeError: return value.decode("utf-8", "replace") return str(value) @@ -109,20 +112,21 @@ class LazyFile: def __init__( self, - filename: str, + filename: t.Union[str, "os.PathLike[str]"], mode: str = "r", encoding: t.Optional[str] = None, errors: t.Optional[str] = "strict", atomic: bool = False, ): - self.name = filename + self.name: str = os.fspath(filename) self.mode = mode self.encoding = encoding self.errors = errors self.atomic = atomic - self._f: t.Optional[t.IO] + self._f: t.Optional[t.IO[t.Any]] + self.should_close: bool - if filename == "-": + if self.name == "-": self._f, self.should_close = open_stream(filename, mode, encoding, errors) else: if "r" in mode: @@ -139,9 +143,9 @@ class LazyFile: def __repr__(self) -> str: if self._f is not None: return repr(self._f) - return f"<unopened file '{self.name}' {self.mode}>" + return f"<unopened file '{format_filename(self.name)}' {self.mode}>" - def open(self) -> t.IO: + def open(self) -> t.IO[t.Any]: """Opens the file if it's not yet open. This call might fail with a :exc:`FileError`. Not handling this error will produce an error that Click shows. @@ -174,7 +178,12 @@ class LazyFile: def __enter__(self) -> "LazyFile": return self - def __exit__(self, exc_type, exc_value, tb): # type: ignore + def __exit__( + self, + exc_type: t.Optional[t.Type[BaseException]], + exc_value: t.Optional[BaseException], + tb: t.Optional[TracebackType], + ) -> None: self.close_intelligently() def __iter__(self) -> t.Iterator[t.AnyStr]: @@ -183,8 +192,8 @@ class LazyFile: class KeepOpenFile: - def __init__(self, file: t.IO) -> None: - self._file = file + def __init__(self, file: t.IO[t.Any]) -> None: + self._file: t.IO[t.Any] = file def __getattr__(self, name: str) -> t.Any: return getattr(self._file, name) @@ -192,7 +201,12 @@ class KeepOpenFile: def __enter__(self) -> "KeepOpenFile": return self - def __exit__(self, exc_type, exc_value, tb): # type: ignore + def __exit__( + self, + exc_type: t.Optional[t.Type[BaseException]], + exc_value: t.Optional[BaseException], + tb: t.Optional[TracebackType], + ) -> None: pass def __repr__(self) -> str: @@ -253,6 +267,11 @@ def echo( else: file = _default_text_stdout() + # There are no standard streams attached to write to. For example, + # pythonw on Windows. + if file is None: + return + # Convert non bytes/text into the native string type. if message is not None and not isinstance(message, (str, bytes, bytearray)): out: t.Optional[t.Union[str, bytes]] = str(message) @@ -340,7 +359,7 @@ def open_file( errors: t.Optional[str] = "strict", lazy: bool = False, atomic: bool = False, -) -> t.IO: +) -> t.IO[t.Any]: """Open a file, with extra behavior to handle ``'-'`` to indicate a standard stream, lazy open on write, and atomic write. Similar to the behavior of the :class:`~click.File` param type. @@ -370,24 +389,39 @@ def open_file( .. versionadded:: 3.0 """ if lazy: - return t.cast(t.IO, LazyFile(filename, mode, encoding, errors, atomic=atomic)) + return t.cast( + t.IO[t.Any], LazyFile(filename, mode, encoding, errors, atomic=atomic) + ) f, should_close = open_stream(filename, mode, encoding, errors, atomic=atomic) if not should_close: - f = t.cast(t.IO, KeepOpenFile(f)) + f = t.cast(t.IO[t.Any], KeepOpenFile(f)) return f def format_filename( - filename: t.Union[str, bytes, os.PathLike], shorten: bool = False + filename: "t.Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]", + shorten: bool = False, ) -> str: - """Formats a filename for user display. The main purpose of this - function is to ensure that the filename can be displayed at all. This - will decode the filename to unicode if necessary in a way that it will - not fail. Optionally, it can shorten the filename to not include the - full path to the filename. + """Format a filename as a string for display. Ensures the filename can be + displayed by replacing any invalid bytes or surrogate escapes in the name + with the replacement character ``�``. + + Invalid bytes or surrogate escapes will raise an error when written to a + stream with ``errors="strict". This will typically happen with ``stdout`` + when the locale is something like ``en_GB.UTF-8``. + + Many scenarios *are* safe to write surrogates though, due to PEP 538 and + PEP 540, including: + + - Writing to ``stderr``, which uses ``errors="backslashreplace"``. + - The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens + stdout and stderr with ``errors="surrogateescape"``. + - None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``. + - Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``. + Python opens stdout and stderr with ``errors="surrogateescape"``. :param filename: formats a filename for UI display. This will also convert the filename into unicode without failing. @@ -396,8 +430,17 @@ def format_filename( """ if shorten: filename = os.path.basename(filename) + else: + filename = os.fspath(filename) + + if isinstance(filename, bytes): + filename = filename.decode(sys.getfilesystemencoding(), "replace") + else: + filename = filename.encode("utf-8", "surrogateescape").decode( + "utf-8", "replace" + ) - return os.fsdecode(filename) + return filename def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str: @@ -425,7 +468,7 @@ def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) :param app_name: the application name. This should be properly capitalized and can contain whitespace. :param roaming: controls if the folder should be roaming or not on Windows. - Has no affect otherwise. + Has no effect otherwise. :param force_posix: if this is set to `True` then on any POSIX system the folder will be stored in the home folder with a leading dot instead of the XDG config home or darwin's @@ -458,7 +501,7 @@ class PacifyFlushWrapper: pipe, all calls and attributes are proxied. """ - def __init__(self, wrapped: t.IO) -> None: + def __init__(self, wrapped: t.IO[t.Any]) -> None: self.wrapped = wrapped def flush(self) -> None: @@ -506,7 +549,8 @@ def _detect_program_name( # The value of __package__ indicates how Python was called. It may # not exist if a setuptools script is installed as an egg. It may be # set incorrectly for entry points created with pip on Windows. - if getattr(_main, "__package__", None) is None or ( + # It is set to "" inside a Shiv or PEX zipapp. + if getattr(_main, "__package__", None) in {None, ""} or ( os.name == "nt" and _main.__package__ == "" and not os.path.exists(path) |