summaryrefslogtreecommitdiffhomepage
path: root/libs/twine/package.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/twine/package.py')
-rw-r--r--libs/twine/package.py291
1 files changed, 291 insertions, 0 deletions
diff --git a/libs/twine/package.py b/libs/twine/package.py
new file mode 100644
index 000000000..745752ff2
--- /dev/null
+++ b/libs/twine/package.py
@@ -0,0 +1,291 @@
+# Copyright 2015 Ian Cordasco
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import hashlib
+import io
+import os
+import re
+import subprocess
+from typing import Dict, NamedTuple, Optional, Sequence, Tuple, Union
+
+import importlib_metadata
+import pkginfo
+
+from twine import exceptions
+from twine import wheel
+from twine import wininst
+
+DIST_TYPES = {
+ "bdist_wheel": wheel.Wheel,
+ "bdist_wininst": wininst.WinInst,
+ "bdist_egg": pkginfo.BDist,
+ "sdist": pkginfo.SDist,
+}
+
+DIST_EXTENSIONS = {
+ ".whl": "bdist_wheel",
+ ".exe": "bdist_wininst",
+ ".egg": "bdist_egg",
+ ".tar.bz2": "sdist",
+ ".tar.gz": "sdist",
+ ".zip": "sdist",
+}
+
+MetadataValue = Union[str, Sequence[str]]
+
+
+def _safe_name(name: str) -> str:
+ """Convert an arbitrary string to a standard distribution name.
+
+ Any runs of non-alphanumeric/. characters are replaced with a single '-'.
+
+ Copied from pkg_resources.safe_name for compatibility with warehouse.
+ See https://github.com/pypa/twine/issues/743.
+ """
+ return re.sub("[^A-Za-z0-9.]+", "-", name)
+
+
+class PackageFile:
+ def __init__(
+ self,
+ filename: str,
+ comment: Optional[str],
+ metadata: pkginfo.Distribution,
+ python_version: Optional[str],
+ filetype: Optional[str],
+ ) -> None:
+ self.filename = filename
+ self.basefilename = os.path.basename(filename)
+ self.comment = comment
+ self.metadata = metadata
+ self.python_version = python_version
+ self.filetype = filetype
+ self.safe_name = _safe_name(metadata.name)
+ self.signed_filename = self.filename + ".asc"
+ self.signed_basefilename = self.basefilename + ".asc"
+ self.gpg_signature: Optional[Tuple[str, bytes]] = None
+
+ hasher = HashManager(filename)
+ hasher.hash()
+ hexdigest = hasher.hexdigest()
+
+ self.md5_digest = hexdigest.md5
+ self.sha2_digest = hexdigest.sha2
+ self.blake2_256_digest = hexdigest.blake2
+
+ @classmethod
+ def from_filename(cls, filename: str, comment: Optional[str]) -> "PackageFile":
+ # Extract the metadata from the package
+ for ext, dtype in DIST_EXTENSIONS.items():
+ if filename.endswith(ext):
+ try:
+ meta = DIST_TYPES[dtype](filename)
+ except EOFError:
+ raise exceptions.InvalidDistribution(
+ "Invalid distribution file: '%s'" % os.path.basename(filename)
+ )
+ else:
+ break
+ else:
+ raise exceptions.InvalidDistribution(
+ "Unknown distribution format: '%s'" % os.path.basename(filename)
+ )
+
+ # If pkginfo encounters a metadata version it doesn't support, it may
+ # give us back empty metadata. At the very least, we should have a name
+ # and version
+ if not (meta.name and meta.version):
+ raise exceptions.InvalidDistribution(
+ "Invalid distribution metadata. Try upgrading twine if possible."
+ )
+
+ py_version: Optional[str]
+ if dtype == "bdist_egg":
+ (dist,) = importlib_metadata.Distribution.discover( # type: ignore[no-untyped-call] # python/importlib_metadata#288 # noqa: E501
+ path=[filename]
+ )
+ py_version = dist.metadata["Version"]
+ elif dtype == "bdist_wheel":
+ py_version = meta.py_version
+ elif dtype == "bdist_wininst":
+ py_version = meta.py_version
+ else:
+ py_version = None
+
+ return cls(filename, comment, meta, py_version, dtype)
+
+ def metadata_dictionary(self) -> Dict[str, MetadataValue]:
+ meta = self.metadata
+ data = {
+ # identify release
+ "name": self.safe_name,
+ "version": meta.version,
+ # file content
+ "filetype": self.filetype,
+ "pyversion": self.python_version,
+ # additional meta-data
+ "metadata_version": meta.metadata_version,
+ "summary": meta.summary,
+ "home_page": meta.home_page,
+ "author": meta.author,
+ "author_email": meta.author_email,
+ "maintainer": meta.maintainer,
+ "maintainer_email": meta.maintainer_email,
+ "license": meta.license,
+ "description": meta.description,
+ "keywords": meta.keywords,
+ "platform": meta.platforms,
+ "classifiers": meta.classifiers,
+ "download_url": meta.download_url,
+ "supported_platform": meta.supported_platforms,
+ "comment": self.comment,
+ "md5_digest": self.md5_digest,
+ "sha256_digest": self.sha2_digest,
+ "blake2_256_digest": self.blake2_256_digest,
+ # PEP 314
+ "provides": meta.provides,
+ "requires": meta.requires,
+ "obsoletes": meta.obsoletes,
+ # Metadata 1.2
+ "project_urls": meta.project_urls,
+ "provides_dist": meta.provides_dist,
+ "obsoletes_dist": meta.obsoletes_dist,
+ "requires_dist": meta.requires_dist,
+ "requires_external": meta.requires_external,
+ "requires_python": meta.requires_python,
+ # Metadata 2.1
+ "provides_extras": meta.provides_extras,
+ "description_content_type": meta.description_content_type,
+ }
+
+ if self.gpg_signature is not None:
+ data["gpg_signature"] = self.gpg_signature
+
+ return data
+
+ def add_gpg_signature(
+ self, signature_filepath: str, signature_filename: str
+ ) -> None:
+ if self.gpg_signature is not None:
+ raise exceptions.InvalidDistribution("GPG Signature can only be added once")
+
+ with open(signature_filepath, "rb") as gpg:
+ self.gpg_signature = (signature_filename, gpg.read())
+
+ def sign(self, sign_with: str, identity: Optional[str]) -> None:
+ print(f"Signing {self.basefilename}")
+ gpg_args: Tuple[str, ...] = (sign_with, "--detach-sign")
+ if identity:
+ gpg_args += ("--local-user", identity)
+ gpg_args += ("-a", self.filename)
+ self.run_gpg(gpg_args)
+
+ self.add_gpg_signature(self.signed_filename, self.signed_basefilename)
+
+ @classmethod
+ def run_gpg(cls, gpg_args: Tuple[str, ...]) -> None:
+ try:
+ subprocess.check_call(gpg_args)
+ return
+ except FileNotFoundError:
+ if gpg_args[0] != "gpg":
+ raise exceptions.InvalidSigningExecutable(
+ "{} executable not available.".format(gpg_args[0])
+ )
+
+ print("gpg executable not available. Attempting fallback to gpg2.")
+ try:
+ subprocess.check_call(("gpg2",) + gpg_args[1:])
+ except FileNotFoundError:
+ print("gpg2 executable not available.")
+ raise exceptions.InvalidSigningExecutable(
+ "'gpg' or 'gpg2' executables not available. "
+ "Try installing one of these or specifying an executable "
+ "with the --sign-with flag."
+ )
+
+
+class Hexdigest(NamedTuple):
+ md5: Optional[str]
+ sha2: Optional[str]
+ blake2: Optional[str]
+
+
+class HashManager:
+ """Manage our hashing objects for simplicity.
+
+ This will also allow us to better test this logic.
+ """
+
+ def __init__(self, filename: str) -> None:
+ """Initialize our manager and hasher objects."""
+ self.filename = filename
+
+ self._md5_hasher = None
+ try:
+ self._md5_hasher = hashlib.md5()
+ except ValueError:
+ # FIPs mode disables MD5
+ pass
+
+ self._sha2_hasher = hashlib.sha256()
+
+ self._blake_hasher = None
+ try:
+ self._blake_hasher = hashlib.blake2b(digest_size=256 // 8)
+ except ValueError:
+ # FIPS mode disables blake2
+ pass
+
+ def _md5_update(self, content: bytes) -> None:
+ if self._md5_hasher is not None:
+ self._md5_hasher.update(content)
+
+ def _md5_hexdigest(self) -> Optional[str]:
+ if self._md5_hasher is not None:
+ return self._md5_hasher.hexdigest()
+ return None
+
+ def _sha2_update(self, content: bytes) -> None:
+ if self._sha2_hasher is not None:
+ self._sha2_hasher.update(content)
+
+ def _sha2_hexdigest(self) -> Optional[str]:
+ if self._sha2_hasher is not None:
+ return self._sha2_hasher.hexdigest()
+ return None
+
+ def _blake_update(self, content: bytes) -> None:
+ if self._blake_hasher is not None:
+ self._blake_hasher.update(content)
+
+ def _blake_hexdigest(self) -> Optional[str]:
+ if self._blake_hasher is not None:
+ return self._blake_hasher.hexdigest()
+ return None
+
+ def hash(self) -> None:
+ """Hash the file contents."""
+ with open(self.filename, "rb") as fp:
+ for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""):
+ self._md5_update(content)
+ self._sha2_update(content)
+ self._blake_update(content)
+
+ def hexdigest(self) -> Hexdigest:
+ """Return the hexdigest for the file."""
+ return Hexdigest(
+ self._md5_hexdigest(),
+ self._sha2_hexdigest(),
+ self._blake_hexdigest(),
+ )