diff --git a/nodeman/models.py b/nodeman/models.py index cbe6395..fc11a25 100644 --- a/nodeman/models.py +++ b/nodeman/models.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, Field, StringConstraints, field_validator from pydantic.types import AwareDatetime -from .db_models import TapirNode +from .db_models import TapirCertificate, TapirNode from .jose import PrivateJwk, PrivateSymmetric, PublicJwk, PublicJwks, public_key_factory from .settings import MqttUrl @@ -92,16 +92,26 @@ class NodeBootstrapInformation(BaseModel): class NodeCertificate(BaseModel): x509_certificate: str = Field(title="X.509 Client Certificate Bundle") - x509_ca_certificate: str = Field(title="X.509 CA Certificate Bundle") - x509_certificate_serial_number: int | None = Field(default=None, exclude=True) + x509_ca_certificate: str | None = Field(title="X.509 CA Certificate Bundle") + x509_certificate_serial_number: str x509_certificate_not_valid_after: datetime @field_validator("x509_certificate", "x509_ca_certificate") @classmethod - def validate_pem_bundle(cls, v: str): - _ = load_pem_x509_certificates(v.encode()) + def validate_pem_bundle(cls, v: str | None): + if v is not None: + _ = load_pem_x509_certificates(v.encode()) return v + @classmethod + def from_db_model(cls, certificate: TapirCertificate): + return cls( + x509_certificate=certificate.certificate, + x509_ca_certificate=None, + x509_certificate_serial_number=str(certificate.serial), + x509_certificate_not_valid_after=certificate.not_valid_after, + ) + class NodeConfiguration(BaseModel): name: str = Field(title="Node name", examples=["node.example.com"]) diff --git a/nodeman/nodes.py b/nodeman/nodes.py index e7ffd98..c18fc3e 100644 --- a/nodeman/nodes.py +++ b/nodeman/nodes.py @@ -101,7 +101,7 @@ def process_csr_request(request: Request, csr: x509.CertificateSigningRequest, n TapirCertificate.from_x509_certificate(name=name, x509_certificate=x509_certificate).save() logger.info( - "Issued certificate for name=%s serial=%d not_valid_after=%s", + "Issued certificate for name=%s serial=%s not_valid_after=%s", name, x509_certificate_serial_number, x509_not_valid_after_utc, @@ -115,7 +115,7 @@ def process_csr_request(request: Request, csr: x509.CertificateSigningRequest, n return NodeCertificate( x509_certificate=x509_certificate_pem, x509_ca_certificate=x509_ca_certificate_pem, - x509_certificate_serial_number=x509_certificate_serial_number, + x509_certificate_serial_number=str(x509_certificate_serial_number), x509_certificate_not_valid_after=x509_certificate.not_valid_after_utc, ) @@ -465,3 +465,24 @@ async def get_node_configuration( response.headers["Cache-Control"] = f"public, max-age={max_age}" return res + + +@router.get( + "/api/v1/node/{name}/certificate", + responses={ + status.HTTP_200_OK: {"model": NodeCertificate}, + status.HTTP_404_NOT_FOUND: {}, + }, + tags=["client"], + response_model_exclude_none=True, +) +async def get_node_certificate(name: str) -> NodeCertificate: + """Get node certificate""" + + node = find_node(name) + + if certificate := TapirCertificate.objects(name=node.name).order_by("-_id").first(): + return NodeCertificate.from_db_model(certificate) + + logging.debug("Certificate for node %s not found", name, extra={"nodename": name}) + raise HTTPException(status.HTTP_404_NOT_FOUND) diff --git a/tests/test_api.py b/tests/test_api.py index 399aa14..d1c9376 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -152,6 +152,20 @@ def _test_enroll(data_key: JWK, x509_key: PrivateKey, requested_name: str | None assert node_information["name"] == name assert response.headers.get("Cache-Control") is not None + ###################### + # Get node certificate + + response = client.get(f"{node_url}/certificate") + assert response.status_code == status.HTTP_200_OK + node_certificate = response.json() + print(json.dumps(node_certificate, indent=4)) + assert node_certificate["x509_certificate"] == enrollment_response["x509_certificate"] + assert isinstance(node_certificate["x509_certificate_serial_number"], str) + assert node_certificate["x509_certificate_serial_number"] == enrollment_response["x509_certificate_serial_number"] + assert ( + node_certificate["x509_certificate_not_valid_after"] == enrollment_response["x509_certificate_not_valid_after"] + ) + ##################### # Get node public key