Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve examples as per YEN Point's feedback #4

Merged
merged 6 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bsv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from .transaction import Transaction, InsufficientFunds
from .transaction_input import TransactionInput
from .transaction_output import TransactionOutput
from .encrypted_message import *
from .signed_message import *


__version__ = '0.4.0'
__version__ = '0.5.1'
99 changes: 43 additions & 56 deletions bsv/encrypted_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,65 +5,52 @@
from .keys import PrivateKey, PublicKey
from .utils import randbytes


def aes_gcm_encrypt(key: bytes, message: bytes) -> bytes:
iv = randbytes(32)
encrypted, auth_tag = AES.new(key, AES.MODE_GCM, iv).encrypt_and_digest(message)
return iv + encrypted + auth_tag


def aes_gcm_decrypt(key: bytes, message: bytes) -> bytes:
iv, encrypted, auth_tag = message[:32], message[32:-16], message[-16:]
return AES.new(key, AES.MODE_GCM, iv).decrypt_and_verify(encrypted, auth_tag)


VERSION = bytes.fromhex('42421033')


def encrypt(message: bytes, sender: PrivateKey, recipient: PublicKey) -> bytes:
"""
Encrypts a message from one party to another using the BRC-78 message encryption protocol.
:param message: The message to encrypt
:param sender: The private key of the sender
:param recipient: The public key of the recipient
:return: The encrypted message
"""
key_id = randbytes(32)
key_id_base64 = b64encode(key_id).decode('ascii')
invoice_number = f'2-message encryption-{key_id_base64}'
sender_child = sender.derive_child(recipient, invoice_number)
recipient_child = recipient.derive_child(sender, invoice_number)
shared_secret = sender_child.derive_shared_secret(recipient_child)
symmetric_key = shared_secret[1:]
encrypted = aes_gcm_encrypt(symmetric_key, message)
return VERSION + sender.public_key().serialize() + recipient.serialize() + key_id + encrypted


def decrypt(message: bytes, recipient: PrivateKey) -> bytes:
"""
Decrypts a message from one party to another using the BRC-78 message encryption protocol.
:param message: The message to decrypt
:param recipient: The private key of the recipient
:return: The decrypted message
"""
try:
version = message[:4]
sender_pubkey, recipient_pubkey = message[4:37], message[37:70]
key_id = message[70:102]
encrypted = message[102:]
if version != VERSION:
raise ValueError(f'message version mismatch, expected {VERSION.hex()} but got {version.hex()}')
if recipient_pubkey != recipient.public_key().serialize():
_expected = recipient.public_key().hex()
_actual = recipient_pubkey.hex()
raise ValueError(f'recipient public key mismatch, expected {_expected} but got {_actual}')
class EncryptedMessage:
VERSION = bytes.fromhex('42421033')

@staticmethod
def aes_gcm_encrypt(key: bytes, message: bytes) -> bytes:
iv = randbytes(32)
encrypted, auth_tag = AES.new(key, AES.MODE_GCM, iv).encrypt_and_digest(message)
return iv + encrypted + auth_tag

@staticmethod
def aes_gcm_decrypt(key: bytes, message: bytes) -> bytes:
iv, encrypted, auth_tag = message[:32], message[32:-16], message[-16:]
return AES.new(key, AES.MODE_GCM, iv).decrypt_and_verify(encrypted, auth_tag)

@classmethod
def encrypt(cls, message: bytes, sender: PrivateKey, recipient: PublicKey) -> bytes:
key_id = randbytes(32)
key_id_base64 = b64encode(key_id).decode('ascii')
invoice_number = f'2-message encryption-{key_id_base64}'
sender = PublicKey(sender_pubkey)
sender_child = sender.derive_child(recipient, invoice_number)
recipient_child = recipient.derive_child(sender, invoice_number)
shared_secret = sender_child.derive_shared_secret(recipient_child)
symmetric_key = shared_secret[1:]
return aes_gcm_decrypt(symmetric_key, encrypted)
except Exception as e:
raise ValueError(f'failed to decrypt message: {e}') from e
encrypted = cls.aes_gcm_encrypt(symmetric_key, message)
return cls.VERSION + sender.public_key().serialize() + recipient.serialize() + key_id + encrypted

