forked from ton-studio/ton-etl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnft_history.py
95 lines (78 loc) · 3.45 KB
/
nft_history.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from loguru import logger
from db import DB
from model.parser import Parser, TOPIC_NFT_TRANSFERS
from model.nft_history import NftHistory
# Addresses treated as burn destinations: NftHistoryParser records a transfer
# to one of them as a burn event when no sale contract is involved.
# Each literal is passed through Parser.uf2raw so it can be compared with the
# owner addresses found in incoming messages (presumably raw-form addresses —
# TODO confirm against the message schema).
BURN_ADDRESSES = [
    Parser.uf2raw(friendly_address)
    for friendly_address in (
        "EQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAM9c",
        "EQAREREREREREREREREREREREREREREREREREREREREREeYT",
    )
]
class NftHistoryParser(Parser):
    """Turn NFT transfer messages into NftHistory events.

    Every message from TOPIC_NFT_TRANSFERS is accepted. Each one is
    classified as init-sale, cancel-sale, sale, burn or plain transfer by
    checking whether its old/new owner is a known sale contract, and the
    resulting NftHistory record is handed to ``db.serialize``.
    """

    def topics(self):
        """Return the list of topics this parser consumes."""
        return [TOPIC_NFT_TRANSFERS]

    def predicate(self, obj) -> bool:
        """Accept every message; no filtering is done at this stage."""
        return True

    def handle_internal(self, obj: dict, db: DB):
        """Classify one NFT transfer and persist it as an NftHistory event.

        Args:
            obj: Transfer message. Keys read here: ``tx_aborted``,
                ``old_owner``, ``new_owner``, ``tx_hash``, ``tx_now``,
                ``tx_lt``, ``nft_item_address``, ``nft_collection_address``.
            db: Storage handle; used for ``get_nft_sale`` lookups and to
                ``serialize`` the resulting event.

        Aborted transactions (truthy ``tx_aborted``) are skipped entirely.
        """
        if obj.get("tx_aborted"):
            return

        previous_owner = obj.get("old_owner")
        next_owner = obj.get("new_owner")

        # Either side of the transfer may be a sale contract rather than a
        # regular owner; look both up (old owner first, then new owner).
        # TODO Add handler for not parsed sales
        old_owner_sale = db.get_nft_sale(previous_owner)
        new_owner_sale = db.get_nft_sale(next_owner)

        # Step 1: decide the event type and which sale record (if any)
        # describes it.
        sale = None
        completed_sale = False
        if new_owner_sale:
            # NFT moved onto a sale contract: the item was put up for sale.
            event_type = NftHistory.EVENT_TYPE_INIT_SALE
            sale = new_owner_sale
        elif old_owner_sale:
            sale = old_owner_sale
            if sale.get("owner") == next_owner:
                # Sale contract returned the NFT to the seller.
                event_type = NftHistory.EVENT_TYPE_CANCEL_SALE
            else:
                # Sale contract passed the NFT to somebody else.
                event_type = NftHistory.EVENT_TYPE_SALE
                completed_sale = True
        elif next_owner in BURN_ADDRESSES:
            event_type = NftHistory.EVENT_TYPE_BURN
        else:
            event_type = NftHistory.EVENT_TYPE_TRANSFER

        # Step 2: fill in the sale-related fields. Without a sale record the
        # owners come straight from the message and the rest stays None.
        current_owner = previous_owner
        new_owner = next_owner
        sale_address = code_hash = marketplace = price = is_auction = None
        if sale:
            sale_address = sale.get("address")
            code_hash = sale.get("code_hash")
            marketplace = sale.get("marketplace")
            # The seller recorded on the sale replaces the message's old owner.
            current_owner = sale.get("owner")
            is_auction = sale.get("is_auction")
            price = sale.get("price")
            if not completed_sale:
                # Init/cancel events carry no new owner, and auctions are
                # recorded with a zero price for these two event types.
                new_owner = None
                if is_auction:
                    price = 0

        nft_history = NftHistory(
            tx_hash=Parser.require(obj.get('tx_hash', None)),
            utime=Parser.require(obj.get("tx_now")),
            tx_lt=Parser.require(obj.get("tx_lt")),
            event_type=event_type,
            nft_item_address=obj.get("nft_item_address"),
            collection_address=obj.get("nft_collection_address"),
            sale_address=sale_address,
            code_hash=code_hash,
            marketplace=marketplace,
            current_owner=current_owner,
            new_owner=new_owner,
            price=price,
            is_auction=is_auction,
        )
        logger.info(f"Adding NFT history event {nft_history}")
        db.serialize(nft_history)