From 341e2c00bc1dc047bca3da24d62a0a9f064126ea Mon Sep 17 00:00:00 2001 From: programmingAthlete Date: Fri, 22 Dec 2023 15:05:49 +0100 Subject: [PATCH] Squashed commit of the following: commit 3dba1abcf6ea9e024a9cf7a018d19f8c2595ecd1 Author: programmingAthlete Date: Fri Dec 22 15:05:06 2023 +0100 version commit a4ba9ba5cd32b28b758c0dc893f0f709c204622b Author: programmingAthlete Date: Fri Dec 22 15:04:09 2023 +0100 pythonise attack --- setup.py | 2 +- .../correlation_power_analysis.py | 98 ++++++++++--------- src/crypto_pkg/clis/attacks.py | 24 +---- src/crypto_pkg/settings.py | 5 + src/crypto_pkg/utils/__init__.py | 0 src/crypto_pkg/utils/logging.py | 9 ++ 6 files changed, 69 insertions(+), 69 deletions(-) create mode 100644 src/crypto_pkg/settings.py create mode 100644 src/crypto_pkg/utils/__init__.py create mode 100644 src/crypto_pkg/utils/logging.py diff --git a/setup.py b/setup.py index 4be2bd1..90fa22a 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -__version__ = "1.4.2" +__version__ = "1.4.3" with open("README.md", "r", encoding="utf-8") as fh: long_description = fh.read() diff --git a/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py b/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py index 5dc9c33..828e150 100644 --- a/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py +++ b/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py @@ -2,7 +2,6 @@ import logging import multiprocessing import os -import sys import time from multiprocessing import Pool from typing import Tuple, List @@ -11,12 +10,9 @@ import matplotlib.pyplot as plt from crypto_pkg.ciphers.symmetric.aes import sbox_table +from crypto_pkg.utils.logging import get_logger -logging.basicConfig(level=logging.INFO) - -log = logging.getLogger(__name__) - -log.setLevel(logging.INFO) +log = get_logger(__name__) def plot_c(data: np.ndarray, byte_position: int, plot: bool = False) -> None: @@ -81,20 +77,20 @@ def __init__(self, data_filename, max_datapoints): self.measurements = measurements @staticmethod - def predictCurrent(keyByte: int, plaintextByte: int) -> int: + def predict_current(key_byte: int, plaintext_byte: int) -> int: """ - Predict the current consumed from the SBOX(keyByte \oplus plainTextByte) operation. Using the Hamming weight + Predict the current consumed from the SBOX(keyByte oplus plainTextByte) operation. Using the Hamming weight Args: - keyByte: one-byte integer - plaintextByte: one-byte integer + key_byte: one-byte integer + plaintext_byte: one-byte integer Returns: - Hamming wight of SBOX(keyByte \oplus plainTextByte) + Hamming wight of SBOX(keyByte oplus plainTextByte) """ - return bin(sbox_table[keyByte ^ plaintextByte])[2:].count('1') + return bin(sbox_table[key_byte ^ plaintext_byte])[2:].count('1') @staticmethod - def calculatePearsonCoefficient(x: np.ndarray, y: np.ndarray) -> float: + def calculate_pearson_coefficient(x: np.ndarray, y: np.ndarray) -> float: """ Calculate Pearson Coefficient @@ -111,7 +107,7 @@ def calculatePearsonCoefficient(x: np.ndarray, y: np.ndarray) -> float: return z_mean / (std_x * std_y) @classmethod - def generatePredictedCurrents(cls, plain_texts, byte_position: int) -> np.ndarray: + def generate_predicted_currents(cls, plain_texts, byte_position: int) -> np.ndarray: """ Generate the matrix P of the predicted currents for all key bytes and plain text bytes @@ -126,10 +122,10 @@ def generatePredictedCurrents(cls, plain_texts, byte_position: int) -> np.ndarra for i in range(len(plain_texts)): b = plain_texts[i][byte_position] hexa = int(b, 16) - p[k][i] = cls.predictCurrent(keyByte=k, plaintextByte=hexa) + p[k][i] = cls.predict_current(key_byte=k, plaintext_byte=hexa) return p - def computeC(self, predicted_currents: np.ndarray, byte_position: int, save: bool = False) -> np.ndarray: + def compute_c(self, predicted_currents: np.ndarray, byte_position: int, save: bool = False) -> np.ndarray: """ Compute the correlation matrix @@ -143,7 +139,7 @@ def computeC(self, predicted_currents: np.ndarray, byte_position: int, save: boo c = np.zeros((len(predicted_currents), len(self.measurements))) for i in range(len(predicted_currents)): for j in range(len(self.measurements)): - c[i][j] = abs(self.calculatePearsonCoefficient(predicted_currents[i], self.measurements[j])) + c[i][j] = abs(self.calculate_pearson_coefficient(predicted_currents[i], self.measurements[j])) if save: np.save(f"matrices/matrix_{byte_position}.npy", np.array(c)) return c @@ -172,14 +168,45 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False, f"[Process {byte_position}] Matrix file not found or -r flag provided -> the correlation matrix for the" f" byte position {byte_position} is calculated") - predicted_current_keys = self.generatePredictedCurrents(plain_texts=self.plain_texts, - byte_position=byte_position) + predicted_current_keys = self.generate_predicted_currents(plain_texts=self.plain_texts, + byte_position=byte_position) log.debug(f"[Process {byte_position}] Current predicted for all keys") log.info(f"[Process {byte_position}] Calculating Correlation matrix C") - c = self.computeC(save=store, byte_position=byte_position, predicted_currents=predicted_current_keys) + c = self.compute_c(save=store, byte_position=byte_position, predicted_currents=predicted_current_keys) + if plot: + plot_c(data=c, byte_position=byte_position, plot=plot) log.info(f"[Process {byte_position}] Process {byte_position} finished") return byte_position, np.unravel_index(np.argmax(c), c.shape)[0] + def attack_full_key(self, show_plot_correlations: bool = False, store_correlation_matrices: bool = False, + re_calculate_correlation_matrices: bool = True): + cores = multiprocessing.cpu_count() + log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions\n") + + args_to_processes = tuple( + [[i, show_plot_correlations, store_correlation_matrices, + re_calculate_correlation_matrices] for i + in range(16)]) + log.debug(f"Arguments to the process {args_to_processes}") + print() + + log.info("Starting the multiprocessing attack") + ti = time.time() + with Pool() as pool: + results = pool.starmap(self.attack_byte, args_to_processes) + tf = time.time() + print() + log.info( + f"All processes finished. Final output: {results}. Execution time: {tf - ti} seconds -" + f" {(tf - ti) / 60} minutes") + log.debug(f"Constructing the final key from the output") + out = [(pos, hex(item)[2:]) for (pos, item) in results] + sorted_list = sorted(out, key=lambda x: x[0]) + key_list = [item[1] for item in sorted_list][::-1] + key = ''.join(key_list) + log.info(f"\nKey Found {key}") + return key + def full_attack(arguments): log.debug("Checking the existence of 'matrices' and 'plot' sub-directories") @@ -208,34 +235,11 @@ def full_attack(arguments): print(f"Key byte found: {hex(key_byte[1])[2:]}") return - cores = multiprocessing.cpu_count() - log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions") - print() - - args_to_processes = tuple( - [[i, arguments.show_plot_correlations, arguments.store_correlation_matrices, - arguments.re_calculate_correlation_matrices] for i - in range(16)]) - log.debug(f"Arguments to the process {args_to_processes}") - print() - - log.info("Starting the multiprocessing attack") - ti = time.time() - with Pool() as pool: - results = pool.starmap(attack.attack_byte, args_to_processes) - tf = time.time() - print() - log.info( - f"All processes finished. Final output: {results}. Execution time: {tf - ti} seconds -" - f" {(tf - ti) / 60} minutes") - log.debug(f"Constructing the final key from the output") - out = [(pos, hex(item)[2:]) for (pos, item) in results] - sorted_list = sorted(out, key=lambda x: x[0]) - key_list = [item[1] for item in sorted_list][::-1] - key = ''.join(key_list) - print(f"\nKey Found") + key = attack.attack_full_key(show_plot_correlations=arguments.show_plot_correlations, + store_correlation_matrices=arguments.store_correlation_matrices, + re_calculate_correlation_matrices=arguments.re_calculate_correlation_matrices) + print("Key Found") print(key) - return if __name__ == '__main__': diff --git a/src/crypto_pkg/clis/attacks.py b/src/crypto_pkg/clis/attacks.py index ec6fc32..3ec509c 100644 --- a/src/crypto_pkg/clis/attacks.py +++ b/src/crypto_pkg/clis/attacks.py @@ -195,25 +195,7 @@ def attack_correlation_power_analysis( re_calculate=True) print(f"Key byte found: {hex(key_byte[1])[2:]}") return - cores = multiprocessing.cpu_count() - print(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions") - # Run the full correlation attack - args_to_processes = tuple( - [[i, False, False, True] for i - in range(16)]) - - print("Starting the multiprocessing attack") - ti = time.time() - with Pool() as pool: - results = pool.starmap(attack.attack_byte, args_to_processes) - tf = time.time() - print( - f"\nAll processes finished. Final output: {results}. Execution time: {tf - ti} seconds -" - f" {(tf - ti) / 60} minutes") - print(f"Constructing the final key from the output") - out = [(pos, hex(item)[2:]) for (pos, item) in results] - sorted_list = sorted(out, key=lambda x: x[0]) - key_list = [item[1] for item in sorted_list][::-1] - key = ''.join(key_list) - print(f"\nKey Found") + key = attack.attack_full_key(store_correlation_matrices=False, re_calculate_correlation_matrices=False, + show_plot_correlations=False) + print("Key Found") print(key) diff --git a/src/crypto_pkg/settings.py b/src/crypto_pkg/settings.py new file mode 100644 index 0000000..f85612c --- /dev/null +++ b/src/crypto_pkg/settings.py @@ -0,0 +1,5 @@ +import logging + +log_level = logging.INFO + +logging.basicConfig(level=log_level) diff --git a/src/crypto_pkg/utils/__init__.py b/src/crypto_pkg/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/crypto_pkg/utils/logging.py b/src/crypto_pkg/utils/logging.py new file mode 100644 index 0000000..da87eb6 --- /dev/null +++ b/src/crypto_pkg/utils/logging.py @@ -0,0 +1,9 @@ +import logging + +from crypto_pkg import settings + + +def get_logger(location: str = __name__): + log = logging.getLogger(location) + log.setLevel(settings.log_level) + return log