Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add work-in-progress testing codebase #52

Open
wants to merge 2 commits into
base: hond
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions firmitas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,5 @@ def readconf(confobjc):
)
sys.exit(1)

if not os.path.exists(standard.hostloca):
logrdata.logrobjc.error(
"Please set the directory containing the service hostname map properly"
)
sys.exit(1)
else:
if os.path.exists(standard.hostloca):
standard.certdict = yaml.safe_load(Path(standard.hostloca).read_text())
143 changes: 107 additions & 36 deletions firmitas/base/maintool.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,49 +54,120 @@ def readcert(certobjc):
return strtdate, stopdate, cstarted, cstopped, daystobt, daystodd, issuauth, serialno


def generate():
logrdata.logrobjc.info("Generating into the configured directory")
doneqant, failqant, totlqant = 0, 0, 0

logrdata.logrobjc.info("Validating X.509-standard TLS certificate(s)")
certloca = Path(standard.certloca)

for file in certloca.iterdir():
if not file.is_file() or ".crt" not in file.name:
continue

certpath = Path(file.as_posix())
totlqant += 1

if not os.path.exists(certpath):
logrdata.logrobjc.warning(
f"[{file.stem}] The specified X.509-standard TLS certificate could not "
+ "be located"
)
failqant += 1
continue

try:
certobjc = x509.load_pem_x509_certificate(certpath.read_bytes(), default_backend())
readdata = readcert(certobjc)
except ValueError:
logrdata.logrobjc.error(
f"[{file.stem}] The specified X.509-standard TLS certificate could not be read"
)
failqant += 1
else:
logrdata.logrobjc.info(
f"[{file.stem}] The specified X.509-standard TLS certificate was read successfully"
)
standard.certdict[file.name] = {
"path": file.name,
"user": standard.username,
"certstat": {
"strtdate": readdata[0],
"stopdate": readdata[1],
"cstarted": readdata[2],
"cstopped": readdata[3],
"daystobt": readdata[4],
"daystodd": readdata[5],
"issuauth": readdata[6],
"serialno": readdata[7],
},
"notistat": {
"done": False,
"link": "",
"time": "",
}
}
doneqant += 1

logrdata.logrobjc.info(
f"Of {totlqant} TLS certificates, {doneqant} TLS certificate(s) were read successfully "
+ f"while {failqant} TLS certificate(s) could not be read"
)

with open(standard.hostloca, "w") as yamlfile:
print(standard.hostloca, len(standard.certdict))
yaml.safe_dump(standard.certdict, yamlfile)


def probedir():
logrdata.logrobjc.info("Probing into the configured directory")
doneqant, failqant, totlqant = 0, 0, 0

logrdata.logrobjc.info("Validating X.509-standard TLS certificate(s)")
print(standard.hostloca, len(standard.certdict))
standard.certdict = yaml.safe_load(Path(standard.hostloca).read_text())
logrdata.logrobjc.info(
f"Validating {len(standard.certdict)} X.509-standard TLS certificates"
)

for nameindx in standard.certdict:
totlqant += 1
certpath = Path(standard.certloca, standard.certdict[nameindx]["path"])
if os.path.exists(certpath):
try:
certobjc = x509.load_pem_x509_certificate(certpath.read_bytes(), default_backend())
(
standard.certdict[nameindx]["certstat"]["strtdate"],
standard.certdict[nameindx]["certstat"]["stopdate"],
standard.certdict[nameindx]["certstat"]["cstarted"],
standard.certdict[nameindx]["certstat"]["cstopped"],
standard.certdict[nameindx]["certstat"]["daystobt"],
standard.certdict[nameindx]["certstat"]["daystodd"],
standard.certdict[nameindx]["certstat"]["issuauth"],
standard.certdict[nameindx]["certstat"]["serialno"],
) = readcert(certobjc)
doneqant += 1
logrdata.logrobjc.info(
f"[{nameindx}] The specified X.509-standard TLS certificate was read "
+ "successfully"
)
except ValueError:
failqant += 1
logrdata.logrobjc.error(
f"[{nameindx}] The specified X.509-standard TLS certificate could not be read"
)
else:
failqant += 1
totlqant += 1

if not os.path.exists(certpath):
logrdata.logrobjc.warning(
f"[{nameindx}] The specified X.509-standard TLS certificate could not "
+ "be located"
)
failqant += 1
continue

