Skip to content

Commit

Permalink
Cli (#18)
Browse files Browse the repository at this point in the history
* CLI

* version
  • Loading branch information
programmingAthlete authored Dec 15, 2023
1 parent 28d2cd9 commit 3a2cf56
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 11 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,16 @@ Usage examples are provided in the attacks source code files
<li>attacks/power_analysis/correlation_power_analysis.py</li>
</ul>

### From CLI
<code>pip install -e .</code>

<code>crypto attacks modifiedAES --help</code>

<code>crypto attacks geffe --help</code>

<code>crypto attacks AES-double-encryption --help</code>

<code>crypto attacks correlation-power-analysis --help</code>

## Usage
The <i>Textbook RSA</i> and the <i>DGVH</i> PKEs are used in the [BruteSniffing_Fisher](https://github.com/programmingAthlete/BruteSniffing_Fisher) repository.
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pycryptodome==3.19.0
pycryptodome==3.19.0
typer[all]
pydantic<2
6 changes: 6 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[metadata]
name = crypto_pkg

[options.entry_points]
console_scripts =
crypto = crypto_pkg.entry_point:main
7 changes: 5 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

__version__ = "1.3.1"
__version__ = "1.4.0"

with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
Expand All @@ -19,5 +19,8 @@
packages=find_packages(where="src", exclude=["*tests*"]),
install_requires=requirements,
author="programmingAthlete",
zip_safe=True
zip_safe=True,
console_scripts={
"crypto":"crypto.entry_point:main"
}
)
5 changes: 4 additions & 1 deletion src/crypto_pkg/attacks/block_ciphers/double_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ def attack(cls, plain_text: str, cipher_text: str, max_key: int = 24):

if __name__ == '__main__':

''' Example '''
''' Example\n
IMPORTANT NOTICE: faisable with general keys, it still has a complexitx of 2^128.
In this attack it is assumed that the keys are made of 24bits unknown bits followed by all zero bits.
'''

# ---- Generate a plain text - cipher text pair
# Generate keys
Expand Down
4 changes: 2 additions & 2 deletions src/crypto_pkg/attacks/block_ciphers/modified_aes.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False):
c_int_list = [int(item, 16) for item in [cipher_text[i * 2:i * 2 + 2] for i in range(len(cipher_text))] if
item != '']

c_by_block_ref = [c_int_list[i * 4:i * 4 + 4] for i in range(len(c))]
c_by_block_ref = [c_int_list[i * 4:i * 4 + 4] for i in range(len(c_int_list))]
args = (
[p_int_list, c_by_block_ref, 32, 0],
[p_int_list, c_by_block_ref, 64, 1],
Expand All @@ -69,8 +69,8 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False):
_log.debug("Run attack on sub-blocks in parallel")
with Pool() as pool:
res = pool.starmap(self.attack_section, args)
_log.debug(f"Parallel execution terminated with keys guesses {res}")
r = [int(item.hex, 16) for item in res]
_log.debug(f"Parallel execution terminated with keys guesses {r}")
out = r[0] ^ r[1] ^ r[2] ^ r[3]
_log.info(f"128bits key guess: {out}")
return out
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import argparse
import logging
import multiprocessing
import os
import time
from multiprocessing import Pool
Expand Down Expand Up @@ -158,13 +160,43 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False,
return byte_position, np.unravel_index(np.argmax(c), c.shape)[0]


if __name__ == '__main__':
filename = "test_file_name.pickle"
# Run the full correlation attack
attack = Attack(data_filename=filename, max_datapoints=400)
def full_attack(arguments):
log.debug("Checking the existence of 'matrices' and 'plot' sub-directories")
must_have_dirs = ["matrices", "plots"]
if not os.path.exists(arguments.filename):
log.error(f"File {arguments.filename} does not exist")
raise Exception(f"File {arguments.filename} does not exist")
for item in must_have_dirs:
if not os.path.exists(item):
log.warning(f"Directory {item} not found -> creating it")
os.makedirs(item)
else:
log.debug(f"Directory {item} found -> all good")