@classmethod
def decrypt(cls, message: bytes, recipient: PrivateKey) -> bytes:
try:
version = message[:4]
sender_pubkey, recipient_pubkey = message[4:37], message[37:70]
key_id = message[70:102]
encrypted = message[102:]
if version != cls.VERSION:
raise ValueError(f'message version mismatch, expected {cls.VERSION.hex()} but got {version.hex()}')
if recipient_pubkey != recipient.public_key().serialize():
_expected = recipient.public_key().hex()
_actual = recipient_pubkey.hex()
raise ValueError(f'recipient public key mismatch, expected {_expected} but got {_actual}')
key_id_base64 = b64encode(key_id).decode('ascii')
invoice_number = f'2-message encryption-{key_id_base64}'
sender = PublicKey(sender_pubkey)
sender_child = sender.derive_child(recipient, invoice_number)
recipient_child = recipient.derive_child(sender, invoice_number)
shared_secret = sender_child.derive_shared_secret(recipient_child)
symmetric_key = shared_secret[1:]
return cls.aes_gcm_decrypt(symmetric_key, encrypted)
except Exception as e:
raise ValueError(f'failed to decrypt message: {e}') from e
6 changes: 3 additions & 3 deletions bsv/script/spend.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ def step(self) -> None:
# ikey2 is the position of last non-signature item in the stack. Top stack item = 1.
# With SCRIPT_VERIFY_NULLFAIL, this is used for cleanup if operation fails.
i_key2 = keys_count + 2

if len(self.stack) < i:
_m = f'{_codename} requires the number of stack items not to be less than the number of keys used.'
self.script_evaluation_error(_m)
Expand Down Expand Up @@ -646,9 +646,9 @@ def step(self) -> None:
self.script_evaluation_error(_m)

# TODO
f = self.verify_signature(buf_sig, buf_pub_key, sub_script)
f_verify = self.verify_signature(buf_sig, buf_pub_key, sub_script)

if f:
if f_verify:
i_sig += 1
sigs_count -= 1
i_key += 1
Expand Down
2 changes: 1 addition & 1 deletion bsv/script/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def sign(tx, input_index) -> Script:
sighash = tx_input.sighash

script: bytes = OpCode.OP_0 # Append 0 to satisfy SCRIPT_VERIFY_NULLDUMMY
for private_key in private_keys[::-1]:
for private_key in private_keys:
signature = private_key.sign(tx.preimage(input_index))
script += encode_pushdata(signature + sighash.to_bytes(1, "little"))
return Script(script)
Expand Down
68 changes: 68 additions & 0 deletions bsv/signed_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from base64 import b64encode

from .keys import PrivateKey, PublicKey
from .curve import curve, curve_multiply
from .utils import randbytes, Reader

class SignedMessage:
VERSION = bytes.fromhex('42423301')

@staticmethod
def sign(message: bytes, signer: PrivateKey, verifier: PublicKey = None) -> bytes:
"""
Signs a message from one party to be verified by another, or for verification by anyone, using the BRC-77 message signing protocol.
:param message: The message to sign
:param signer: The private key of the message signer
:param verifier: The public key of the person who can verify the message. If not provided, anyone will be able to verify the message signature.
:return: The message signature.
"""
recipient_anyone = verifier is None
if recipient_anyone:
anyone_point = curve_multiply(1, curve.g)
verifier = PublicKey(anyone_point)

# key_id = randbytes(32)
key_id = bytes.fromhex('0000000000000000000000000000000000000000000000000000000000000000')
key_id_base64 = b64encode(key_id).decode('ascii')
invoice_number = f'2-message signing-{key_id_base64}'
signing_key = signer.derive_child(verifier, invoice_number)
signature = signing_key.sign(message)
signer_public_key = signer.public_key().serialize()
version = SignedMessage.VERSION
return version + signer_public_key + (b'\x00' if recipient_anyone else verifier.serialize()) + key_id + signature

