-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscan.py
More file actions
120 lines (99 loc) · 3.7 KB
/
scan.py
File metadata and controls
120 lines (99 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import threading
import requests
import ipaddress
from urllib.parse import urlparse
from tqdm import tqdm
class Scan():
    """Probe every address of an IP network over HTTP and record the replies.

    For each address the scanner sends the request described by ``url`` and
    the optional request options directly to that IP (always over plain
    ``http://``), with the ``Host`` header set to the URL's host name. Each
    reply that passes verification is appended to ``output_file`` as
    ``"<ip>: <result>"``. Addresses whose request, verification or write
    fails are skipped silently.

    All ``set*`` methods return ``self`` so calls can be chained.
    """

    # Class-level defaults. ``threads`` and ``ip_network`` are kept here only
    # for backward compatibility; every instance gets its own lists in
    # ``__init__`` / ``setIp`` so that scanners never share state.
    threads = []
    max_threads = 512          # maximum number of concurrently running workers
    ip_network = []
    url = ''
    timeout = 500              # request timeout in milliseconds
    request_options = None     # extra keyword arguments for requests.request
    verify = None              # optional callable(response) -> result
    output_file = 'ip_response.txt'

    # Serializes appends to the output file across worker threads (and across
    # scanners, since several instances may share the same output file).
    _write_lock = threading.Lock()

    def __init__(self, ip):
        """Create a scanner for the network described by *ip*.

        *ip* is anything ``ipaddress.ip_network`` accepts, e.g.
        ``"192.168.1.0/24"``; a ``ValueError`` from that call propagates.
        """
        # Per-instance list: the class-level list must not be shared.
        self.threads = []
        self.setIp(ip)

    def setMaxThreads(self, max_threads):
        """Set the maximum number of concurrent worker threads."""
        self.max_threads = max_threads
        return self

    def setIp(self, ip, reverse=False):
        """Set the network to scan; scan from the last address if *reverse*."""
        addresses = list(ipaddress.ip_network(ip))
        if reverse:
            addresses.reverse()
        self.ip_network = addresses
        return self

    def setUrl(self, url):
        """Set the URL whose host, path and query are used for each probe."""
        self.url = url
        return self

    def setTimeout(self, timeout):
        """Set the per-request timeout in milliseconds."""
        self.timeout = timeout
        return self

    def setRequestOptions(self, **request_options):
        """Set extra keyword arguments that override the request defaults."""
        self.request_options = request_options
        return self

    def setVerify(self, verify):
        """Set a callable that turns a response into the value to record."""
        self.verify = verify
        return self

    def getVerify(self, response):
        """Return the value to record for *response*.

        Uses the callable given to ``setVerify`` when there is one, otherwise
        ``response.json()``. Any exception raised by either propagates.
        """
        if self.verify is None:
            return response.json()
        return self.verify(response)

    def get_ip_response(self, ip_address):
        """Probe one address and append its result to the output file.

        This is a best-effort worker: unreachable hosts, failed verification
        and write errors are expected during a scan and are ignored.
        """
        try:
            response = self.getRequest(ip_address)
            result = self.getVerify(response)
            line = str(ip_address) + ': ' + str(result) + '\n'
            # One writer at a time so lines from different workers cannot
            # interleave; explicit encoding so non-ASCII results are not lost
            # on platforms with a narrow default encoding.
            with self._write_lock:
                with open(self.output_file, 'a', encoding='utf-8') as f:
                    f.write(line)
        except Exception:
            # Deliberately swallowed (see docstring). Unlike a bare
            # ``except:``, this lets SystemExit/KeyboardInterrupt through.
            pass

    def _build_request_kwargs(self, ip_address):
        """Return the keyword arguments for ``requests.request``.

        Defaults are a ``GET`` to ``http://<ip><path>[?query]`` with the
        ``Host`` header set to the URL's host, the configured timeout
        (converted from milliseconds to seconds) and redirects disabled.
        User request options override these defaults; user headers are merged
        with the default ``Host`` header rather than replacing it.
        """
        # Split the configured URL into host name and path (+ query).
        parsed_url = urlparse(self.url)
        domain = parsed_url.netloc
        path = parsed_url.path
        if parsed_url.query:
            path += '?' + parsed_url.query

        # IPv6 literals must be bracketed inside a URL.
        host = str(ip_address)
        try:
            if ipaddress.ip_address(host).version == 6:
                host = f"[{host}]"
        except ValueError:
            # Not a bare IP literal (e.g. "ip:port"); use it unchanged.
            pass

        request_kwargs = {
            'method': 'GET',
            'url': f"http://{host}{path}",
            'timeout': self.timeout / 1000,
            'headers': {'Host': domain},
            'allow_redirects': False,
        }

        if self.request_options:
            # Work on copies so the caller's options and headers are never
            # modified; otherwise a Host header written here would go stale
            # after a later setUrl().
            options = dict(self.request_options)
            user_headers = options.pop('headers', None)
            if user_headers:
                merged_headers = dict(user_headers)
                # Header names are case-insensitive: keep the user's Host
                # header whatever its spelling, add the default otherwise.
                has_host = any(
                    str(name).lower() == 'host' for name in merged_headers
                )
                if not has_host:
                    merged_headers['Host'] = domain
                request_kwargs['headers'] = merged_headers
            # Every other user option overrides the default.
            request_kwargs.update(options)

        return request_kwargs

    def getRequest(self, ip_address):
        """Send the configured request to *ip_address* and return the response.

        Exceptions raised by ``requests.request`` propagate to the caller.
        """
        return requests.request(**self._build_request_kwargs(ip_address))

    def run(self):
        """Probe every address, running at most ``max_threads`` at once.

        The progress bar advances when a worker is started, not when it
        finishes. Returns only after all workers have completed.
        """
        # Guard against a non-positive limit, which would otherwise index
        # into an empty thread list.
        limit = max(1, self.max_threads)
        with tqdm(total=len(self.ip_network)) as pbar:
            for ip_address in self.ip_network:
                if len(self.threads) >= limit:
                    # Wait for the oldest worker before starting another.
                    self.threads.pop(0).join()
                thread = threading.Thread(
                    target=self.get_ip_response, args=(ip_address,)
                )
                thread.start()
                self.threads.append(thread)
                pbar.update(1)
            for thread in self.threads:
                thread.join()