print("\nArguments provided")
args_attr = arguments.__dict__
for arg in args_attr:
print(f"\t{arg} -> {args_attr[arg]}")
print("\n")

attack = Attack(data_filename=arguments.filename, max_datapoints=arguments.max_datapoint)
if args.byte_position is not None:
key_byte = attack.attack_byte(byte_position=arguments.byte_position, plot=arguments.show_plot_correlations,
store=arguments.store_correlation_matrices,
re_calculate=arguments.re_calculate_correlation_matrices)
print(f"Key byte found: {hex(key_byte[1])[2:]}")
return

cores = multiprocessing.cpu_count()
log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions")
print()

args_to_processes = tuple(
[[i, False, False, True] for i
[[i, arguments.show_plot_correlations, arguments.store_correlation_matrices,
arguments.re_calculate_correlation_matrices] for i
in range(16)])
log.debug(f"Arguments to the process {args_to_processes}")
print()

log.info("Starting the multiprocessing attack")
ti = time.time()
Expand All @@ -182,3 +214,40 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False,
key = ''.join(key_list)
print(f"\nKey Found")
print(key)
return


if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog='CorrelationPowerAttack',
description='Find encryption key via correlation power analysis attack')
parser.add_argument('-f', '--filename', help="Filename of teh pickle file from where to read the traces", type=str)
parser.add_argument('-s', '--store_correlation_matrices', help='Store the correlation matrices in a .npy file to '
'avoid having to re-ccalculated it at each'
' execution',
action='store_true')
parser.add_argument('--no-store_correlation_matrices', dest='store_correlation_matrices', action='store_false')
parser.add_argument('-r', '--re_calculate_correlation_matrices', help='Recalculate the correlation matrix',
action='store_true')
parser.add_argument('--no-re_calculate_correlation_matrices', dest='re_calculate_correlation_matrices',
action='store_false')
parser.add_argument('-p', '--show_plot_correlations', help='Show correlation plots - default=False',
action='store_true')
parser.add_argument('--no-show_plot_correlations', dest='show_plot_correlations',
action='store_false')
parser.add_argument('-v', '--verbose', help='Show debug logs', action='store_true')
parser.add_argument('--no-verbose', help='Show debug logs', action='store_false', dest='verbose')
parser.add_argument('-l', '--max_datapoint', help='Maximum number of data points to consider - default=3000',
type=int)
parser.add_argument('-b', '--byte_position', type=int,
help='Provide the byte position that you want to attack. If this '
'argument is provided, the program will only runt he attack for'
' the provided byte position')

parser.set_defaults(store_correlation_matrices=False, re_calculate_correlation_matrices=False,
show_plot_correlations=False, filename="group4.pickle", verbose=False, max_datapoint=4000,
byte_position=None)
args = parser.parse_args()
if args.verbose:
log.setLevel(logging.DEBUG)
full_attack(arguments=args)
Empty file added src/crypto_pkg/clis/__init__.py
Empty file.
216 changes: 216 additions & 0 deletions src/crypto_pkg/clis/attacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import os
import time
from decimal import Decimal
import random
from multiprocessing import Pool
from typing import Optional

import typer
from Crypto.Cipher import AES

from crypto_pkg.attacks.block_ciphers.double_encryption import DoubleAESAttack
from crypto_pkg.attacks.block_ciphers.modified_aes import ModifiedAES
from crypto_pkg.attacks.block_ciphers.utils import prepare_key
from crypto_pkg.attacks.power_analysis.correlation_power_analysis import Attack as PowerAnalysisAttack
from crypto_pkg.attacks.stream_ciphers.geffe_cipher import Attack as GeffeAttack, ThresholdsOperator
from crypto_pkg.contracts.cli_dto import ModifiedAESIn

app = typer.Typer(pretty_exceptions_show_locals=False, no_args_is_help=True)


def get_hex(x):
return '{:02x}'.format(x).zfill(32)


