Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/1300.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added (tech preview) support for signing Debian packages when uploading to a Repository.
52 changes: 52 additions & 0 deletions pulp_deb/app/migrations/0032_package_signing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Generated by Django 4.2.25 on 2025-10-23 21:43

from django.db import migrations, models
import django.db.models.deletion
import django_lifecycle.mixins
import pulpcore.app.models.base


class Migration(migrations.Migration):

dependencies = [
('core', '0145_domainize_import_export'),
('deb', '0001_initial_squashed_0031_add_domains'),
]

operations = [
migrations.CreateModel(
name='AptPackageSigningService',
fields=[
('signingservice_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.signingservice')),
],
options={
'abstract': False,
},
bases=('core.signingservice',),
),
migrations.AddField(
model_name='aptrepository',
name='package_signing_fingerprint',
field=models.TextField(max_length=40, null=True),
),
migrations.AddField(
model_name='aptrepository',
name='package_signing_service',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to='deb.aptpackagesigningservice'),
),
migrations.CreateModel(
name='AptRepositoryReleasePackageSigningFingerprintOverride',
fields=[
('pulp_id', models.UUIDField(default=pulpcore.app.models.base.pulp_uuid, editable=False, primary_key=True, serialize=False)),
('pulp_created', models.DateTimeField(auto_now_add=True)),
('pulp_last_updated', models.DateTimeField(auto_now=True, null=True)),
('package_signing_fingerprint', models.TextField(max_length=40)),
('release_distribution', models.TextField()),
('repository', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='package_signing_fingerprint_release_overrides', to='deb.aptrepository')),
],
options={
'unique_together': {('repository', 'release_distribution')},
},
bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model),
),
]
8 changes: 6 additions & 2 deletions pulp_deb/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
SourcePackage,
)

from .signing_service import AptReleaseSigningService
from .signing_service import AptReleaseSigningService, AptPackageSigningService

from .content.metadata import (
Release,
Expand All @@ -28,4 +28,8 @@

from .remote import AptRemote

from .repository import AptRepository, AptRepositoryReleaseServiceOverride
from .repository import (
AptRepository,
AptRepositoryReleaseServiceOverride,
AptRepositoryReleasePackageSigningFingerprintOverride,
)
51 changes: 49 additions & 2 deletions pulp_deb/app/models/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
SourceIndex,
SourcePackage,
SourcePackageReleaseComponent,
AptPackageSigningService,
)

import logging
Expand Down Expand Up @@ -66,13 +67,24 @@ class AptRepository(Repository, AutoAddObjPermsMixin):
signing_service = models.ForeignKey(
AptReleaseSigningService, on_delete=models.PROTECT, null=True
)

package_signing_service = models.ForeignKey(
AptPackageSigningService, on_delete=models.SET_NULL, null=True
)

package_signing_fingerprint = models.TextField(null=True, max_length=40)

