-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMessages.py
126 lines (110 loc) · 3.64 KB
/
Messages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import struct
from bitstring import *
from BlockandPiece import Piece
class keep_alive:
    """Keep-alive message: a length prefix of zero with no id and no payload.

    Wire format: <len=0000>
    """

    # Value written into the 4-byte big-endian length prefix.
    length = 0

    def __init__(self):
        """Nothing to set up; the message has no per-instance state."""

    @classmethod
    def byteStringForKeepAlive(cls):
        """Return the serialised message: ``length`` as a big-endian uint32."""
        prefix_layout = struct.Struct('>I')
        return prefix_layout.pack(cls.length)
class chock:
    """Choke message (the class keeps this file's existing spelling).

    Wire format: <len=0001><id=0>
    """

    # Length prefix value: one byte of message (the id) follows the prefix.
    length = 1
    message_ID = 0

    def __init__(self) -> None:
        pass

    @classmethod
    def to_bytes(cls):
        """Return the 5-byte serialised message.

        The layout is a big-endian uint32 length prefix followed by a
        one-byte message id.
        """
        # This must be struct.pack. A bare `pack` is not provided by
        # `import struct`; it could only resolve through the file's
        # `from bitstring import *`, which is a different function and would
        # not produce this byte string.
        return struct.pack(">IB", cls.length, cls.message_ID)

    @classmethod
    def parse_response(cls, response):
        """Check whether ``response`` starts with a choke message.

        Only the first five bytes are inspected.

        Returns:
            1 if both the length prefix and the message id match this class,
            otherwise -1.

        Raises:
            struct.error: if ``response`` holds fewer than five bytes.
        """
        length, message_ID = struct.unpack(">IB", response[:5])
        if length == cls.length and message_ID == cls.message_ID:
            return 1
        return -1
class unchoke:
    """Unchoke message.

    Wire format: <len=0001><id=1>
    """

    # Length prefix value: one byte of message (the id) follows the prefix.
    length = 1
    message_ID = 1

    def __init__(self):
        """Nothing to set up; the message has no per-instance state."""

    @classmethod
    def parse_response(cls, response):
        """Check whether ``response`` starts with an unchoke message.

        Only the first five bytes are inspected: a big-endian uint32 length
        prefix and a one-byte message id.

        Returns:
            1 when both fields match this class, otherwise -1.

        Raises:
            struct.error: if ``response`` holds fewer than five bytes.
        """
        header = struct.unpack('>IB', response[:5])
        expected = (cls.length, cls.message_ID)
        return 1 if header == expected else -1
class interested:
    """Interested message.

    Wire format: <len=0001><id=2>
    """

    # Length prefix value: one byte of message (the id) follows the prefix.
    length = 1
    message_ID = 2

    def __init__(self):
        """Nothing to set up; the message has no per-instance state."""

    def byteStringForInterested(self):
        """Return the 5-byte message: big-endian uint32 length, then the id byte."""
        header_fields = (self.length, self.message_ID)
        return struct.pack('>IB', *header_fields)
class not_interested:
    """Not-interested message.

    Wire format: <len=0001><id=3>
    """

    # Length prefix value: one byte of message (the id) follows the prefix.
    length = 1
    message_ID = 3

    def __init__(self) -> None:
        """Nothing to set up; the message has no per-instance state."""

    def byteStringForNotInterested(self):
        """Return the 5-byte message: big-endian uint32 length, then the id byte."""
        layout = struct.Struct(">IB")
        return layout.pack(self.length, self.message_ID)
class have:
    """Have message carrying a single piece index.

    Wire format: <len=0005><id=4><piece index>
    """

    # Length prefix value: the id byte plus a 4-byte piece index.
    length = 5
    message_ID = 4

    def __init__(self, piece_index) -> None:
        self.piece_index = piece_index

    @classmethod
    def parse_response(cls, response):
        """Decode a have message from the first nine bytes of ``response``.

        The bytes are read as a big-endian uint32 length prefix, a one-byte
        message id and a uint32 piece index.

        Returns:
            A ``have`` instance holding the piece index when the length
            prefix and message id match this class, otherwise -1.

        Raises:
            struct.error: if ``response`` holds fewer than nine bytes.
        """
        fields = struct.unpack(">IBI", response[:9])
        header, index = fields[:2], fields[2]
        if header != (cls.length, cls.message_ID):
            return -1
        return have(index)