@app.command('geffe')
def attack_geffe():
"""
Example on how to use the attack on a Geffe stream cipher.\n
The function doesn't tae any argument, the stream of the cipher is hardcoded together with the LSFRs
settings
"""
# Choose Geffe output
stream = '01001110000011101100011101010111011100000011010001111001101101100000000111110110111011011001010111101100111001111100001111100101110000000010110101001111110110010001111101010110011010010110101011000101'
# Geffe tabs
taps = [[0, 1, 4, 7], [0, 1, 7, 11], [0, 2, 3, 5]]
stream_l = [int(item) for item in stream]
attack = GeffeAttack(all_taps=taps, stream_ref=stream, f=[1, 1, 0, 1, 0, 0, 0, 1], max_clock=200, n=16)

epsilon_0 = Decimal('0.25')
epsilon_1 = Decimal('0.25')
tsh = [(ThresholdsOperator.MAX, Decimal('0.5') - epsilon_0), None,
(ThresholdsOperator.MIN, Decimal('0.5') + epsilon_1)]

attack.attack(tsh)


@app.command("modifiedAES")
def attack_modifier_aes(
plain_text: Optional[str] = typer.Option(None, help="128bits plain text to encrypt"),
cipher_text: Optional[str] = typer.Option(None, help="128bits encryption of the plain_text"),
key: Optional[str] = typer.Option(None, help="Encryption 128bits key to Find"),
verbose: bool = typer.Option(False, help="Show debug logs")
):
"""
Example on how to use the attack on the modified AES.\n
If no arguments are provided, an hardcoded key will be used is such a way that the execution is not long, a plain
text will be generated at random and the corresponding cipher text will be generated using the hardcoded key.
After this the attack will recover the key from the (plain text, cipher text) pair.\n
If the plain_text is provided, the key hardcoded key will be used to generate the (plain text, cipher text pair)
and then recovered.\n
If the plain text and the cipher text are provided, the encryption key corresponding to the pait will be recovered.
This operation might take a bit of time depending on the key.
"""
model = ModifiedAESIn(key=key, plain_text=plain_text, cipher_text=cipher_text)

if all(item is None for item in [model.key, model.plain_text, model.cipher_text]):
# ---- Generation of the plain text - cipher text pair
# Choose the key
model.key = '00000001000000100000000000000a01'
# Choose a random plain text
model.plain_text = format(random.getrandbits(128), 'x')
# Prepare plain text and key for encryption
p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
item != '']
k = [int(item, 16) for item in [model.key[i * 2:i * 2 + 2] for i in range(len(model.key))] if item != '']
# Generate cipher text
aes = ModifiedAES()
c = aes.encrypt(key=k, plain_text=p)
ct = bytes(c).hex()
elif model.plain_text is not None and model.cipher_text is not None:
model.plain_text = plain_text
ct = cipher_text
p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
item != '']
elif model.plain_text is not None and model.cipher_text is None and model.key is not None:
model.plain_text = plain_text
aes = ModifiedAES()
k = [int(item, 16) for item in [key[i * 2:i * 2 + 2] for i in range(len(key))] if item != '']
ct = bytes(aes.encrypt(key=k, plain_text=model.plain_text)).hex()
p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
item != '']
else:
raise Exception("Parameters not provided correctly - if plain text is provided, ")

# ---- Run the attack
print(f"Run the attack with plain-text {p} and cipher-text {p}")
aes = ModifiedAES()
result = aes.attack(plain_text=model.plain_text, cipher_text=ct, verbose=verbose)
if model.key is not None:
# Check that the key is the one provided
assert result == int(model.key, 16)
k_hex = get_hex(result)
k = [int(item, 16) for item in [k_hex[i * 2:i * 2 + 2] for i in range(len(k_hex))] if item != '']
p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
item != '']
c = aes.encrypt(key=k, plain_text=p)
assert bytes(c).hex() == ct
else:
# Re-encrypt the plain text to validate the key
aes = ModifiedAES()
k_hex = get_hex(result)
k = [int(item, 16) for item in [k_hex[i * 2:i * 2 + 2] for i in range(len(k_hex))] if item != '']
p = [int(item, 16) for item in [model.plain_text[i * 2:i * 2 + 2] for i in range(len(model.plain_text))] if
item != '']
c = bytes(aes.encrypt(key=k, plain_text=p)).hex()
assert c == ct
print(f"\nSuccess: key {result} recovered")


