Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 3dba1ab
Author: programmingAthlete <luca.bonamino@hotmail.com>
Date:   Fri Dec 22 15:05:06 2023 +0100

    version

commit a4ba9ba
Author: programmingAthlete <luca.bonamino@hotmail.com>
Date:   Fri Dec 22 15:04:09 2023 +0100

    pythonise attack
  • Loading branch information
programmingAthlete committed Dec 22, 2023
1 parent df93358 commit 341e2c0
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 69 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

__version__ = "1.4.2"
__version__ = "1.4.3"

with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
Expand Down
98 changes: 51 additions & 47 deletions src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import multiprocessing
import os
import sys
import time
from multiprocessing import Pool
from typing import Tuple, List
Expand All @@ -11,12 +10,9 @@
import matplotlib.pyplot as plt

from crypto_pkg.ciphers.symmetric.aes import sbox_table
from crypto_pkg.utils.logging import get_logger

logging.basicConfig(level=logging.INFO)

log = logging.getLogger(__name__)

log.setLevel(logging.INFO)
log = get_logger(__name__)


def plot_c(data: np.ndarray, byte_position: int, plot: bool = False) -> None:
Expand Down Expand Up @@ -81,20 +77,20 @@ def __init__(self, data_filename, max_datapoints):
self.measurements = measurements

@staticmethod
def predictCurrent(keyByte: int, plaintextByte: int) -> int:
def predict_current(key_byte: int, plaintext_byte: int) -> int:
"""
Predict the current consumed from the SBOX(keyByte \oplus plainTextByte) operation. Using the Hamming weight
Predict the current consumed from the SBOX(keyByte oplus plainTextByte) operation. Using the Hamming weight
Args:
keyByte: one-byte integer
plaintextByte: one-byte integer
key_byte: one-byte integer
plaintext_byte: one-byte integer
Returns:
Hamming wight of SBOX(keyByte \oplus plainTextByte)
Hamming wight of SBOX(keyByte oplus plainTextByte)
"""
return bin(sbox_table[keyByte ^ plaintextByte])[2:].count('1')
return bin(sbox_table[key_byte ^ plaintext_byte])[2:].count('1')

@staticmethod
def calculatePearsonCoefficient(x: np.ndarray, y: np.ndarray) -> float:
def calculate_pearson_coefficient(x: np.ndarray, y: np.ndarray) -> float:
"""
Calculate Pearson Coefficient
Expand All @@ -111,7 +107,7 @@ def calculatePearsonCoefficient(x: np.ndarray, y: np.ndarray) -> float:
return z_mean / (std_x * std_y)

@classmethod
def generatePredictedCurrents(cls, plain_texts, byte_position: int) -> np.ndarray:
def generate_predicted_currents(cls, plain_texts, byte_position: int) -> np.ndarray:
"""
Generate the matrix P of the predicted currents for all key bytes and plain text bytes
Expand All @@ -126,10 +122,10 @@ def generatePredictedCurrents(cls, plain_texts, byte_position: int) -> np.ndarra
for i in range(len(plain_texts)):
b = plain_texts[i][byte_position]
hexa = int(b, 16)
p[k][i] = cls.predictCurrent(keyByte=k, plaintextByte=hexa)
p[k][i] = cls.predict_current(key_byte=k, plaintext_byte=hexa)
return p

