1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
from __future__ import annotations
import shlex
import subprocess
import sys
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from .. import util
from ..util import compat
# Placeholder text that _parse_cmdline_options() swaps for the revision
# script's path inside a hook's command-line option string.
REVISION_SCRIPT_TOKEN = "REVISION_SCRIPT_FILENAME"
# Hook name -> hook callable; populated by register(), read by _invoke().
_registry: dict = {}
def register(name: str) -> Callable:
    """Return a decorator that records a function as a write hook.

    The decorated function is stored in the module-level registry under
    ``name`` and is returned unmodified, so it stays usable as a plain
    function.  Registering a second function under the same name replaces
    the earlier entry.

    .. versionadded:: 1.2.0

    .. seealso::

        :ref:`post_write_hooks_custom`

    :param name: Key under which the hook is stored in the registry.
    :return: A decorator accepting the hook function.
    """

    def _add_to_registry(hook):
        # Keep the hook itself untouched; only remember it by name.
        _registry[name] = hook
        return hook

    return _add_to_registry
def _invoke(
    name: str, revision: str, options: Dict[str, Union[str, int]]
) -> Any:
    """Look up the formatter registered as ``name`` and call it.

    :param name: Registry key of the formatter to run.
    :param revision: Passed through unchanged as the formatter's first
     positional argument.
    :param options: Dict of settings passed through unchanged as the
     formatter's second positional argument.
    :return: Whatever the formatter returns.
    :raises: :class:`alembic.util.CommandError` when nothing is registered
     under ``name``.
    """
    try:
        formatter = _registry[name]
    except KeyError as missing:
        # Chain the KeyError so the original lookup failure stays visible.
        raise util.CommandError(
            "No formatter with name '%s' registered" % name
        ) from missing

    return formatter(revision, options)
def _run_hooks(path: str, hook_config: Dict[str, str]) -> None:
    """Run every configured post write hook for a generated revision.

    The ``hooks`` entry of ``hook_config`` lists the hook names; it is
    split with ``_split_on_space_comma`` and empty names are skipped.
    For each name, every ``<name>.<option>`` key of ``hook_config`` is
    collected into an options dict keyed by ``<option>``, and the name
    itself is added under ``_hook_name``.  The hook is then dispatched by
    handing ``_invoke`` together with the hook type, ``path`` and the
    options dict to ``util.status``.

    :param path: Path of the revision file, forwarded to each hook.
    :param hook_config: Flat mapping of hook configuration keys to values.
    :raises: :class:`alembic.util.CommandError` when a listed hook has no
     ``<name>.type`` key.
    """
    from .base import _split_on_space_comma

    configured = _split_on_space_comma.split(hook_config.get("hooks", ""))
    for hook_name in filter(None, configured):
        prefix = hook_name + "."

        # Strip the "<name>." prefix from this hook's own settings.
        hook_opts = {}
        for cfg_key, cfg_value in hook_config.items():
            if cfg_key.startswith(prefix):
                hook_opts[cfg_key[len(prefix) :]] = cfg_value
        hook_opts["_hook_name"] = hook_name

        try:
            hook_type = hook_opts["type"]
        except KeyError as missing:
            raise util.CommandError(
                "Key %s.type is required for post write hook %r"
                % (hook_name, hook_name)
            ) from missing

        util.status(
            'Running post write hook "%s"' % hook_name,
            _invoke,
            hook_type,
            path,
            hook_opts,
            newline=True,
        )
def _parse_cmdline_options(cmdline_options_str: str, path: str) -> List[str]:
    """Split an option string into arguments, inserting the script path.

    Every occurrence of ``REVISION_SCRIPT_TOKEN`` in the resulting
    arguments is replaced by ``path``.  When the token does not appear
    anywhere in ``cmdline_options_str``, it is prepended (followed by a
    space) before splitting, so the path becomes the first argument.

    :param cmdline_options_str: Raw option string from the hook config.
    :param path: Filename of the revision script.
    :return: The list of arguments with the token substituted.
    """
    template = cmdline_options_str
    if REVISION_SCRIPT_TOKEN not in template:
        template = f"{REVISION_SCRIPT_TOKEN} {template}"

    # shlex runs in POSIX or non-POSIX mode depending on compat.is_posix.
    tokens = shlex.split(template, posix=compat.is_posix)
    return [token.replace(REVISION_SCRIPT_TOKEN, path) for token in tokens]
@register("console_scripts")
def console_scripts(
    path: str, options: dict, ignore_output: bool = False
) -> None:
    """Run a ``console_scripts`` entry point against a revision script.

    The entry point named by ``options["entrypoint"]`` is looked up among
    the entries returned by ``compat.importlib_metadata_get`` and executed
    in a child Python process (``sys.executable -c ...``) that imports the
    entry point's module and calls its attribute.  The arguments produced
    by ``_parse_cmdline_options`` from ``options["options"]`` are appended
    to that command line.  The result of ``subprocess.run`` is discarded
    and the child's exit status is not inspected.

    :param path: Filename of the revision script.
    :param options: Hook settings.  ``entrypoint`` is required; ``cwd``
     (working directory of the child) and ``options`` (extra command-line
     arguments) are optional; ``_hook_name`` is used in the error message
     when ``entrypoint`` is missing.
    :param ignore_output: When true, the child's stdout and stderr are
     sent to ``subprocess.DEVNULL``.
    :raises: :class:`alembic.util.CommandError` when ``entrypoint`` is not
     configured or no matching entry point exists.
    """
    try:
        entrypoint_name = options["entrypoint"]
    except KeyError as missing:
        hook_name = options["_hook_name"]
        raise util.CommandError(
            "Key %s.entrypoint is required for post write hook %r"
            % (hook_name, hook_name)
        ) from missing

    # Stop at the first entry whose name matches; None means no match.
    impl: Any = next(
        (
            candidate
            for candidate in compat.importlib_metadata_get("console_scripts")
            if candidate.name == entrypoint_name
        ),
        None,
    )
    if impl is None:
        raise util.CommandError(
            f"Could not find entrypoint console_scripts.{entrypoint_name}"
        )

    cwd: Optional[str] = options.get("cwd", None)
    extra_args = _parse_cmdline_options(options.get("options", ""), path)

    kw: Dict[str, Any] = (
        {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
        if ignore_output
        else {}
    )

    # Import the entry point's module in a fresh interpreter and call it.
    snippet = "import %s; %s.%s()" % (impl.module, impl.module, impl.attr)
    argv = [sys.executable, "-c", snippet, *extra_args]
    subprocess.run(argv, cwd=cwd, **kw)
|