From 3a2cf56518b3eb68a2dad733b9e28056bd3aadad Mon Sep 17 00:00:00 2001 From: Luca <44218525+programmingAthlete@users.noreply.github.com> Date: Fri, 15 Dec 2023 19:59:14 +0100 Subject: [PATCH] Cli (#18) * CLI * version --- README.md | 11 + requirements.txt | 4 +- setup.cfg | 6 + setup.py | 7 +- .../block_ciphers/double_encryption.py | 5 +- .../attacks/block_ciphers/modified_aes.py | 4 +- .../correlation_power_analysis.py | 79 ++++++- src/crypto_pkg/clis/__init__.py | 0 src/crypto_pkg/clis/attacks.py | 216 ++++++++++++++++++ src/crypto_pkg/clis/cli.py | 6 + src/crypto_pkg/contracts/cli_dto.py | 16 ++ src/crypto_pkg/entry_point.py | 6 + 12 files changed, 349 insertions(+), 11 deletions(-) create mode 100644 setup.cfg create mode 100644 src/crypto_pkg/clis/__init__.py create mode 100644 src/crypto_pkg/clis/attacks.py create mode 100644 src/crypto_pkg/clis/cli.py create mode 100644 src/crypto_pkg/contracts/cli_dto.py create mode 100644 src/crypto_pkg/entry_point.py diff --git a/README.md b/README.md index 8eb556a..6deb0df 100644 --- a/README.md +++ b/README.md @@ -34,5 +34,16 @@ Usage examples are provided in the attacks source code files
pip install -e .
+
+crypto attacks modifiedAES --help
+
+crypto attacks geffe --help
+
+crypto attacks AES-double-encryption --help
+
+crypto attacks correlation-power-analysis --help
+
## Usage
The Textbook RSA and the DGVH PKEs are used in the [BruteSniffing_Fisher](https://github.com/programmingAthlete/BruteSniffing_Fisher) repository.
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index d72daac..1f8b14e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,3 @@
-pycryptodome==3.19.0
\ No newline at end of file
+pycryptodome==3.19.0
+typer[all]
+pydantic<2
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..42f28be
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,6 @@
+[metadata]
+name = crypto_pkg
+
+[options.entry_points]
+console_scripts =
+ crypto = crypto_pkg.entry_point:main
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 7254000..b31edb3 100644
--- a/setup.py
+++ b/setup.py
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages
-__version__ = "1.3.1"
+__version__ = "1.4.0"
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
@@ -19,5 +19,8 @@
packages=find_packages(where="src", exclude=["*tests*"]),
install_requires=requirements,
author="programmingAthlete",
- zip_safe=True
+ zip_safe=True,
+ console_scripts={
+ "crypto":"crypto.entry_point:main"
+ }
)
diff --git a/src/crypto_pkg/attacks/block_ciphers/double_encryption.py b/src/crypto_pkg/attacks/block_ciphers/double_encryption.py
index 3c146f6..7a020e5 100644
--- a/src/crypto_pkg/attacks/block_ciphers/double_encryption.py
+++ b/src/crypto_pkg/attacks/block_ciphers/double_encryption.py
@@ -47,7 +47,10 @@ def attack(cls, plain_text: str, cipher_text: str, max_key: int = 24):
if __name__ == '__main__':
- ''' Example '''
+ ''' Example\n
+ IMPORTANT NOTICE: faisable with general keys, it still has a complexitx of 2^128.
+ In this attack it is assumed that the keys are made of 24bits unknown bits followed by all zero bits.
+ '''
# ---- Generate a plain text - cipher text pair
# Generate keys
diff --git a/src/crypto_pkg/attacks/block_ciphers/modified_aes.py b/src/crypto_pkg/attacks/block_ciphers/modified_aes.py
index 9125668..6e32493 100644
--- a/src/crypto_pkg/attacks/block_ciphers/modified_aes.py
+++ b/src/crypto_pkg/attacks/block_ciphers/modified_aes.py
@@ -58,7 +58,7 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False):
c_int_list = [int(item, 16) for item in [cipher_text[i * 2:i * 2 + 2] for i in range(len(cipher_text))] if
item != '']
- c_by_block_ref = [c_int_list[i * 4:i * 4 + 4] for i in range(len(c))]
+ c_by_block_ref = [c_int_list[i * 4:i * 4 + 4] for i in range(len(c_int_list))]
args = (
[p_int_list, c_by_block_ref, 32, 0],
[p_int_list, c_by_block_ref, 64, 1],
@@ -69,8 +69,8 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False):
_log.debug("Run attack on sub-blocks in parallel")
with Pool() as pool:
res = pool.starmap(self.attack_section, args)
- _log.debug(f"Parallel execution terminated with keys guesses {res}")
r = [int(item.hex, 16) for item in res]
+ _log.debug(f"Parallel execution terminated with keys guesses {r}")
out = r[0] ^ r[1] ^ r[2] ^ r[3]
_log.info(f"128bits key guess: {out}")
return out
diff --git a/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py b/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py
index 3054886..787ef39 100644
--- a/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py
+++ b/src/crypto_pkg/attacks/power_analysis/correlation_power_analysis.py
@@ -1,4 +1,6 @@
+import argparse
import logging
+import multiprocessing
import os
import time
from multiprocessing import Pool
@@ -158,13 +160,43 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False,
return byte_position, np.unravel_index(np.argmax(c), c.shape)[0]
-if __name__ == '__main__':
- filename = "test_file_name.pickle"
- # Run the full correlation attack
- attack = Attack(data_filename=filename, max_datapoints=400)
+def full_attack(arguments):
+ log.debug("Checking the existence of 'matrices' and 'plot' sub-directories")
+ must_have_dirs = ["matrices", "plots"]
+ if not os.path.exists(arguments.filename):
+ log.error(f"File {arguments.filename} does not exist")
+ raise Exception(f"File {arguments.filename} does not exist")
+ for item in must_have_dirs:
+ if not os.path.exists(item):
+ log.warning(f"Directory {item} not found -> creating it")
+ os.makedirs(item)
+ else:
+ log.debug(f"Directory {item} found -> all good")
+
+ print("\nArguments provided")
+ args_attr = arguments.__dict__
+ for arg in args_attr:
+ print(f"\t{arg} -> {args_attr[arg]}")
+ print("\n")
+
+ attack = Attack(data_filename=arguments.filename, max_datapoints=arguments.max_datapoint)
+ if args.byte_position is not None:
+ key_byte = attack.attack_byte(byte_position=arguments.byte_position, plot=arguments.show_plot_correlations,
+ store=arguments.store_correlation_matrices,
+ re_calculate=arguments.re_calculate_correlation_matrices)
+ print(f"Key byte found: {hex(key_byte[1])[2:]}")
+ return
+
+ cores = multiprocessing.cpu_count()
+ log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions")
+ print()
+
args_to_processes = tuple(
- [[i, False, False, True] for i
+ [[i, arguments.show_plot_correlations, arguments.store_correlation_matrices,
+ arguments.re_calculate_correlation_matrices] for i
in range(16)])
+ log.debug(f"Arguments to the process {args_to_processes}")
+ print()
log.info("Starting the multiprocessing attack")
ti = time.time()
@@ -182,3 +214,40 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False,
key = ''.join(key_list)
print(f"\nKey Found")
print(key)
+ return
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ prog='CorrelationPowerAttack',
+ description='Find encryption key via correlation power analysis attack')
+ parser.add_argument('-f', '--filename', help="Filename of teh pickle file from where to read the traces", type=str)
+ parser.add_argument('-s', '--store_correlation_matrices', help='Store the correlation matrices in a .npy file to '
+ 'avoid having to re-ccalculated it at each'
+ ' execution',
+ action='store_true')
+ parser.add_argument('--no-store_correlation_matrices', dest='store_correlation_matrices', action='store_false')
+ parser.add_argument('-r', '--re_calculate_correlation_matrices', help='Recalculate the correlation matrix',
+ action='store_true')
+ parser.add_argument('--no-re_calculate_correlation_matrices', dest='re_calculate_correlation_matrices',
+ action='store_false')
+ parser.add_argument('-p', '--show_plot_correlations', help='Show correlation plots - default=False',
+ action='store_true')
+ parser.add_argument('--no-show_plot_correlations', dest='show_plot_correlations',
+ action='store_false')
+ parser.add_argument('-v', '--verbose', help='Show debug logs', action='store_true')
+ parser.add_argument('--no-verbose', help='Show debug logs', action='store_false', dest='verbose')
+ parser.add_argument('-l', '--max_datapoint', help='Maximum number of data points to consider - default=3000',
+ type=int)
+ parser.add_argument('-b', '--byte_position', type=int,
+ help='Provide the byte position that you want to attack. If this '
+ 'argument is provided, the program will only runt he attack for'
+ ' the provided byte position')
+
+ parser.set_defaults(store_correlation_matrices=False, re_calculate_correlation_matrices=False,
+ show_plot_correlations=False, filename="group4.pickle", verbose=False, max_datapoint=4000,
+ byte_position=None)
+ args = parser.parse_args()
+ if args.verbose:
+ log.setLevel(logging.DEBUG)
+ full_attack(arguments=args)
diff --git a/src/crypto_pkg/clis/__init__.py b/src/crypto_pkg/clis/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/crypto_pkg/clis/attacks.py b/src/crypto_pkg/clis/attacks.py
new file mode 100644
index 0000000..ebd1362
--- /dev/null
+++ b/src/crypto_pkg/clis/attacks.py
@@ -0,0 +1,216 @@
+import os
+import time
+from decimal import Decimal
+import random
+from multiprocessing import Pool
+from typing import Optional
+
+import typer
+from Crypto.Cipher import AES
+
+from crypto_pkg.attacks.block_ciphers.double_encryption import DoubleAESAttack
+from crypto_pkg.attacks.block_ciphers.modified_aes import ModifiedAES
+from crypto_pkg.attacks.block_ciphers.utils import prepare_key
+from crypto_pkg.attacks.power_analysis.correlation_power_analysis import Attack as PowerAnalysisAttack
+from crypto_pkg.attacks.stream_ciphers.geffe_cipher import Attack as GeffeAttack, ThresholdsOperator
+from crypto_pkg.contracts.cli_dto import ModifiedAESIn
+
+app = typer.Typer(pretty_exceptions_show_locals=False, no_args_is_help=True)
+
+
+def get_hex(x):
+ return '{:02x}'.format(x).zfill(32)
+
+
+@app.command('geffe')
+def attack_geffe():
+ """
+ Example on how to use the attack on a Geffe stream cipher.\n
+ The function doesn't tae any argument, the stream of the cipher is hardcoded together with the LSFRs
+ settings
+ """
+ # Choose Geffe output
+ stream = '01001110000011101100011101010111011100000011010001111001101101100000000111110110111011011001010111101100111001111100001111100101110000000010110101001111110110010001111101010110011010010110101011000101'
+ # Geffe tabs
+ taps = [[0, 1, 4, 7], [0, 1, 7, 11], [0, 2, 3, 5]]
+ stream_l = [int(item) for item in stream]
+ attack = GeffeAttack(all_taps=taps, stream_ref=stream, f=[1, 1, 0, 1, 0, 0, 0, 1], max_clock=200, n=16)
+
+ epsilon_0 = Decimal('0.25')
+ epsilon_1 = Decimal('0.25')
+ tsh = [(ThresholdsOperator.MAX, Decimal('0.5') - epsilon_0), None,
+ (ThresholdsOperator.MIN, Decimal('0.5') + epsilon_1)]
+
+ attack.attack(tsh)
+
+
+@app.command("modifiedAES")
+def attack_modifier_aes(
+ plain_text: Optional[str] = typer.Option(None, help="128bits plain text to encrypt"),
+ cipher_text: Optional[str] = typer.Option(None, help="128bits encryption of the plain_text"),
+ key: Optional[str] = typer.Option(None, help="Encryption 128bits key to Find"),
+ verbose: bool = typer.Option(False, help="Show debug logs")
+):
+ """
+ Example on how to use the attack on the modified AES.\n
+ If no arguments are provided, an hardcoded key will be used is such a way that the execution is not long, a plain
+ text will be generated at random and the corresponding cipher text will be generated using the hardcoded key.
+ After this the attack will recover the key from the (plain text, cipher text) pair.\n
+ If the plain_text is provided, the key hardcoded key will be used to generate the (plain text, cipher text pair)
+ and then recovered.\n
+ If the plain text and the cipher text are provided, the encryption key corresponding to the pait will be recovered.
+ This operation might take a bit of time depending on the key.
+ """
+ model = ModifiedAESIn(key=key, plain_text=plain_text, cipher_text=cipher_text)
+
+ if all(item is None for item in [model.key, model.plain_text, model.cipher_text]):
+ # ---- Generation of the plain text - cipher text pair
+ # Choose the key
+ model.key = '00000001000000100000000000000a01'
+ # Choose a random plain text
+ model.plain_text = format(random.getrandbits(128), 'x')
+ # Prepare plain text and key for encryption
+ p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
+ item != '']
+ k = [int(item, 16) for item in [model.key[i * 2:i * 2 + 2] for i in range(len(model.key))] if item != '']
+ # Generate cipher text
+ aes = ModifiedAES()
+ c = aes.encrypt(key=k, plain_text=p)
+ ct = bytes(c).hex()
+ elif model.plain_text is not None and model.cipher_text is not None:
+ model.plain_text = plain_text
+ ct = cipher_text
+ p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
+ item != '']
+ elif model.plain_text is not None and model.cipher_text is None and model.key is not None:
+ model.plain_text = plain_text
+ aes = ModifiedAES()
+ k = [int(item, 16) for item in [key[i * 2:i * 2 + 2] for i in range(len(key))] if item != '']
+ ct = bytes(aes.encrypt(key=k, plain_text=model.plain_text)).hex()
+ p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
+ item != '']
+ else:
+ raise Exception("Parameters not provided correctly - if plain text is provided, ")
+
+ # ---- Run the attack
+ print(f"Run the attack with plain-text {p} and cipher-text {p}")
+ aes = ModifiedAES()
+ result = aes.attack(plain_text=model.plain_text, cipher_text=ct, verbose=verbose)
+ if model.key is not None:
+ # Check that the key is the one provided
+ assert result == int(model.key, 16)
+ k_hex = get_hex(result)
+ k = [int(item, 16) for item in [k_hex[i * 2:i * 2 + 2] for i in range(len(k_hex))] if item != '']
+ p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
+ item != '']
+ c = aes.encrypt(key=k, plain_text=p)
+ assert bytes(c).hex() == ct
+ else:
+ # Re-encrypt the plain text to validate the key
+ aes = ModifiedAES()
+ k_hex = get_hex(result)
+ k = [int(item, 16) for item in [k_hex[i * 2:i * 2 + 2] for i in range(len(k_hex))] if item != '']
+ p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
+ item != '']
+ c = bytes(aes.encrypt(key=k, plain_text=p)).hex()
+ assert c == ct
+ print(f"\nSuccess: key {result} recovered")
+
+
+@app.command("AES-double-encryption")
+def attack_double_encryption(
+ plain_text: Optional[str] = typer.Option(None, help="128bits plain text to encrypt"),
+ cipher_text: Optional[str] = typer.Option(None, help="128bits encryption of the plain_text"),
+):
+ """
+ Example on how to use the double encryption attack on AES.\n
+ If no arguments are provided, the two keys and a plain text will be generated to create the
+ (plain text, cipher text) pair. Then the key will be recovered
+ If the plain text and the cipher text are provided, they will be used as (plain text, cipher text) par.
+ IMPORTANT NOTICE: Make sure that the pair you provide are generated with faisable keys: this attack is not
+ faisable with general keys, it still has a complexitx of 2^128.
+ In this attack it is assumed that the keys are made of 24bits unknown bits followed by all zero bits.
+ """
+
+ if plain_text is None and cipher_text is None:
+ # ---- Generate a plain text - cipher text pair
+ # Suppose that the keys is made of 24bits unknown bits followed by all zero bits
+
+ # Generate keys
+ k1 = prepare_key(random.getrandbits(24))
+ k2 = prepare_key(random.getrandbits(24))
+
+ # Generate random plain text
+ pt = format(random.getrandbits(128), 'x')
+
+ cipher1 = AES.new(k1.ascii_hex, AES.MODE_ECB)
+ c1 = cipher1.encrypt(bytes.fromhex(pt))
+ cipher2 = AES.new(k2.ascii_hex, AES.MODE_ECB)
+ c2 = cipher2.encrypt(c1)
+ ct = c2.hex()
+ print(f"Key k1: {k1.hex}")
+ print(f"Key k2: {k2.hex}")
+ print("The attack will find back these keys")
+ else:
+ ct = cipher_text
+ pt = plain_text
+
+ print(f'known plain text: {pt}')
+ print(f'corresponding cipher text: {ct}')
+
+ print("\nStating the attack")
+ print("It might take a bit, but don't worry we'll find it")
+ ks = DoubleAESAttack.attack(plain_text=pt, cipher_text=ct, max_key=24)
+ if ks:
+ print("\nKeys found:")
+ print(f"\tk1: 0x{ks[0].hex}")
+ print(f"\tk2: 0x{ks[1].hex}")
+
+
+@app.command("correlation-power-analysis")
+def attack_correlation_power_analysis(
+ filename: str = typer.Argument(str, help="Filename of the pickle file with the measurements"),
+ max_datapoints: Optional[int] = typer.Option(400, help="Maximum number of data points to consider"),
+ byte_position: Optional[int] = typer.Option(None, help="Byte position to attack"),
+):
+ """
+ Example on how to use the power correlation attack.\n
+ The filename of the measurement file is required. This file mush be a valid pickle file with at leas 'max_datapoints'
+ datapoints\n
+ If a byte position is provided, only the provided key byte will be attacked, otherwise the whole key will be.
+ """
+
+ if not os.path.exists(filename):
+ msg = f"File {filename} does not exist"
+ print(msg)
+ raise Exception(f"File {msg}")
+
+ # Run the correlation attack on the provided byte position
+ attack = PowerAnalysisAttack(data_filename=filename, max_datapoints=max_datapoints)
+ if byte_position is not None:
+ key_byte = attack.attack_byte(byte_position=byte_position, plot=False,
+ store=False,
+ re_calculate=True)
+ print(f"Key byte found: {hex(key_byte[1])[2:]}")
+ return
+
+ # Run the full correlation attack
+ args_to_processes = tuple(
+ [[i, False, False, True] for i
+ in range(16)])
+
+ print("Starting the multiprocessing attack")
+ ti = time.time()
+ with Pool() as pool:
+ results = pool.starmap(attack.attack_byte, args_to_processes)
+ tf = time.time()
+ print(
+ f"\nAll processes finished. Final output: {results}. Execution time: {tf - ti} seconds -"
+ f" {(tf - ti) / 60} minutes")
+ print(f"Constructing the final key from the output")
+ out = [(pos, hex(item)[2:]) for (pos, item) in results]
+ sorted_list = sorted(out, key=lambda x: x[0])
+ key_list = [item[1] for item in sorted_list][::-1]
+ key = ''.join(key_list)
+ print(f"\nKey Found")
+ print(key)
diff --git a/src/crypto_pkg/clis/cli.py b/src/crypto_pkg/clis/cli.py
new file mode 100644
index 0000000..3e5edaf
--- /dev/null
+++ b/src/crypto_pkg/clis/cli.py
@@ -0,0 +1,6 @@
+import typer
+
+from crypto_pkg.clis.attacks import app as attacks
+
+app = typer.Typer(pretty_exceptions_show_locals=False, no_args_is_help=True)
+app.add_typer(attacks, name='attacks')
diff --git a/src/crypto_pkg/contracts/cli_dto.py b/src/crypto_pkg/contracts/cli_dto.py
new file mode 100644
index 0000000..7519200
--- /dev/null
+++ b/src/crypto_pkg/contracts/cli_dto.py
@@ -0,0 +1,16 @@
+from typing import Optional
+
+import pydantic
+from pydantic import root_validator
+
+
+class ModifiedAESIn(pydantic.BaseModel):
+ key: Optional[str]
+ plain_text: Optional[str]
+ cipher_text: Optional[str]
+
+ @root_validator
+ def check_values(cls, values):
+ if values.get("plain_text") is not None and values.get("cipher_text") is None and values.get("key") is None:
+ raise Exception("If plain_text is provided, and cipher_text not, the encryption key must be provided")
+ return values
diff --git a/src/crypto_pkg/entry_point.py b/src/crypto_pkg/entry_point.py
new file mode 100644
index 0000000..e49f49d
--- /dev/null
+++ b/src/crypto_pkg/entry_point.py
@@ -0,0 +1,6 @@
+from crypto_pkg.clis import cli
+
+
+def main():
+ cli.app(prog_name='crypto')
+ cli.app.add_typer()