Skip to content

Commit

Permalink
add logger
Browse files Browse the repository at this point in the history
  • Loading branch information
programmingAthlete committed Dec 24, 2023
1 parent 53649ae commit 186d9b5
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 46 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

__version__ = "1.4.10"
__version__ = "1.4.11"

with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
Expand Down
16 changes: 13 additions & 3 deletions src/crypto_pkg/attacks/block_ciphers/double_encryption.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import random
import sys
from typing import Dict, Tuple, Union

from Crypto.Cipher import AES

from crypto_pkg.attacks.block_ciphers.utils import Text, prepare_key
from crypto_pkg.utils.logging import get_logger, set_level

log = get_logger(__name__)


class DoubleAESAttack:
Expand All @@ -23,25 +25,33 @@ def decrypt(key: Text, cipher_text: Text) -> Text:

@classmethod
def lookup_table_computation(cls, plain_text: str, max_key: int = 24) -> Dict[int, int]:
log.info(f"Compute lookup table for plaintext {plain_text}")
p = Text(text=bytes.fromhex(plain_text))
log.debug("Starting lookup table computation")
out = {cls.encrypt(key=prepare_key(key=i, max_key=max_key), plain_text=p).integer: i for i in
range(1, 2 ** max_key)}
log.debug("Lookup table computation completed")
return out

@classmethod
def search_match(cls, cipher_text: str, lookup_table: Dict[int, int]) -> Union[Tuple[Text, Text], None]:
log.info(f"Search match for ciphertext {cipher_text}")
c = Text(text=bytes.fromhex(cipher_text))

for i in range(2 ** 24):
key = prepare_key(i)
m = cls.decrypt(key=key, cipher_text=c)
if lookup_table.get(m.integer):
log.debug(f"Match found for key {key}")
return prepare_key(lookup_table.get(m.integer)), key

@classmethod
def attack(cls, plain_text: str, cipher_text: str, max_key: int = 24):
@set_level(logger=log)
def attack(cls, plain_text: str, cipher_text: str, max_key: int = 24, _verbose: bool = False):
log.info(f"Constructing encryption lookup table for plain text {plain_text} ans maximum key size {max_key}")
look_up_table = cls.lookup_table_computation(plain_text=plain_text, max_key=max_key)
log.info("Search encryption match in lookup table")
keys = cls.search_match(cipher_text=cipher_text, lookup_table=look_up_table)
log.debug(f"Key Found")
return keys


Expand Down
21 changes: 9 additions & 12 deletions src/crypto_pkg/attacks/block_ciphers/modified_aes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

from crypto_pkg.attacks.block_ciphers.utils import prepare_key
from crypto_pkg.ciphers.symmetric.aes import CustomAES, array_to_matrix, get_array_from_state
from crypto_pkg.utils.logging import set_level, get_logger

logging.basicConfig(level=logging.CRITICAL)

_log = getLogger(__name__)
_log.setLevel(logging.CRITICAL)
log = get_logger(__name__)


class ModifiedAES(CustomAES):
Expand Down Expand Up @@ -47,12 +45,11 @@ def attack_section(self, plain_text, cipher_block_ref, init_pos, section_n=0):
c = self.encrypt(key=k_block, plain_text=plain_text)
c_by_block = [c[i * 4:i * 4 + 4] for i in range(len(c))]
if c_by_block[section_n] == cipher_block_ref[section_n]:
_log.info(f"key guess for block {section_n}: {key.hex}")
log.info(f"key guess for block {section_n}: {key.hex}")
return key

def attack(self, plain_text: str, cipher_text: str, verbose: bool = False):
if verbose:
_log.setLevel(logging.DEBUG)
@set_level(logger=log)
def attack(self, plain_text: str, cipher_text: str, _verbose: bool = False):
p_int_list = [int(item, 16) for item in [plain_text[i * 2:i * 2 + 2] for i in range(len(plain_text))] if
item != '']
c_int_list = [int(item, 16) for item in [cipher_text[i * 2:i * 2 + 2] for i in range(len(cipher_text))] if
Expand All @@ -66,13 +63,13 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False):
[p_int_list, c_by_block_ref, 128, 3]
)

_log.debug("Run attack on sub-blocks in parallel")
log.debug("Run attack on sub-blocks in parallel")
with Pool() as pool:
res = pool.starmap(self.attack_section, args)
r = [int(item.hex, 16) for item in res]
_log.debug(f"Parallel execution terminated with keys guesses {r}")
log.debug(f"Parallel execution terminated with keys guesses {r}")
out = r[0] ^ r[1] ^ r[2] ^ r[3]
_log.info(f"128bits key guess: {out}")
log.info(f"128bits key guess: {out}")
return out


Expand All @@ -93,7 +90,7 @@ def attack(self, plain_text: str, cipher_text: str, verbose: bool = False):
ct = bytes(c).hex()

