-
Notifications
You must be signed in to change notification settings - Fork 1
/
identifier.py
164 lines (141 loc) · 6.43 KB
/
identifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import logging
from semantic import semantic_feature
from flow import data_flow, token_flow
log = logging.getLogger(__name__)
class AttackIdentifier:
def __init__(
self,
input_contract,
contracts,
main_contract_sign_list,
external_call_in_func_sigature,
visited_contracts,
visited_funcs,
):
self.contracts = contracts
self.main_contract_sign_list = main_contract_sign_list
self.external_call_in_func_sigature = external_call_in_func_sigature
self.intra_callsigs = []
self.sensitive_callsigs = []
self.attack_matrix = {
"br": False,
"dos": False,
"reentrancy": False,
"price_manipulation": False,
}
self.visited_contracts = visited_contracts
self.visited_funcs = visited_funcs
self.victim_callback_info = {}
self.attack_reenter_info = {}
self.input_contract = input_contract
self.flow_analysis = data_flow.FlowAnalysis(input_contract, contracts)
self.semantic_analysis = semantic_feature.AttackSemantics(contracts)
def detect(self):
cross_contract = False
for key in self.contracts.keys():
if self.contracts[key].level != 0:
cross_contract = True
if not cross_contract:
return False, self.attack_matrix
result = False
# br and dos detection
if self.semantic_analysis.intraprocedural_br_analysis():
self.attack_matrix["br"] = True
if self.semantic_analysis.intraprocedural_dos_analysis():
self.attack_matrix["dos"] = True
if self.flow_analysis.intraprocedural_fla_analysis():
log.info("intraprocedural analysis true of flashloan attack")
print("intraprocedural analysis true of flashloan attack")
self.attack_matrix["price_manipulation"] = True
else:
# define the vulnerable trace
pps_near_fl_source = self.flow_analysis.get_pps_near_fl_source()
pps_near_fl_sink = self.flow_analysis.get_pps_near_fl_sink()
self.attack_matrix["price_manipulation"] = (
self.flow_analysis.find_potential_price_manipulation_attack(
pps_near_fl_source, pps_near_fl_sink
)
)
if self.semantic_analysis.op_externalcall_callback_analysis():
self.attack_matrix["price_manipulation"] = True
# so how to define the tainted source
# !the tainted source should only be from the analyzed contracts (i.e., input contract)
pps_near_source = self.flow_analysis.get_pps_near_source()
# and how to define the sentive sink
pps_near_sink, sensitive_callsigs = self.flow_analysis.get_pps_near_sink()
# set call sigs in the sink site
self.sensitive_callsigs = sensitive_callsigs
reachable = False
reachable_site = {}
# for every source, find whether one sink can be reached
for pp1 in pps_near_source:
for pp2 in pps_near_sink:
if self.flow_analysis.is_same(pp1, pp2):
reachable = True
caller = pp2["caller"]
caller_funcSign = pp2["caller_funcSign"]
reachable_site[pp2["func_sign"]] = {
"caller": caller,
"caller_callback_funcSign": caller_funcSign,
}
elif self.flow_analysis.is_reachable(pp1, pp2):
reachable = True
caller = pp2["caller"]
caller_funcSign = pp2["caller_funcSign"]
reachable_site[pp2["func_sign"]] = {
"caller": caller,
"caller_callback_funcSign": caller_funcSign,
}
victim_callback_info = {}
attack_reenter_info = {}
if reachable:
# judge whether the attacker contract implements the sensitive functions (called by victims)
overlap = list(
set(sensitive_callsigs).intersection(
set(self.external_call_in_func_sigature)
)
)
if len(overlap) > 0:
for i in overlap:
victim_callback_info[i] = []
attack_reenter_info[i] = []
if i in reachable_site:
if reachable_site[i] not in victim_callback_info[i]:
victim_callback_info[i].append(reachable_site[i])
for key in self.contracts.keys():
if (
str(self.contracts[key].func_sign) == str(i)
and self.contracts[key].level == 0
):
externall_calls = self.contracts[key].external_calls
for ec in externall_calls:
temp_target_address = ec["logic_addr"]
temp_funcSign = ec["funcSign"]
res = {
"reenter_target": temp_target_address,
"reenter_funcSign": temp_funcSign,
}
if (
res not in attack_reenter_info[i]
and temp_target_address in self.visited_contracts
and temp_funcSign in self.visited_funcs
):
attack_reenter_info[i].append(res)
result = True
self.attack_matrix["reentrancy"] = True
if (
self.semantic_analysis.double_call_to_same_contract()
or self.semantic_analysis.double_call_to_same_contract_by_storage()
or self.semantic_analysis.preset_call_in_standard_erc20_transfer()
):
self.attack_matrix["reentrancy"] = True
result = True
self.victim_callback_info = victim_callback_info
self.attack_reenter_info = attack_reenter_info
return result, self.attack_matrix
def get_reen_info(self):
return self.victim_callback_info, self.attack_reenter_info
def get_sig_info(self):
return self.sensitive_callsigs
def get_attack_matric(self):
return self.attack_matrix