try:
certobjc = x509.load_pem_x509_certificate(certpath.read_bytes(), default_backend())
readdata = readcert(certobjc)
except ValueError:
logrdata.logrobjc.error(
f"[{nameindx.replace('.crt', '')}] The specified X.509-standard TLS certificate could not be read"
)
failqant += 1
else:
logrdata.logrobjc.info(
f"[{nameindx.replace('.crt', '')}] The specified X.509-standard TLS certificate was read successfully"
)
(
standard.certdict[nameindx]["certstat"]["strtdate"],
standard.certdict[nameindx]["certstat"]["stopdate"],
standard.certdict[nameindx]["certstat"]["cstarted"],
standard.certdict[nameindx]["certstat"]["cstopped"],
standard.certdict[nameindx]["certstat"]["daystobt"],
standard.certdict[nameindx]["certstat"]["daystodd"],
standard.certdict[nameindx]["certstat"]["issuauth"],
standard.certdict[nameindx]["certstat"]["serialno"],
) = readdata
doneqant += 1

logrdata.logrobjc.info(
f"Of {totlqant} TLS certificates, {doneqant} TLS certificate(s) were read successfully "
f"Of {totlqant} TLS certificate(s), {doneqant} TLS certificate(s) were read successfully "
+ f"while {failqant} TLS certificate(s) could not be read"
)

with open(standard.hostloca, "w") as yamlfile:
yaml.safe_dump(standard.certdict, yamlfile)