class Bitfield:
    """Bitfield message wrapping a bit container.

    Wire format: <len=0001+X><id=5><bitfield>
    """

    # Unknown at class level; each instance sets it to 1 + payload size.
    length = None
    message_ID = 5

    def __init__(self, bitfield):
        """Store ``bitfield`` along with its byte form and derived lengths.

        ``bitfield`` must provide a ``tobytes()`` method; its result is kept
        as ``bitfield_as_bytes``.
        """
        self.bitfield = bitfield
        raw = bitfield.tobytes()
        self.bitfield_as_bytes = raw
        self.bitfield_length = len(raw)
        # The length prefix counts the id byte plus the payload bytes.
        self.length = self.bitfield_length + 1

    @classmethod
    def parse_response(cls, response):
        """Decode a bitfield message from ``response``.

        The first five bytes are read as a big-endian uint32 length prefix
        and a one-byte message id. The payload is the ``length - 1`` bytes
        that follow.

        Returns:
            A ``Bitfield`` built from a ``BitArray`` over the payload bytes
            when the message id matches, otherwise -1. ``BitArray`` is not
            defined in this block; it is expected to come from the file's
            star import.

        Raises:
            struct.error: if the header is shorter than five bytes, or the
                payload slice does not hold exactly ``length - 1`` bytes.
        """
        length, message_ID = struct.unpack(">IB", response[:5])
        if message_ID != cls.message_ID:
            return -1
        payload_size = length - 1
        payload_end = 5 + payload_size
        # Unpacking (not plain slicing) keeps the error on truncated payloads.
        (payload,) = struct.unpack(f">{payload_size}s", response[5:payload_end])
        return Bitfield(BitArray(bytes=payload))
class request:
    """Request message asking for one block of a piece.

    Wire format: <len=0013><id=6><index><begin><length>
    """

    # Length prefix value: the id byte plus three 4-byte integers.
    length = 13
    message_ID = 6

    def __init__(self, piece_index, piece_offset, block_length):
        self.piece_index = piece_index
        self.piece_offset = piece_offset
        self.block_length = block_length

    def byteStringForRequest(self):
        """Return the 17-byte serialised message.

        All integers are big-endian: a uint32 length prefix, a one-byte id,
        then the piece index, offset within the piece and block length as
        uint32 values.
        """
        header = struct.pack(">IB", self.length, self.message_ID)
        body = struct.pack(
            ">III",
            self.piece_index,
            self.piece_offset,
            self.block_length,
        )
        return header + body
class pieceMessage:
    """Piece message carrying one block of piece data.

    Wire format: <len=0009+X><id=7><index><begin><block>
    """

    # Not fixed for this message type: it depends on the block size.
    length = None
    message_ID = 7

    def __init__(self):
        """Nothing to set up; parsing is done through the classmethod."""

    @classmethod
    def parse_response(cls, response):
        """Decode a piece message that occupies all of ``response``.

        The first 13 bytes are a big-endian uint32 length prefix, a one-byte
        message id, a uint32 piece index and a uint32 block offset. Every
        remaining byte is treated as block data. The length prefix is
        unpacked but not checked.

        Returns:
            ``(piece_index, block_length, block_offset, block)`` when the
            message id matches, otherwise -1.

        Raises:
            struct.error: if ``response`` is shorter than 13 bytes.
        """
        # Whatever follows the 13 header bytes is the block payload.
        payload_size = len(response) - 13
        layout = f">IBII{payload_size}s"
        _prefix, msg_id, index, begin, data = struct.unpack(layout, response)
        if msg_id != cls.message_ID:
            return -1
        return (index, payload_size, begin, data)