# ---- Run the attack
_log.debug(f"Run the attack with plain-text {p} and cipher-text {p}")
log.debug(f"Run the attack with plain-text {p} and cipher-text {p}")
aes = ModifiedAES()
result = aes.attack(plain_text=pt, cipher_text=ct, verbose=True)
assert result == int(to_find_key, 16)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ def compute_c(self, predicted_currents: np.ndarray, byte_position: int, save: bo
np.save(f"matrices/matrix_{byte_position}.npy", np.array(c))
return c

@set_level(logger=log)
def attack_byte(self, byte_position: int = 0, plot: bool = False,
store: bool = True, re_calculate: bool = False) -> Tuple[int, np.ndarray]:
store: bool = True, re_calculate: bool = False, _verbose: bool = False) -> Tuple[int, np.ndarray]:
"""
Correlation attack of one byte
Expand All @@ -154,6 +155,7 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False,
plot: show the correlation plot 'byte_position' or not - default = False
store: save the correlation matrices for the byte 'byte_position' or not - default = True
re_calculate: re-calculate the correlation matrix for the byte 'byte_position' even it has been stored
_verbose:
Returns:
Tuple(byte_position, key byte)
"""
Expand All @@ -180,7 +182,7 @@ def attack_byte(self, byte_position: int = 0, plot: bool = False,

@set_level(logger=log)
def attack_full_key(self, show_plot_correlations: bool = False, store_correlation_matrices: bool = False,
re_calculate_correlation_matrices: bool = True, verbose: bool = False):
re_calculate_correlation_matrices: bool = True, _verbose: bool = False):
cores = multiprocessing.cpu_count()
log.info(f"Number of cores: {cores}. The program wil run in chunks of {cores} byte positions\n")

Expand Down
35 changes: 22 additions & 13 deletions src/crypto_pkg/attacks/stream_ciphers/geffe_cipher.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from decimal import Decimal
from enum import Enum
from typing import List, Tuple, Union
from typing import List, Tuple, Union, Dict

from crypto_pkg.ciphers.symmetric.geffe import Geffe
from crypto_pkg.utils.logging import get_logger, set_level

log = get_logger()