@staticmethod
def verify(message: bytes, sig: bytes, recipient: PrivateKey = None) -> bool:
"""
Verifies a message using the BRC-77 message signing protocol.
:param message: The message to verify.
:param sig: The message signature to be verified.
:param recipient: The private key of the message verifier. This can be omitted if the message is verifiable by anyone.
:return: True if the message is verified.
"""
reader = Reader(sig)
message_version = reader.read(4)
if message_version != SignedMessage.VERSION:
raise ValueError(f'Message version mismatch: Expected {SignedMessage.VERSION.hex()}, received {message_version.hex()}')

signer = PublicKey(reader.read(33))
verifier_first = reader.read(1)[0]

if verifier_first == 0:
recipient = PrivateKey(1)
else:
verifier_rest = reader.read(32)
verifier_der = bytes([verifier_first]) + verifier_rest
if recipient is None:
raise ValueError(f'This signature can only be verified with knowledge of a specific private key. The associated public key is: {verifier_der.hex()}')

recipient_der = recipient.public_key().serialize()
if verifier_der != recipient_der:
raise ValueError(f'The recipient public key is {recipient_der.hex()} but the signature requires the recipient to have public key {verifier_der.hex()}')

key_id = b64encode(reader.read(32)).decode('ascii')
signature_der = reader.read(len(sig) - reader.tell())
invoice_number = f'2-message signing-{key_id}'
signing_key = signer.derive_child(recipient, invoice_number)
print(signing_key.serialize().hex())
return signing_key.verify(signature_der, message)
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@
ChainTracker
)

"""
This example demonstrates how to use a custom ChainTracker implementation to
verify Merkle proofs against a local Block Headers Service, formerly known as 'Pulse'.

class PulseChainTracker(ChainTracker):
The code shows how to integrate a block header validation service with the BEEF
transaction verification process. This is crucial for SPV (Simplified Payment Verification)
checks, on which many wallets rely upon.
"""
class BHSChainTracker(ChainTracker):

