Files
nixos/utils/pkgs/clicknload-proxy/clicknload-proxy.py

298 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Click'n'Load proxy - receives CNL requests and forwards to pyLoad API.
Implements the Click'n'Load protocol:
1. GET /jdcheck.js -> responds with jdownloader=true
2. POST /flash/addcrypted2 -> decrypts links and sends to pyLoad
"""
import argparse
import base64
import html
import json
import re
import sys
import traceback
import urllib.request
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from Crypto.Cipher import AES
def log(msg):
"""Log with immediate flush for systemd journal visibility."""
print(f"[CNL] {msg}", flush=True)
def fetch_package_name(url):
"""Fetch package name from source page by extracting <h2> tag (like JDownloader)."""
if not url or not url.startswith("http"):
return None
try:
log(f"Fetching package name from {url}")
req = urllib.request.Request(
url,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8", errors="ignore")
# Extract <h2> content like JDownloader does
match = re.search(r'<h2[^>]*>([^<]+)<', content)
if match:
name = html.unescape(match.group(1)).strip()
log(f"Extracted package name: {name}")
return name
log("No <h2> tag found on page")
return None
except Exception as e:
log(f"Failed to fetch package name: {e}")
return None
def extract_package_name_from_links(links):
"""Extract package name from common prefix of link filenames."""
if not links:
return None
# Extract filenames from URLs
filenames = []
for link in links:
parsed = urllib.parse.urlparse(link)
path = urllib.parse.unquote(parsed.path)
filename = path.split("/")[-1] if path else ""
# Remove extension
if "." in filename:
filename = filename.rsplit(".", 1)[0]
if filename:
filenames.append(filename)
if not filenames:
return None
if len(filenames) == 1:
# Single file - use its name
name = filenames[0]
log(f"Single file, using name: {name}")
return name
# Find common prefix among all filenames
prefix = filenames[0]
for filename in filenames[1:]:
while prefix and not filename.startswith(prefix):
# Remove last character or segment
if "." in prefix:
prefix = prefix.rsplit(".", 1)[0]
elif "-" in prefix:
prefix = prefix.rsplit("-", 1)[0]
elif "_" in prefix:
prefix = prefix.rsplit("_", 1)[0]
else:
prefix = prefix[:-1]
# Clean up trailing separators
prefix = prefix.rstrip(".-_ ")
if prefix and len(prefix) >= 3:
log(f"Common prefix from {len(filenames)} files: {prefix}")
return prefix
# Fallback: use first filename
name = filenames[0]
log(f"No common prefix, using first filename: {name}")
return name
class ClickNLoadHandler(BaseHTTPRequestHandler):
pyload_url = None
pyload_user = None
pyload_pass = None
def log_message(self, format, *args):
log(f"{self.command} {self.path}")
def send_cors_headers(self):
"""Add CORS headers to allow cross-origin requests."""
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
def do_OPTIONS(self):
"""Handle CORS preflight requests."""
self.send_response(200)
self.send_cors_headers()
self.end_headers()
def do_GET(self):
if self.path == "/jdcheck.js":
self.send_response(200)
self.send_header("Content-Type", "text/javascript")
self.send_cors_headers()
self.end_headers()
self.wfile.write(b"jdownloader=true;")
else:
self.send_response(404)
self.send_cors_headers()
self.end_headers()
def do_POST(self):
if self.path == "/flash/addcrypted2":
content_length = int(self.headers.get("Content-Length", 0))
post_data = self.rfile.read(content_length).decode("utf-8")
params = urllib.parse.parse_qs(post_data)
jk = params.get("jk", [""])[0]
crypted = params.get("crypted", [""])[0]
source = params.get("source", ["Click'n'Load"])[0]
# Get actual page URL from Referer header
referer = self.headers.get("Referer", "")
log(f"Received addcrypted2: source={source}, referer={referer}")
log(f" jk_len={len(jk)}, crypted_len={len(crypted)}")
try:
links = self.decrypt_links(jk, crypted)
if links:
# Try to get package name: referer page -> link filenames -> source
package_name = (
fetch_package_name(referer) or
extract_package_name_from_links(links) or
source
)
self.add_to_pyload(links, package_name)
self.send_response(200)
self.send_cors_headers()
self.end_headers()
self.wfile.write(b"success")
else:
log("Error: No links found after decryption")
self.send_response(500)
self.send_cors_headers()
self.end_headers()
self.wfile.write(b"No links found")
except Exception as e:
log(f"Error: {e}")
log(traceback.format_exc())
self.send_response(500)
self.send_cors_headers()
self.end_headers()
self.wfile.write(str(e).encode())
else:
self.send_response(404)
self.send_cors_headers()
self.end_headers()
def decrypt_links(self, jk, crypted):
"""Decrypt Click'n'Load encrypted links."""
# Extract hex key from JavaScript function
# Format: function f(){ return '...hex...'; }
match = re.search(r"return\s*['\"]([0-9a-fA-F]+)['\"]", jk)
if not match:
log(f"Could not extract key from jk: {jk[:100]}...")
raise ValueError("Could not extract key from jk")
key_hex = match.group(1)
key = bytes.fromhex(key_hex)
log(f"Extracted key: {len(key)} bytes")
# Decrypt (AES-128-CBC, key is also IV)
cipher = AES.new(key, AES.MODE_CBC, iv=key)
encrypted = base64.b64decode(crypted)
decrypted = cipher.decrypt(encrypted)
# Remove PKCS7 padding
pad_len = decrypted[-1]
decrypted = decrypted[:-pad_len]
# Split into links
links = decrypted.decode("utf-8").strip().split("\n")
links = [l.strip() for l in links if l.strip()]
log(f"Decrypted {len(links)} links")
return links
def add_to_pyload(self, links, package_name):
"""Add links to pyLoad via API using HTTP Basic Auth."""
if not self.pyload_url:
log("No pyLoad URL configured, printing links:")
for link in links:
log(f" {link}")
return
log(f"Adding package to pyLoad at {self.pyload_url}")
# Build Basic Auth header
credentials = f"{self.pyload_user}:{self.pyload_pass}"
auth_header = base64.b64encode(credentials.encode()).decode()
# Add package using new API endpoint with HTTP Basic Auth
add_url = f"{self.pyload_url}/api/add_package"
add_data = json.dumps({
"name": package_name or "Click'n'Load",
"links": links
}).encode()
req = urllib.request.Request(
add_url,
data=add_data,
headers={
"Content-Type": "application/json",
"Authorization": f"Basic {auth_header}"
}
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
result = resp.read().decode()
log(f"Added package to pyLoad: {result}")
except urllib.error.HTTPError as e:
body = e.read().decode() if e.fp else ""
log(f"Failed to add package: {e.code} {e.reason} - {body}")
raise
except urllib.error.URLError as e:
log(f"Failed to add package: {e}")
raise
def main():
parser = argparse.ArgumentParser(description="Click'n'Load proxy for pyLoad")
parser.add_argument("--port", type=int, default=9666, help="Port to listen on")
parser.add_argument("--host", default="127.0.0.1", help="Host to bind to")
parser.add_argument("--pyload-url", help="pyLoad URL (e.g., https://pyload.example.com)")
parser.add_argument("--pyload-user", help="pyLoad username")
parser.add_argument("--pyload-pass", help="pyLoad password")
parser.add_argument("--config", help="Config file with pyLoad credentials (JSON)")
args = parser.parse_args()
# Load config from file if provided
if args.config:
with open(args.config) as f:
config = json.load(f)
args.pyload_url = config.get("pyloadUrl", args.pyload_url)
args.pyload_user = config.get("pyloadUser", args.pyload_user)
args.pyload_pass = config.get("pyloadPW", args.pyload_pass)
ClickNLoadHandler.pyload_url = args.pyload_url
ClickNLoadHandler.pyload_user = args.pyload_user
ClickNLoadHandler.pyload_pass = args.pyload_pass
server = HTTPServer((args.host, args.port), ClickNLoadHandler)
log(f"Click'n'Load proxy listening on {args.host}:{args.port}")
if args.pyload_url:
log(f"Forwarding to pyLoad at {args.pyload_url}")
else:
log("No pyLoad URL configured, will print links to stdout")
try:
server.serve_forever()
except KeyboardInterrupt:
log("Shutting down")
server.shutdown()
if __name__ == "__main__":
main()