def int_2_base_2(k, n):
Expand Down Expand Up @@ -52,7 +55,7 @@ def __init__(self, all_taps: List[List[int]], n: int, f: List[int], stream_ref:
self.max_clock = max_clock

@staticmethod
def check_match(a1, a2):
def check_match(a1, a2) -> Decimal:
match = [1 for i in range(len(a1)) if a1[i] == a2[i]]
return Decimal(sum(match)) / Decimal(len(a1))

Expand Down Expand Up @@ -92,7 +95,7 @@ def look_for_correlation(self, thresholds: Union[List[Tuple[ThresholdsOperator,

return key0, key2

def find_k1(self, key0, key2):
def find_k1(self, key0, key2) -> Dict[str, list]:
g = Geffe(self.n, self.all_taps, self.f)
all_keys = [[k0_item, k1, k2_item] for k0_item in key0 for k1 in range(self.max_iter) for k2_item in
key2]
Expand All @@ -101,19 +104,25 @@ def find_k1(self, key0, key2):
success = [item[1] for item in res if item[0] is True]
if success:
result = success[0]
return f"\nSuccess\nThe key is (k0,k1,k2) = {int_2_base_2(result[0], self.n)}," \
f"{int_2_base_2(result[1], self.n)}," \
f"{int_2_base_2(result[2], self.n)}"
return {"k0": int_2_base_2(result[0], self.n),
"k1": int_2_base_2(result[1], self.n),
"k2": int_2_base_2(result[2], self.n)
}
else:
return "\nFail"
raise Exception("Attack Failed")

def attack(self, thresholds):
@set_level(log)
def attack(self, thresholds, _verbose: bool = False):
log.info("Search for possible seeds")
k0, k2 = self.look_for_correlation(thresholds=thresholds)
print("Possible choices")
print(f"\tk_0 = {k0} = {[int_2_base_2(item, 16) for item in k0]}")
print(f"\tk_2 = {k2} = {[int_2_base_2(item, 16) for item in k2]}")
log.info("Possible choices for seeds of LFSR 1 and 3")
msg = f"Possible choices\n\tk_0 = {k0} = {[int_2_base_2(item, 16) for item in k0]}\n" \
f"\tk_2 = {k2} = {[int_2_base_2(item, 16) for item in k2]}"
log.debug(msg)
log.info("Find seed for LFSR 2")
out = self.find_k1(key0=k0, key2=k2)
print(out)
msg = f"\nSuccess\nThe key is (k0,k1,k2)\n\t = {out['k0']},{out['k1']},{out['k2']}"
log.info(f"{msg}")
return out


Expand All @@ -132,4 +141,4 @@ def attack(self, thresholds):
tsh = [(ThresholdsOperator.MAX, Decimal('0.5') - epsilon_0), None,
(ThresholdsOperator.MIN, Decimal('0.5') + epsilon_1)]

attack.attack(tsh)
print(attack.attack(tsh))
24 changes: 11 additions & 13 deletions src/crypto_pkg/clis/attacks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import multiprocessing
import os
import pickle
import time
from decimal import Decimal
import random
from multiprocessing import Pool
from typing import Optional

import typer
Expand All @@ -17,7 +13,6 @@
from crypto_pkg.attacks.stream_ciphers.geffe_cipher import Attack as GeffeAttack, ThresholdsOperator
from crypto_pkg.contracts.cli_dto import ModifiedAESIn
from importlib import resources
import io

app = typer.Typer(pretty_exceptions_show_locals=False, no_args_is_help=True)

Expand All @@ -27,7 +22,9 @@ def get_hex(x):


@app.command('geffe')
def attack_geffe():
def attack_geffe(
verbose: bool = typer.Option(False, help="Show debug logs")
):
"""
Example on how to use the attack on a Geffe stream cipher.\n
The function doesn't tae any argument, the stream of the cipher is hardcoded together with the LSFRs
Expand All @@ -37,15 +34,14 @@ def attack_geffe():
stream = '01001110000011101100011101010111011100000011010001111001101101100000000111110110111011011001010111101100111001111100001111100101110000000010110101001111110110010001111101010110011010010110101011000101'
# Geffe tabs
taps = [[0, 1, 4, 7], [0, 1, 7, 11], [0, 2, 3, 5]]
stream_l = [int(item) for item in stream]
attack = GeffeAttack(all_taps=taps, stream_ref=stream, f=[1, 1, 0, 1, 0, 0, 0, 1], max_clock=200, n=16)

epsilon_0 = Decimal('0.25')
epsilon_1 = Decimal('0.25')
tsh = [(ThresholdsOperator.MAX, Decimal('0.5') - epsilon_0), None,
(ThresholdsOperator.MIN, Decimal('0.5') + epsilon_1)]

attack.attack(tsh)
attack.attack(thresholds=tsh, _verbose=verbose)


@app.command("modifiedAES")
Expand Down Expand Up @@ -99,7 +95,7 @@ def attack_modifier_aes(
# ---- Run the attack
print(f"Run the attack with plain-text {p} and cipher-text {p}")
aes = ModifiedAES()
result = aes.attack(plain_text=model.plain_text, cipher_text=ct, verbose=verbose)
result = aes.attack(plain_text=model.plain_text, cipher_text=ct, _verbose=verbose)
if model.key is not None:
# Check that the key is the one provided
assert result == int(model.key, 16)
Expand All @@ -125,6 +121,7 @@ def attack_modifier_aes(
def attack_double_encryption(
plain_text: Optional[str] = typer.Option(None, help="128bits plain text to encrypt"),
cipher_text: Optional[str] = typer.Option(None, help="128bits encryption of the plain_text"),
verbose: Optional[bool] = typer.Option(False, help="Show debug logs"),
):
"""
Example on how to use the double encryption attack on AES.\n
Expand Down Expand Up @@ -164,7 +161,7 @@ def attack_double_encryption(

print("\nStating the attack")
print("It might take a bit, but don't worry we'll find it")
ks = DoubleAESAttack.attack(plain_text=pt, cipher_text=ct, max_key=24)
ks = DoubleAESAttack.attack(plain_text=pt, cipher_text=ct, max_key=24, _verbose=verbose)
if ks:
print("\nKeys found:")
print(f"\tk1: 0x{ks[0].hex}")
Expand All @@ -176,7 +173,8 @@ def attack_correlation_power_analysis(
filename: str = typer.Argument('test_file.pickle',
help="Filename of the pickle file with the measurements"),
max_datapoints: Optional[int] = typer.Option(400, help="Maximum number of data points to consider"),
byte_position: Optional[int] = typer.Option(None, help="Byte position to attack")
byte_position: Optional[int] = typer.Option(None, help="Byte position to attack"),
verbose: Optional[bool] = typer.Option(None, help="Show debug logs")
):
"""
Example on how to use the power correlation attack.\n
Expand All @@ -199,11 +197,11 @@ def attack_correlation_power_analysis(
if byte_position is not None:
key_byte = attack.attack_byte(byte_position=byte_position, plot=False,
store=False,
re_calculate=True)
re_calculate=True, _verbose=verbose)
print(f"Key byte found: {hex(key_byte[1])[2:]}")
return
key = attack.attack_full_key(store_correlation_matrices=False, re_calculate_correlation_matrices=False,
show_plot_correlations=False, verbose=True)
show_plot_correlations=False, _verbose=verbose)
print("Key Found")
print(key)
os.remove(filename)
8 changes: 6 additions & 2 deletions src/crypto_pkg/utils/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ def get_logger(location: str = __name__):

def set_level(logger):
def deco(func):
def wrapper( *args, **kwargs):
if kwargs.get('verbose') and kwargs.get('verbose') is True:
def wrapper(*args, **kwargs):
_verbose = kwargs.get('_verbose', None)
if _verbose is None and len(args) == 5:
_verbose = args[4]
if _verbose is True:
logger.setLevel(logging.DEBUG)
return func(*args, **kwargs)

return wrapper

return deco
Binary file added test_file.pickle
Binary file not shown.

0 comments on commit 186d9b5

Please sign in to comment.