diff --git a/setup.py b/setup.py index 1a847c1..15373bf 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -__version__ = "1.4.10" +__version__ = "1.4.11" with open("README.md", "r", encoding="utf-8") as fh: long_description = fh.read() diff --git a/src/crypto_pkg/attacks/block_ciphers/double_encryption.py b/src/crypto_pkg/attacks/block_ciphers/double_encryption.py index 7a020e5..caa34de 100644 --- a/src/crypto_pkg/attacks/block_ciphers/double_encryption.py +++ b/src/crypto_pkg/attacks/block_ciphers/double_encryption.py @@ -1,10 +1,12 @@ import random -import sys from typing import Dict, Tuple, Union from Crypto.Cipher import AES from crypto_pkg.attacks.block_ciphers.utils import Text, prepare_key +from crypto_pkg.utils.logging import get_logger, set_level + +log = get_logger(__name__) class DoubleAESAttack: @@ -23,25 +25,33 @@ def decrypt(key: Text, cipher_text: Text) -> Text: @classmethod def lookup_table_computation(cls, plain_text: str, max_key: int = 24) -> Dict[int, int]: + log.info(f"Compute lookup table for plaintext {plain_text}") p = Text(text=bytes.fromhex(plain_text)) + log.debug("Starting lookup table computation") out = {cls.encrypt(key=prepare_key(key=i, max_key=max_key), plain_text=p).integer: i for i in range(1, 2 ** max_key)} + log.debug("Lookup table computation completed") return out @classmethod def search_match(cls, cipher_text: str, lookup_table: Dict[int, int]) -> Union[Tuple[Text, Text], None]: + log.info(f"Search match for ciphertext {cipher_text}") c = Text(text=bytes.fromhex(cipher_text)) - for i in range(2 ** 24): key = prepare_key(i) m = cls.decrypt(key=key, cipher_text=c) if lookup_table.get(m.integer): + log.debug(f"Match found for key {key}") return prepare_key(lookup_table.get(m.integer)), key @classmethod - def attack(cls, plain_text: str, cipher_text: str, max_key: int = 24): + @set_level(logger=log) + def attack(cls, plain_text: str, cipher_text: str, max_key: int = 24, _verbose: bool = False): + log.info(f"Constructing encryption lookup table for plain text {plain_text} ans maximum key size {max_key}") look_up_table = cls.lookup_table_computation(plain_text=plain_text, max_key=max_key) + log.info("Search encryption match in lookup table") keys = cls.search_match(cipher_text=cipher_text, lookup_table=look_up_table) + log.debug(f"Key Found") return keys diff --git a/src/crypto_pkg/attacks/block_ciphers/modified_aes.py b/src/crypto_pkg/attacks/block_ciphers/modified_aes.py index 6e32493..d6da011 100644 --- a/src/crypto_pkg/attacks/block_ciphers/modified_aes.py +++ b/src/crypto_pkg/attacks/block_ciphers/modified_aes.py @@ -6,11 +6,9 @@ from crypto_pkg.attacks.block_ciphers.utils import prepare_key from crypto_pkg.ciphers.symmetric.aes import CustomAES, array_to_matrix, get_array_from_state +from crypto_pkg.utils.logging import set_level, get_logger -logging.basicConfig(level=logging.CRITICAL) - -_log = getLogger(__name__) -_log.setLevel(logging.CRITICAL) +log = get_logger(__name__) class ModifiedAES(CustomAES): @@ -47,12 +45,11 @@ def attack_section(self, plain_text, cipher_block_ref, init_pos, section_n=0): c = self.encrypt(key=k_block, plain_text=plain_text) c_by_block = [c[i * 4:i * 4 + 4] for i in range(len(c))] if c_by_block[section_n] == cipher_block_ref[section_n]: - _log.info(f"key guess for block {section_n}: {key.hex}") + log.info(f"key guess for block {section_n}: {key.hex}") return key - def attack(self, plain_text: str, cipher_text: str, verbose: bool = False): - if verbose: - _log.setLevel(logging.DEBUG) + @set_level(logger=log) + def attack(self, plain_text: str, cipher_text: str, _verbose: bool = False): p_int_list = [int(item, 16) for item in [plain_text[i * 2:i * 2 + 2] for i in range(len(plain_text))] if item != ''] c_int_list = [int(item, 16) for item in [cipher_text[i * 2:i * 2 + 2] for i in range(len(cipher_text))] if @@ -66,13 +63,13 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False): [p_int_list, c_by_block_ref, 128, 3] ) - _log.debug("Run attack on sub-blocks in parallel") + log.debug("Run attack on sub-blocks in parallel") with Pool() as pool: res = pool.starmap(self.attack_section, args) r = [int(item.hex, 16) for item in res] - _log.debug(f"Parallel execution terminated with keys guesses {r}") + log.debug(f"Parallel execution terminated with keys guesses {r}") out = r[0] ^ r[1] ^ r[2] ^ r[3] - _log.info(f"128bits key guess: {out}") + log.info(f"128bits key guess: {out}") return out @@ -93,7 +90,7 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False): ct = bytes(c).hex() # ---- Run the attack - _log.debug(f"Run the attack with plain-text {p} and cipher-text {p}") + log.debug(f"Run the attack with plain-text {p} and cipher-text {p}") aes = ModifiedAES() result = aes.attack(plain_text=pt, cipher_text=ct, verbose=True) assert result == int(to_find_key, 16) diff --git a/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py b/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py index 024ae8b..bbff798 100644 --- a/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py +++ b/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py @@ -144,8 +144,9 @@ def compute_c(self, predicted_currents: np.ndarray, byte_position: int, save: bo np.save(f"matrices/matrix_{byte_position}.npy", np.array(c)) return c + @set_level(logger=log) def attack_byte(self, byte_position: int = 0, plot: bool = False, - store: bool = True, re_calculate: bool = False) -> Tuple[int, np.ndarray]: + store: bool = True, re_calculate: bool = False, _verbose: bool = False) -> Tuple[int, np.ndarray]: """ Correlation attack of one byte @@ -154,6 +155,7 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False, plot: show the correlation plot 'byte_position' or not - default = False store: save the correlation matrices for the byte 'byte_position' or not - default = True re_calculate: re-calculate the correlation matrix for the byte 'byte_position' even it has been stored + _verbose: Returns: Tuple(byte_position, key byte) """ @@ -180,7 +182,7 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False, @set_level(logger=log) def attack_full_key(self, show_plot_correlations: bool = False, store_correlation_matrices: bool = False, - re_calculate_correlation_matrices: bool = True, verbose: bool = False): + re_calculate_correlation_matrices: bool = True, _verbose: bool = False): cores = multiprocessing.cpu_count() log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions\n") diff --git a/src/crypto_pkg/attacks/stream_ciphers/geffe_cipher.py b/src/crypto_pkg/attacks/stream_ciphers/geffe_cipher.py index b7360a3..029ccbf 100644 --- a/src/crypto_pkg/attacks/stream_ciphers/geffe_cipher.py +++ b/src/crypto_pkg/attacks/stream_ciphers/geffe_cipher.py @@ -1,8 +1,11 @@ from decimal import Decimal from enum import Enum -from typing import List, Tuple, Union +from typing import List, Tuple, Union, Dict from crypto_pkg.ciphers.symmetric.geffe import Geffe +from crypto_pkg.utils.logging import get_logger, set_level + +log = get_logger() def int_2_base_2(k, n): @@ -52,7 +55,7 @@ def __init__(self, all_taps: List[List[int]], n: int, f: List[int], stream_ref: self.max_clock = max_clock @staticmethod - def check_match(a1, a2): + def check_match(a1, a2) -> Decimal: match = [1 for i in range(len(a1)) if a1[i] == a2[i]] return Decimal(sum(match)) / Decimal(len(a1)) @@ -92,7 +95,7 @@ def look_for_correlation(self, thresholds: Union[List[Tuple[ThresholdsOperator, return key0, key2 - def find_k1(self, key0, key2): + def find_k1(self, key0, key2) -> Dict[str, list]: g = Geffe(self.n, self.all_taps, self.f) all_keys = [[k0_item, k1, k2_item] for k0_item in key0 for k1 in range(self.max_iter) for k2_item in key2] @@ -101,19 +104,25 @@ def find_k1(self, key0, key2): success = [item[1] for item in res if item[0] is True] if success: result = success[0] - return f"\nSuccess\nThe key is (k0,k1,k2) = {int_2_base_2(result[0], self.n)}," \ - f"{int_2_base_2(result[1], self.n)}," \ - f"{int_2_base_2(result[2], self.n)}" + return {"k0": int_2_base_2(result[0], self.n), + "k1": int_2_base_2(result[1], self.n), + "k2": int_2_base_2(result[2], self.n) + } else: - return "\nFail" + raise Exception("Attack Failed") - def attack(self, thresholds): + @set_level(log) + def attack(self, thresholds, _verbose: bool = False): + log.info("Search for possible seeds") k0, k2 = self.look_for_correlation(thresholds=thresholds) - print("Possible choices") - print(f"\tk_0 = {k0} = {[int_2_base_2(item, 16) for item in k0]}") - print(f"\tk_2 = {k2} = {[int_2_base_2(item, 16) for item in k2]}") + log.info("Possible choices for seeds of LFSR 1 and 3") + msg = f"Possible choices\n\tk_0 = {k0} = {[int_2_base_2(item, 16) for item in k0]}\n" \ + f"\tk_2 = {k2} = {[int_2_base_2(item, 16) for item in k2]}" + log.debug(msg) + log.info("Find seed for LFSR 2") out = self.find_k1(key0=k0, key2=k2) - print(out) + msg = f"\nSuccess\nThe key is (k0,k1,k2)\n\t = {out['k0']},{out['k1']},{out['k2']}" + log.info(f"{msg}") return out @@ -132,4 +141,4 @@ def attack(self, thresholds): tsh = [(ThresholdsOperator.MAX, Decimal('0.5') - epsilon_0), None, (ThresholdsOperator.MIN, Decimal('0.5') + epsilon_1)] - attack.attack(tsh) + print(attack.attack(tsh)) diff --git a/src/crypto_pkg/clis/attacks.py b/src/crypto_pkg/clis/attacks.py index 1580405..d6f600f 100644 --- a/src/crypto_pkg/clis/attacks.py +++ b/src/crypto_pkg/clis/attacks.py @@ -1,10 +1,6 @@ -import multiprocessing import os -import pickle -import time from decimal import Decimal import random -from multiprocessing import Pool from typing import Optional import typer @@ -17,7 +13,6 @@ from crypto_pkg.attacks.stream_ciphers.geffe_cipher import Attack as GeffeAttack, ThresholdsOperator from crypto_pkg.contracts.cli_dto import ModifiedAESIn from importlib import resources -import io app = typer.Typer(pretty_exceptions_show_locals=False, no_args_is_help=True) @@ -27,7 +22,9 @@ def get_hex(x): @app.command('geffe') -def attack_geffe(): +def attack_geffe( + verbose: bool = typer.Option(False, help="Show debug logs") +): """ Example on how to use the attack on a Geffe stream cipher.\n The function doesn't tae any argument, the stream of the cipher is hardcoded together with the LSFRs @@ -37,7 +34,6 @@ def attack_geffe(): stream = '01001110000011101100011101010111011100000011010001111001101101100000000111110110111011011001010111101100111001111100001111100101110000000010110101001111110110010001111101010110011010010110101011000101' # Geffe tabs taps = [[0, 1, 4, 7], [0, 1, 7, 11], [0, 2, 3, 5]] - stream_l = [int(item) for item in stream] attack = GeffeAttack(all_taps=taps, stream_ref=stream, f=[1, 1, 0, 1, 0, 0, 0, 1], max_clock=200, n=16) epsilon_0 = Decimal('0.25') @@ -45,7 +41,7 @@ def attack_geffe(): tsh = [(ThresholdsOperator.MAX, Decimal('0.5') - epsilon_0), None, (ThresholdsOperator.MIN, Decimal('0.5') + epsilon_1)] - attack.attack(tsh) + attack.attack(thresholds=tsh, _verbose=verbose) @app.command("modifiedAES") @@ -99,7 +95,7 @@ def attack_modifier_aes( # ---- Run the attack print(f"Run the attack with plain-text {p} and cipher-text {p}") aes = ModifiedAES() - result = aes.attack(plain_text=model.plain_text, cipher_text=ct, verbose=verbose) + result = aes.attack(plain_text=model.plain_text, cipher_text=ct, _verbose=verbose) if model.key is not None: # Check that the key is the one provided assert result == int(model.key, 16) @@ -125,6 +121,7 @@ def attack_modifier_aes( def attack_double_encryption( plain_text: Optional[str] = typer.Option(None, help="128bits plain text to encrypt"), cipher_text: Optional[str] = typer.Option(None, help="128bits encryption of the plain_text"), + verbose: Optional[bool] = typer.Option(False, help="Show debug logs"), ): """ Example on how to use the double encryption attack on AES.\n @@ -164,7 +161,7 @@ def attack_double_encryption( print("\nStating the attack") print("It might take a bit, but don't worry we'll find it") - ks = DoubleAESAttack.attack(plain_text=pt, cipher_text=ct, max_key=24) + ks = DoubleAESAttack.attack(plain_text=pt, cipher_text=ct, max_key=24, _verbose=verbose) if ks: print("\nKeys found:") print(f"\tk1: 0x{ks[0].hex}") @@ -176,7 +173,8 @@ def attack_correlation_power_analysis( filename: str = typer.Argument('test_file.pickle', help="Filename of the pickle file with the measurements"), max_datapoints: Optional[int] = typer.Option(400, help="Maximum number of data points to consider"), - byte_position: Optional[int] = typer.Option(None, help="Byte position to attack") + byte_position: Optional[int] = typer.Option(None, help="Byte position to attack"), + verbose: Optional[bool] = typer.Option(None, help="Show debug logs") ): """ Example on how to use the power correlation attack.\n @@ -199,11 +197,11 @@ def attack_correlation_power_analysis( if byte_position is not None: key_byte = attack.attack_byte(byte_position=byte_position, plot=False, store=False, - re_calculate=True) + re_calculate=True, _verbose=verbose) print(f"Key byte found: {hex(key_byte[1])[2:]}") return key = attack.attack_full_key(store_correlation_matrices=False, re_calculate_correlation_matrices=False, - show_plot_correlations=False, verbose=True) + show_plot_correlations=False, _verbose=verbose) print("Key Found") print(key) os.remove(filename) diff --git a/src/crypto_pkg/utils/logging.py b/src/crypto_pkg/utils/logging.py index 42ff69e..6ce50cd 100644 --- a/src/crypto_pkg/utils/logging.py +++ b/src/crypto_pkg/utils/logging.py @@ -11,10 +11,14 @@ def get_logger(location: str = __name__): def set_level(logger): def deco(func): - def wrapper( *args, **kwargs): - if kwargs.get('verbose') and kwargs.get('verbose') is True: + def wrapper(*args, **kwargs): + _verbose = kwargs.get('_verbose', None) + if _verbose is None and len(args) == 5: + _verbose = args[4] + if _verbose is True: logger.setLevel(logging.DEBUG) return func(*args, **kwargs) + return wrapper return deco diff --git a/test_file.pickle b/test_file.pickle new file mode 100644 index 0000000..524f73a Binary files /dev/null and b/test_file.pickle differ