def computeC(self, predicted_currents: np.ndarray, byte_position: int, save: bool = False) -> np.ndarray:
def compute_c(self, predicted_currents: np.ndarray, byte_position: int, save: bool = False) -> np.ndarray:
"""
Compute the correlation matrix
Expand All @@ -143,7 +139,7 @@ def computeC(self, predicted_currents: np.ndarray, byte_position: int, save: boo
c = np.zeros((len(predicted_currents), len(self.measurements)))
for i in range(len(predicted_currents)):
for j in range(len(self.measurements)):
c[i][j] = abs(self.calculatePearsonCoefficient(predicted_currents[i], self.measurements[j]))
c[i][j] = abs(self.calculate_pearson_coefficient(predicted_currents[i], self.measurements[j]))
if save:
np.save(f"matrices/matrix_{byte_position}.npy", np.array(c))
return c
Expand Down Expand Up @@ -172,14 +168,45 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False,
f"[Process {byte_position}] Matrix file not found or -r flag provided -> the correlation matrix for the"
f" byte position {byte_position} is calculated")

predicted_current_keys = self.generatePredictedCurrents(plain_texts=self.plain_texts,
byte_position=byte_position)
predicted_current_keys = self.generate_predicted_currents(plain_texts=self.plain_texts,
byte_position=byte_position)
log.debug(f"[Process {byte_position}] Current predicted for all keys")
log.info(f"[Process {byte_position}] Calculating Correlation matrix C")
c = self.computeC(save=store, byte_position=byte_position, predicted_currents=predicted_current_keys)
c = self.compute_c(save=store, byte_position=byte_position, predicted_currents=predicted_current_keys)
if plot:
plot_c(data=c, byte_position=byte_position, plot=plot)
log.info(f"[Process {byte_position}] Process {byte_position} finished")
return byte_position, np.unravel_index(np.argmax(c), c.shape)[0]

def attack_full_key(self, show_plot_correlations: bool = False, store_correlation_matrices: bool = False,
re_calculate_correlation_matrices: bool = True):
cores = multiprocessing.cpu_count()
log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions\n")

args_to_processes = tuple(
[[i, show_plot_correlations, store_correlation_matrices,
re_calculate_correlation_matrices] for i
in range(16)])
log.debug(f"Arguments to the process {args_to_processes}")
print()

log.info("Starting the multiprocessing attack")
ti = time.time()
with Pool() as pool:
results = pool.starmap(self.attack_byte, args_to_processes)
tf = time.time()
print()
log.info(
f"All processes finished. Final output: {results}. Execution time: {tf - ti} seconds -"
f" {(tf - ti) / 60} minutes")
log.debug(f"Constructing the final key from the output")
out = [(pos, hex(item)[2:]) for (pos, item) in results]
sorted_list = sorted(out, key=lambda x: x[0])
key_list = [item[1] for item in sorted_list][::-1]
key = ''.join(key_list)
log.info(f"\nKey Found {key}")
return key


def full_attack(arguments):
log.debug("Checking the existence of 'matrices' and 'plot' sub-directories")
Expand Down Expand Up @@ -208,34 +235,11 @@ def full_attack(arguments):
print(f"Key byte found: {hex(key_byte[1])[2:]}")
return

cores = multiprocessing.cpu_count()
log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions")
print()

args_to_processes = tuple(
[[i, arguments.show_plot_correlations, arguments.store_correlation_matrices,
arguments.re_calculate_correlation_matrices] for i
in range(16)])
log.debug(f"Arguments to the process {args_to_processes}")
print()

log.info("Starting the multiprocessing attack")
ti = time.time()
with Pool() as pool:
results = pool.starmap(attack.attack_byte, args_to_processes)
tf = time.time()
print()
log.info(
f"All processes finished. Final output: {results}. Execution time: {tf - ti} seconds -"
f" {(tf - ti) / 60} minutes")
log.debug(f"Constructing the final key from the output")
out = [(pos, hex(item)[2:]) for (pos, item) in results]
sorted_list = sorted(out, key=lambda x: x[0])
key_list = [item[1] for item in sorted_list][::-1]
key = ''.join(key_list)
print(f"\nKey Found")
key = attack.attack_full_key(show_plot_correlations=arguments.show_plot_correlations,
store_correlation_matrices=arguments.store_correlation_matrices,
re_calculate_correlation_matrices=arguments.re_calculate_correlation_matrices)
print("Key Found")
print(key)
return


if __name__ == '__main__':
Expand Down
24 changes: 3 additions & 21 deletions src/crypto_pkg/clis/attacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,7 @@ def attack_correlation_power_analysis(
re_calculate=True)
print(f"Key byte found: {hex(key_byte[1])[2:]}")
return
cores = multiprocessing.cpu_count()
print(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions")
# Run the full correlation attack
args_to_processes = tuple(
[[i, False, False, True] for i
in range(16)])

print("Starting the multiprocessing attack")
ti = time.time()
with Pool() as pool:
results = pool.starmap(attack.attack_byte, args_to_processes)
tf = time.time()
print(
f"\nAll processes finished. Final output: {results}. Execution time: {tf - ti} seconds -"
f" {(tf - ti) / 60} minutes")
print(f"Constructing the final key from the output")
out = [(pos, hex(item)[2:]) for (pos, item) in results]
sorted_list = sorted(out, key=lambda x: x[0])
key_list = [item[1] for item in sorted_list][::-1]
key = ''.join(key_list)
print(f"\nKey Found")
key = attack.attack_full_key(store_correlation_matrices=False, re_calculate_correlation_matrices=False,
show_plot_correlations=False)
print("Key Found")
print(key)
5 changes: 5 additions & 0 deletions src/crypto_pkg/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import logging

log_level = logging.INFO

logging.basicConfig(level=log_level)
Empty file.
9 changes: 9 additions & 0 deletions src/crypto_pkg/utils/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import logging

from crypto_pkg import settings


def get_logger(location: str = __name__):
log = logging.getLogger(location)
log.setLevel(settings.log_level)
return log

0 comments on commit 341e2c0

Please sign in to comment.