# Implicit signing_service_release_overrides
# Implicit package_signing_fingerprint_release_overrides

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
permissions = [
("manage_roles_aptrepository", "Can manage roles on APT repositories"),
("modify_content_aptrepository", "Add content to, or remove content from a repository"),
(
"modify_content_aptrepository",
"Add content to, or remove content from a repository",
),
("repair_aptrepository", "Copy an APT repository"),
("sync_aptrepository", "Sync an APT repository"),
("delete_aptrepository_version", "Delete a repository version"),
Expand All @@ -91,6 +103,21 @@ def release_signing_service(self, release):
except AptRepositoryReleaseServiceOverride.DoesNotExist:
return self.signing_service

def release_package_signing_fingerprint(self, release):
"""
Return the Package Signing Fingerprint specified in the overrides if there is one for this
release, else return self.package_signing_fingerprint.
"""
if isinstance(release, Release):
release = release.distribution
try:
override = self.package_signing_fingerprint_release_overrides.get(
release_distribution=release
)
return override.package_signing_fingerprint
except AptRepositoryReleasePackageSigningFingerprintOverride.DoesNotExist:
return self.package_signing_fingerprint

def initialize_new_version(self, new_version):
"""
Remove old metadata from the repo before performing anything else for the new version. This
Expand Down Expand Up @@ -125,7 +152,9 @@ class AptRepositoryReleaseServiceOverride(BaseModel):
"""

repository = models.ForeignKey(
AptRepository, on_delete=models.CASCADE, related_name="signing_service_release_overrides"
AptRepository,
on_delete=models.CASCADE,
related_name="signing_service_release_overrides",
)
signing_service = models.ForeignKey(AptReleaseSigningService, on_delete=models.PROTECT)
release_distribution = models.TextField()
Expand All @@ -134,6 +163,24 @@ class Meta:
unique_together = (("repository", "release_distribution"),)


class AptRepositoryReleasePackageSigningFingerprintOverride(BaseModel):
"""
Override the signing fingerprint that a single Release will use in this AptRepository for
signing packages.
"""

repository = models.ForeignKey(
AptRepository,
on_delete=models.CASCADE,
related_name="package_signing_fingerprint_release_overrides",
)
package_signing_fingerprint = models.TextField(max_length=40)
release_distribution = models.TextField()

class Meta:
unique_together = (("repository", "release_distribution"),)


def find_dist_components(package_ids, content_set):
"""
Given a list of package_ids and a content_set, this function will find all distribution-
Expand Down
138 changes: 125 additions & 13 deletions pulp_deb/app/models/signing_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,30 @@
import os
from pathlib import Path
import shutil
import subprocess
from typing import Optional
import gnupg
import tempfile

from pulpcore.plugin.models import SigningService
from importlib_resources import files


def prepare_gpg(temp_directory_name, public_key, pubkey_fingerprint):
# Prepare GPG:
# gpg = gnupg.GPG(gnupghome=temp_directory_name)
gpg = gnupg.GPG(keyring=str(Path(temp_directory_name) / ".keyring"))
gpg.import_keys(public_key)
imported_keys = gpg.list_keys()

if len(imported_keys) != 1:
message = "We have imported more than one key! Aborting validation!"
raise RuntimeError(message)

if imported_keys[0]["fingerprint"] != pubkey_fingerprint:
message = "The signing service fingerprint does not appear to match its public key!"
raise RuntimeError(message)
return gpg


class AptReleaseSigningService(SigningService):
Expand Down Expand Up @@ -70,19 +92,7 @@ def validate(self):
raise RuntimeError(message.format(signature_file, signature_type))

# Prepare GPG:
gpg = gnupg.GPG(gnupghome=temp_directory_name)
gpg.import_keys(self.public_key)
imported_keys = gpg.list_keys()

if len(imported_keys) != 1:
message = "We have imported more than one key! Aborting validation!"
raise RuntimeError(message)

if imported_keys[0]["fingerprint"] != self.pubkey_fingerprint:
message = (
"The signing service fingerprint does not appear to match its public key!"
)
raise RuntimeError(message)
gpg = prepare_gpg(temp_directory_name, self.public_key, self.pubkey_fingerprint)

# Verify InRelease file
inline_path = signatures.get("inline")
Expand Down Expand Up @@ -138,3 +148,105 @@ def validate(self):
if verified.pubkey_fingerprint != self.pubkey_fingerprint:
message = "'{}' appears to have been signed using the wrong key!"
raise RuntimeError(message.format(detached_path))


class AptPackageSigningService(SigningService):
"""
A model used for signing Apt packages.

The pubkey_fingerprint should be passed explicitly in the sign method.
"""

def _env_variables(self, env_vars=None):
# Prevent the signing service pubkey to be used for signing a package.
# The pubkey should be provided explicitly.
_env_vars = {"PULP_SIGNING_KEY_FINGERPRINT": None}
if env_vars:
_env_vars.update(env_vars)
return super()._env_variables(_env_vars)

def sign(
self,
filename: str,
env_vars: Optional[dict] = None,
pubkey_fingerprint: Optional[str] = None,
):
"""
Sign a package @filename using @pubkey_fingerprint.

Args:
filename: The absolute path to the package to be signed.
env_vars: (optional) Dict of env_vars to be passed to the signing script.
pubkey_fingerprint: The V4 fingerprint that correlates with the private key to use.
"""
if not pubkey_fingerprint:
raise ValueError("A pubkey_fingerprint must be provided.")
_env_vars = env_vars or {}
_env_vars["PULP_SIGNING_KEY_FINGERPRINT"] = pubkey_fingerprint
return super().sign(filename, _env_vars)

def validate(self):
"""
Validate a signing service for an Apt package signature.

Specifically, it validates that self.signing_script can sign an apt package with
the sample key self.pubkey and that the self.sign() method returns:

```json
{"apt_package": "<path/to/package.deb>"}
```

Recreates the check that "debsig-verify" would be doing because debsig-verify is
complicated to set up correctly, and doing so would add a dependency that is not available
on rpm-based systems.
"""
with tempfile.TemporaryDirectory() as temp_directory_name:
# copy test deb package
sample_deb = shutil.copy(
files("pulp_deb").joinpath("tests/functional/data/packages/frigg_1.0_ppc64.deb"),
temp_directory_name,
)
return_value = self.sign(sample_deb, pubkey_fingerprint=self.pubkey_fingerprint)
try:
signed_deb = return_value["deb_package"]
except KeyError:
raise Exception(f"Malformed output from signing script: {return_value}")

# Prepare GPG:
gpg = prepare_gpg(temp_directory_name, self.public_key, self.pubkey_fingerprint)

self._validate_deb_package(signed_deb, self.pubkey_fingerprint, temp_directory_name, gpg)


@staticmethod
def _validate_deb_package(deb_package_path: str, pubkey_fingerprint: str, temp_directory_name: str, gpg: gnupg.GPG):
"""
Validate that the deb package at @deb_package_path is correctly signed.

This is a placeholder for future validation logic if needed.
"""
# unpack the archive
cmd = ["ar", "x", deb_package_path]
res = subprocess.run(cmd, cwd=temp_directory_name, capture_output=True)
if res.returncode != 0:
raise Exception(f"Failed to read package {deb_package_path}. Please check the package.")

# cat the unpacked archive bits together
temp_dir = Path(temp_directory_name)
with (temp_dir / "combined").open("wb") as combined:
for filename in ("debian-binary", "control.*", "data.*"):
# There will only be one control.tar.gz (or whatever) file, but we have to glob
# and iterate because the compression type can vary.
for x in temp_dir.glob(filename):
with x.open("rb") as f:
shutil.copyfileobj(f, combined)

# verify combined data with _gpgorigin detached signature
with (temp_dir / "_gpgorigin").open("rb") as gpgorigin:
verified = gpg.verify_file(gpgorigin, str(temp_dir / "combined"))
if not verified.valid:
raise Exception(f"GPG Verification of the signed package {deb_package_path} failed!")
if verified.pubkey_fingerprint != pubkey_fingerprint:
raise Exception(
f"'{deb_package_path}' appears to have been signed using the wrong key!"
)
31 changes: 31 additions & 0 deletions pulp_deb/app/serializers/content_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,37 @@ def deferred_validate(self, data):

return data

def validate(self, data):
validated_data = super().validate(data)
sign_package = self.context.get("sign_package", None)
# choose branch, if not set externally
if sign_package is None:
sign_package = bool(
validated_data.get("repository")
and validated_data["repository"].package_signing_service
)
self.context["sign_package"] = sign_package

# normal branch
if sign_package is False:
return validated_data

# signing branch
if not validated_data["repository"].package_signing_fingerprint:
raise ValidationError(
_(
"To sign a package on upload, the associated Repository must set both"
"'package_signing_service' and 'package_signing_fingerprint'."
)
)

if not validated_data.get("file") and not validated_data.get("upload"):
raise ValidationError(
_("To sign a package on upload, a file or upload must be provided.")
)

return validated_data

class Meta(SinglePackageUploadSerializer.Meta):
fields = (
SinglePackageUploadSerializer.Meta.fields
Expand Down
Loading