Expand All @@ -110,12 +181,12 @@ def gonotify():
if standard.certdict[certindx]["certstat"]["cstopped"]:
afstopcn += 1
logrdata.logrobjc.warning(
f"[{certindx}] The specified X.509 TLS certificate is not valid anymore"
f"[{certindx.replace('.crt', '')}] The specified X.509 TLS certificate is not valid anymore"
)
else:
if standard.certdict[certindx]["certstat"]["daystodd"] <= standard.daysqant:
logrdata.logrobjc.warning(
f"[{certindx}] The specified X.509 TLS certificate is about to expire "
f"[{certindx.replace('.crt', '')}] The specified X.509 TLS certificate is about to expire "
+ f"in under {standard.daysqant} days from now"
)
if not standard.certdict[certindx]["notistat"]["done"]:
Expand All @@ -135,7 +206,7 @@ def gonotify():
if rtrnobjc[0]:
succqant += 1
logrdata.logrobjc.info(
f"[{certindx}] The notification ticket for renewing the "
f"[{certindx.replace('.crt', '')}] The notification ticket for renewing the "
+ "TLS certificate has now been created"
)
standard.certdict[certindx]["notistat"]["done"] = rtrnobjc[0]
Expand All @@ -148,9 +219,9 @@ def gonotify():
f"[{certindx}] The specified X.509 TLS certificate is not valid yet"
)
logrdata.logrobjc.info(
f"Of {totlqant} TLS certificates, {bfstrtcn} TLS certificate(s) were not valid "
+ f"yet, {afstopcn} TLS certificates were not valid anymore and {succqant} TLS "
+ "certificates were notified of being near their validity expiry"
f"Of {totlqant} TLS certificate(s), {bfstrtcn} TLS certificate(s) were not valid "
+ f"yet, {afstopcn} TLS certificate(s) were not valid anymore and {succqant} TLS "
+ "certificate(s) were notified of being near their validity expiry"
)
with open(standard.hostloca, "w") as yamlfile:
yaml.safe_dump(standard.certdict, yamlfile)
Expand Down
10 changes: 9 additions & 1 deletion firmitas/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
"""


import os

import click

from firmitas import __vers__, readconf
from firmitas.base.maintool import gonotify, probedir
from firmitas.base.maintool import generate, gonotify, probedir
from firmitas.conf import logrdata, standard


@click.command(name="firmitas")
Expand All @@ -43,5 +46,10 @@ def main(conffile=None):
with open(conffile) as confobjc:
exec(compile(confobjc.read(), conffile, "exec"), confdict) # noqa : S102
readconf(confdict)

if not os.path.exists(standard.hostloca):
logrdata.logrobjc.warning("Generating a new service hostname dictionary")
generate()

probedir()
gonotify()
8 changes: 4 additions & 4 deletions firmitas/unit/gopagure.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def makenote(
):
try:
logrdata.logrobjc.debug(
f"[{servname}] Notification request attempt count - {retcount+1} of {standard.maxretry}"
f"[{servname.replace('.crt', '')}] Notification request attempt count - {retcount+1} of {standard.maxretry}"
)
rqstobjc = post(
url=f"https://pagure.io/api/0/{standard.reponame}/new_issue",
Expand All @@ -65,12 +65,12 @@ def makenote(
timeout=standard.rqsttime,
)
logrdata.logrobjc.debug(
f"[{servname}] The notification request was met with response code "
f"[{servname.replace('.crt', '')}] The notification request was met with response code "
+ f"{rqstobjc.status_code}"
)
if rqstobjc.status_code == 200:
logrdata.logrobjc.debug(
f"[{servname}] The created notification ticket was created with ID "
f"[{servname.replace('.crt', '')}] The created notification ticket was created with ID "
+ f"#{rqstobjc.json()['issue']['id']} ({rqstobjc.json()['issue']['full_url']})."
)
return (
Expand All @@ -82,6 +82,6 @@ def makenote(
return False, "", ""
except Exception as expt:
logrdata.logrobjc.error(
f"[{servname}] The notification ticket could not be created - {expt}"
f"[{servname.replace('.crt', '')}] The notification ticket could not be created - {expt}"
)
return False, "", ""
66 changes: 55 additions & 11 deletions test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

standard_list = [
"[INFO] Probing into the configured directory",
"[INFO] Validating 6 X.509-standard TLS certificates",
"[INFO] Validating X.509-standard TLS certificate(s)",
"[DEBUG] [joystick.stg] Issued by RabbitMQ STAGING CA",
"[DEBUG] [joystick.stg] Serial number 44541479035547978831580614561088909678",
"[DEBUG] [joystick.stg] Valid from 2019-05-28 23:04:35",
Expand All @@ -47,14 +47,25 @@
"[DEBUG] [waiverdb.stg] Valid until 2029-03-03 23:58:40",
"[INFO] [waiverdb.stg] The specified X.509-standard TLS certificate was read successfully", # noqa : E501
"[ERROR] [mistaken.stg] The specified X.509-standard TLS certificate could not be read", # noqa : E501
"[INFO] Of 6 TLS certificates, 5 TLS certificate(s) were read successfully while 1 TLS certificate(s) could not be read", # noqa : E501
]


def list_etoe_pagure(list_etoe: list = standard_list.copy()) -> list: # noqa : B008
def list_etoe_pagure_with_config(list_etoe: list = standard_list.copy()) -> list: # noqa : B008
list_etoe += [
"[WARNING] [dtfedmsg.stg] The specified X.509 TLS certificate is not valid anymore",
"[INFO] Of 6 TLS certificates, 1 TLS certificate(s) were not valid yet, 1 TLS certificates were not valid anymore and 0 TLS certificates were notified of being near their validity expiry", # noqa : E501
"[INFO] Of 6 TLS certificate(s), 1 TLS certificate(s) were not valid yet, 1 TLS certificate(s) were not valid anymore and 0 TLS certificate(s) were notified of being near their validity expiry", # noqa : E501
]
return list_etoe


def list_etoe_pagure_without_config(list_etoe: list = standard_list.copy()) -> list: # noqa : B008
list_etoe += [
"[WARNING] Generating a new service hostname dictionary",
"[INFO] Generating into the configured directory",
"[INFO] Of 6 TLS certificates, 5 TLS certificate(s) were read successfully while 1 TLS certificate(s) could not be read",
"[INFO] Of 5 TLS certificate(s), 5 TLS certificate(s) were read successfully while 0 TLS certificate(s) could not be read",
"[WARNING] [dtfedmsg.stg] The specified X.509 TLS certificate is not valid anymore",
"[INFO] Of 5 TLS certificate(s), 0 TLS certificate(s) were not valid yet, 1 TLS certificate(s) were not valid anymore and 0 TLS certificate(s) were notified of being near their validity expiry", # noqa : E501
]
return list_etoe

Expand All @@ -73,7 +84,7 @@ def list_etoe_github(list_etoe: list = standard_list.copy()) -> list: # noqa :
return list_etoe


def list_etoe_auth(list_etoe: list = standard_list.copy()) -> list: # noqa : B008
def list_etoe_auth_base(list_etoe: list = standard_list.copy()) -> list:
list_etoe += [
"[WARNING] [joystick.stg] The specified X.509 TLS certificate is about to expire in under", # noqa : E501
"[DEBUG] [joystick.stg] Notification request attempt count - 1 of 5",
Expand Down Expand Up @@ -103,12 +114,30 @@ def list_etoe_auth(list_etoe: list = standard_list.copy()) -> list: # noqa : B0
"[DEBUG] [waiverdb.stg] The notification request was met with response code 200",
"[DEBUG] [waiverdb.stg] The created notification ticket was created with ID",
"[INFO] [waiverdb.stg] The notification ticket for renewing the TLS certificate has now been created", # noqa : E501
"[INFO] Of 6 TLS certificates, 1 TLS certificate(s) were not valid yet, 1 TLS certificates were not valid anymore and 4 TLS certificates were notified of being near their validity expiry", # noqa : E501
]
return list_etoe


def list_etoe_nope(list_etoe: list = standard_list.copy()) -> list: # noqa : B008
def list_etoe_auth_with_config(list_etoe: list = list_etoe_auth_base()) -> list: # noqa : B008
list_etoe += [
"[INFO] Of 6 TLS certificate(s), 1 TLS certificate(s) were not valid yet, 1 TLS certificate(s) were not valid anymore and 4 TLS certificate(s) were notified of being near their validity expiry", # noqa : E501
]
return list_etoe


def list_etoe_auth_without_config(list_etoe: list = list_etoe_auth_base()) -> list:
list_etoe += [
"[WARNING] Generating a new service hostname dictionary",
"[INFO] Generating into the configured directory",
"[INFO] Of 6 TLS certificates, 5 TLS certificate(s) were read successfully while 1 TLS certificate(s) could not be read",
"[INFO] Of 5 TLS certificate(s), 5 TLS certificate(s) were read successfully while 0 TLS certificate(s) could not be read",
"[WARNING] [dtfedmsg.stg] The specified X.509 TLS certificate is not valid anymore",
"[INFO] Of 5 TLS certificate(s), 0 TLS certificate(s) were not valid yet, 1 TLS certificate(s) were not valid anymore and 4 TLS certificate(s) were notified of being near their validity expiry", # noqa : E501
]
return list_etoe


def list_etoe_nope_base(list_etoe: list = standard_list.copy()) -> list:
list_etoe += [
"[WARNING] [joystick.stg] The specified X.509 TLS certificate is about to expire in under",
"[DEBUG] [joystick.stg] Notification request attempt count - 1 of 5",
Expand Down Expand Up @@ -137,10 +166,25 @@ def list_etoe_nope(list_etoe: list = standard_list.copy()) -> list: # noqa : B0
"[DEBUG] [nuancier.stg] The notification request was met with response code 401",
"[DEBUG] [robosign.stg] The notification request was met with response code 401",
"[DEBUG] [waiverdb.stg] The notification request was met with response code 401",
]
return list_etoe


def list_etoe_nope_with_config(list_etoe: list = list_etoe_nope_base()) -> list: # noqa : B008
list_etoe += [
"[WARNING] [mistaken.stg] The specified X.509 TLS certificate is not valid yet",
"[WARNING] [nuancier.stg] The specified X.509 TLS certificate is about to expire in under",
"[WARNING] [robosign.stg] The specified X.509 TLS certificate is about to expire in under",
"[WARNING] [waiverdb.stg] The specified X.509 TLS certificate is about to expire in under",
"[INFO] Of 6 TLS certificates, 1 TLS certificate(s) were not valid yet, 1 TLS certificates were not valid anymore and 0 TLS certificates were notified of being near their validity expiry" # noqa : E501
"[INFO] Of 6 TLS certificate(s), 1 TLS certificate(s) were not valid yet, 1 TLS certificate(s) were not valid anymore and 0 TLS certificate(s) were notified of being near their validity expiry" # noqa : E501
]
return list_etoe


def list_etoe_nope_without_config(list_etoe: list = list_etoe_nope_base()) -> list:
list_etoe += [
"[WARNING] Generating a new service hostname dictionary",
"[INFO] Generating into the configured directory",
"[INFO] Of 6 TLS certificates, 5 TLS certificate(s) were read successfully while 1 TLS certificate(s) could not be read",
"[INFO] Of 5 TLS certificate(s), 5 TLS certificate(s) were read successfully while 0 TLS certificate(s) could not be read",
"[WARNING] [dtfedmsg.stg] The specified X.509 TLS certificate is not valid anymore",
"[INFO] Of 5 TLS certificate(s), 0 TLS certificate(s) were not valid yet, 1 TLS certificate(s) were not valid anymore and 0 TLS certificate(s) were notified of being near their validity expiry", # noqa : E501
]
return list_etoe
Loading
Loading