@app.command("AES-double-encryption")
def attack_double_encryption(
plain_text: Optional[str] = typer.Option(None, help="128bits plain text to encrypt"),
cipher_text: Optional[str] = typer.Option(None, help="128bits encryption of the plain_text"),
):
"""
Example on how to use the double encryption attack on AES.\n
If no arguments are provided, the two keys and a plain text will be generated to create the
(plain text, cipher text) pair. Then the key will be recovered
If the plain text and the cipher text are provided, they will be used as (plain text, cipher text) par.
IMPORTANT NOTICE: Make sure that the pair you provide are generated with faisable keys: this attack is not
faisable with general keys, it still has a complexitx of 2^128.
In this attack it is assumed that the keys are made of 24bits unknown bits followed by all zero bits.
"""

if plain_text is None and cipher_text is None:
# ---- Generate a plain text - cipher text pair
# Suppose that the keys is made of 24bits unknown bits followed by all zero bits

# Generate keys
k1 = prepare_key(random.getrandbits(24))
k2 = prepare_key(random.getrandbits(24))

# Generate random plain text
pt = format(random.getrandbits(128), 'x')

cipher1 = AES.new(k1.ascii_hex, AES.MODE_ECB)
c1 = cipher1.encrypt(bytes.fromhex(pt))
cipher2 = AES.new(k2.ascii_hex, AES.MODE_ECB)
c2 = cipher2.encrypt(c1)
ct = c2.hex()
print(f"Key k1: {k1.hex}")
print(f"Key k2: {k2.hex}")
print("The attack will find back these keys")
else:
ct = cipher_text
pt = plain_text

print(f'known plain text: {pt}')
print(f'corresponding cipher text: {ct}')

print("\nStating the attack")
print("It might take a bit, but don't worry we'll find it")
ks = DoubleAESAttack.attack(plain_text=pt, cipher_text=ct, max_key=24)
if ks:
print("\nKeys found:")
print(f"\tk1: 0x{ks[0].hex}")
print(f"\tk2: 0x{ks[1].hex}")


@app.command("correlation-power-analysis")
def attack_correlation_power_analysis(
filename: str = typer.Argument(str, help="Filename of the pickle file with the measurements"),
max_datapoints: Optional[int] = typer.Option(400, help="Maximum number of data points to consider"),
byte_position: Optional[int] = typer.Option(None, help="Byte position to attack"),
):
"""
Example on how to use the power correlation attack.\n
The filename of the measurement file is required. This file mush be a valid pickle file with at leas 'max_datapoints'
datapoints\n
If a byte position is provided, only the provided key byte will be attacked, otherwise the whole key will be.
"""

if not os.path.exists(filename):
msg = f"File {filename} does not exist"
print(msg)
raise Exception(f"File {msg}")

# Run the correlation attack on the provided byte position
attack = PowerAnalysisAttack(data_filename=filename, max_datapoints=max_datapoints)
if byte_position is not None:
key_byte = attack.attack_byte(byte_position=byte_position, plot=False,
store=False,
re_calculate=True)
print(f"Key byte found: {hex(key_byte[1])[2:]}")
return

# Run the full correlation attack
args_to_processes = tuple(
[[i, False, False, True] for i
in range(16)])

print("Starting the multiprocessing attack")
ti = time.time()
with Pool() as pool:
results = pool.starmap(attack.attack_byte, args_to_processes)
tf = time.time()
print(
f"\nAll processes finished. Final output: {results}. Execution time: {tf - ti} seconds -"
f" {(tf - ti) / 60} minutes")
print(f"Constructing the final key from the output")
out = [(pos, hex(item)[2:]) for (pos, item) in results]
sorted_list = sorted(out, key=lambda x: x[0])
key_list = [item[1] for item in sorted_list][::-1]
key = ''.join(key_list)
print(f"\nKey Found")
print(key)
Loading

0 comments on commit 3a2cf56

Please sign in to comment.