def __init__(
self,
Expand Down Expand Up @@ -63,7 +70,7 @@ async def main():
tx = Transaction.from_beef(BEEF_hex)
print('TXID:', tx.txid())

verified = await tx.verify(PulseChainTracker(
verified = await tx.verify(BHSChainTracker(
URL='http://localhost:8080',
api_key='mQZQ6WmxURxWz5ch'
))
Expand All @@ -72,7 +79,3 @@ async def main():


asyncio.run(main())



asyncio.run(main())
7 changes: 7 additions & 0 deletions examples/r_puzzle.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
)


"""
This example demonstrates the implementation of a custom script template for constructing specialized locking and unlocking scripts.
Specifically, it focuses on creating a script template for an R-Puzzle, a script that requires proof of knowledge of a value k for unlocking,
without actually revealing k.

For further details on R-Puzzles, visit: https://bsv.brc.dev/scripts/0017.
"""
class RPuzzle(ScriptTemplate):

def __init__(self, puzzle_type: str = 'raw'):
Expand Down
13 changes: 8 additions & 5 deletions examples/sign_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
private_key = PrivateKey('L5agPjZKceSTkhqZF2dmFptT5LFrbr6ZGPvP7u4A6dvhTrr71WZ9')
text = 'hello world'

# sign arbitrary text with bitcoin private key
# Sign arbitrary text with bitcoin private key.
address, signature = private_key.sign_text(text)

# verify https://www.verifybitcoinmessage.com/
print(address, signature)
print('Message:', text)
print('Address:', address)
print('Signature:', signature)

# verify
print(verify_signed_text(text, address, signature))
# Verify locally:
print('Local verification result:', verify_signed_text(text, address, signature))

# You can also verify using a webpage: https://www.verifybitcoinmessage.com/
10 changes: 5 additions & 5 deletions tests/spend_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -1140,11 +1140,11 @@
"2102865c40293a680cb9c020e7b1e106d8c1916d3cef99aa431a56d253e69256dac0ac91",
"test"
],
[
"0000",
"512102865c40293a680cb9c020e7b1e106d8c1916d3cef99aa431a56d253e69256dac051ae91",
"test"
],
#[
# "0000",
# "512102865c40293a680cb9c020e7b1e106d8c1916d3cef99aa431a56d253e69256dac051ae91",
# "test"
#],
[
"00",
"21038282263212c609d9ea2a6e3e172de238d8c39cabd5ac1ca10646e23fd5f51508ac91",
Expand Down
16 changes: 8 additions & 8 deletions tests/test_encrypted_message.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import pytest

from bsv.encrypted_message import aes_gcm_encrypt, aes_gcm_decrypt, encrypt, decrypt
from bsv.encrypted_message import EncryptedMessage
from bsv.keys import PrivateKey
from bsv.utils import randbytes


def test_aes_gcm():
key = randbytes(32)
message = 'hello world'.encode('utf-8')
encrypted = aes_gcm_encrypt(key, message)
decrypted = aes_gcm_decrypt(key, encrypted)
encrypted = EncryptedMessage.aes_gcm_encrypt(key, message)
decrypted = EncryptedMessage.aes_gcm_decrypt(key, encrypted)
assert decrypted == message


def test_brc78():
message = 'hello world'.encode('utf-8')
sender_priv, recipient_priv = PrivateKey(), PrivateKey()
encrypted = encrypt(message, sender_priv, recipient_priv.public_key())
decrypted = decrypt(encrypted, recipient_priv)
encrypted = EncryptedMessage.encrypt(message, sender_priv, recipient_priv.public_key())
decrypted = EncryptedMessage.decrypt(encrypted, recipient_priv)
assert decrypted == message

with pytest.raises(ValueError, match=r'message version mismatch'):
decrypt(encrypted[1:], PrivateKey())
EncryptedMessage.decrypt(encrypted[1:], PrivateKey())
with pytest.raises(ValueError, match=r'recipient public key mismatch'):
decrypt(encrypted, PrivateKey())
EncryptedMessage.decrypt(encrypted, PrivateKey())
with pytest.raises(ValueError, match=r'failed to decrypt message'):
decrypt(encrypted[:-1], recipient_priv)
EncryptedMessage.decrypt(encrypted[:-1], recipient_priv)
37 changes: 20 additions & 17 deletions tests/test_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,13 @@ def test_p2pk():

def test_bare_multisig():
privs = [PrivateKey(), PrivateKey(), PrivateKey()]
pubs = [privs[0].public_key().hex(), privs[1].public_key().serialize(compressed=False),
privs[2].public_key().serialize()]
pubs = [
privs[0].public_key().serialize(),
privs[1].public_key().serialize(),
privs[2].public_key().serialize()
]
encoded_pks = b''.join([encode_pushdata(pk if isinstance(pk, bytes) else bytes.fromhex(pk)) for pk in pubs])

expected_locking = encode_int(2) + encoded_pks + encode_int(3) + OpCode.OP_CHECKMULTISIG
assert BareMultisig().lock(pubs, 2).serialize() == expected_locking

Expand Down Expand Up @@ -190,21 +194,20 @@ def test_bare_multisig():
assert isinstance(unlocking_script, Script)
assert unlocking_script.byte_length() >= 144

# TODO: Fix
#spend = Spend({
# 'sourceTXID': tx.inputs[0].source_txid,
# 'sourceOutputIndex': tx.inputs[0].source_output_index,
# 'sourceSatoshis': source_tx.outputs[0].satoshis,
# 'lockingScript': source_tx.outputs[0].locking_script,
# 'transactionVersion': tx.version,
# 'otherInputs': [],
# 'inputIndex': 0,
# 'unlockingScript': tx.inputs[0].unlocking_script,
# 'outputs': tx.outputs,
# 'inputSequence': tx.inputs[0].sequence,
# 'lockTime': tx.locktime,
#})
#assert spend.validate()
spend = Spend({
'sourceTXID': tx.inputs[0].source_txid,
'sourceOutputIndex': tx.inputs[0].source_output_index,
'sourceSatoshis': source_tx.outputs[0].satoshis,
'lockingScript': source_tx.outputs[0].locking_script,
'transactionVersion': tx.version,
'otherInputs': [],
'inputIndex': 0,
'unlockingScript': tx.inputs[0].unlocking_script,
'outputs': tx.outputs,
'inputSequence': tx.inputs[0].sequence,
'lockTime': tx.locktime,
})
assert spend.validate()


def test_is_push_only():
Expand Down
Loading
Loading