New branch

This commit is contained in:
nocci 2025-08-10 15:34:34 +02:00
commit 58d70409b5
31 changed files with 9093 additions and 0 deletions

648
backend/app.py Normal file
View file

@ -0,0 +1,648 @@
#!/usr/bin/env python3
"""
Multi-Provider VPN Gateway Backend
Supports: Mullvad, Custom WireGuard, Imported Configs
With permanent killswitch protection
"""
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
import subprocess
import json
import os
import re
import requests
import time
import yaml
import base64
from datetime import datetime
import threading
import logging
from pathlib import Path
from typing import Dict, List, Optional
app = Flask(__name__)
CORS(app)
# Configuration
CONFIG_FILE = '/opt/vpn-gateway/config.json'
PROVIDERS_DIR = '/opt/vpn-gateway/providers'
WIREGUARD_DIR = '/etc/wireguard'
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/vpn-gateway.log'),
logging.StreamHandler()
]
)
class VPNProvider:
"""Base class for VPN providers"""
def __init__(self, name: str):
self.name = name
self.servers = {}
def get_servers(self) -> Dict:
raise NotImplementedError
def generate_config(self, server: str) -> str:
raise NotImplementedError
class MullvadProvider(VPNProvider):
"""Mullvad VPN provider"""
def __init__(self):
super().__init__("mullvad")
self.api_url = "https://api.mullvad.net/www/relays/all/"
self.public_key = "g+9JNZp3SvLPvBb+PzXHyOPHhqNiUdATrz1YdNEPvWo="
def get_servers(self) -> Dict:
try:
response = requests.get(self.api_url, timeout=10)
servers = response.json()
organized = {}
for server in servers:
if server.get('type') == 'wireguard' and server.get('active'):
country = server.get('country_name', 'Unknown')
city = server.get('city_name', 'Unknown')
if country not in organized:
organized[country] = {}
if city not in organized[country]:
organized[country][city] = []
organized[country][city].append({
'hostname': server['hostname'],
'ipv4': server['ipv4_addr_in'],
'ipv6': server.get('ipv6_addr_in'),
'type': 'WireGuard',
'provider': 'Mullvad'
})
self.servers = organized
return organized
except Exception as e:
logging.error(f"Failed to fetch Mullvad servers: {e}")
return {}
def generate_config(self, server_hostname: str) -> str:
server_info = self._find_server(server_hostname)
if not server_info:
raise ValueError(f"Server {server_hostname} not found")
private_key = self._get_or_generate_key()
return f"""# Mullvad WireGuard Configuration
# Server: {server_hostname}
# Provider: Mullvad
[Interface]
PrivateKey = {private_key}
Address = 10.64.0.2/32,fc00:bbbb:bbbb:bb01::2/128
DNS = 100.64.0.1
# PERMANENT KILLSWITCH - CANNOT BE DISABLED
PreUp = iptables -F OUTPUT
PreUp = iptables -F FORWARD
PostUp = iptables -I OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT
PostUp = iptables -I FORWARD -i eth0 -o %i -j ACCEPT
PostUp = iptables -t nat -A POSTROUTING -o %i -j MASQUERADE
PreDown = iptables -D OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT
PostDown = iptables -t nat -D POSTROUTING -o %i -j MASQUERADE
[Peer]
PublicKey = {self.public_key}
AllowedIPs = 0.0.0.0/0,::/0
Endpoint = {server_info['ipv4']}:51820
PersistentKeepalive = 25
"""
def _find_server(self, hostname: str) -> Optional[Dict]:
for country in self.servers.values():
for city in country.values():
for server in city:
if server['hostname'] == hostname:
return server
return None
def _get_or_generate_key(self) -> str:
key_file = f"{WIREGUARD_DIR}/mullvad_private.key"
if os.path.exists(key_file):
with open(key_file, 'r') as f:
return f.read().strip()
else:
private_key = subprocess.check_output(['wg', 'genkey'], text=True).strip()
with open(key_file, 'w') as f:
f.write(private_key)
os.chmod(key_file, 0o600)
return private_key
class CustomWireGuardProvider(VPNProvider):
"""Custom WireGuard servers provider"""
def __init__(self):
super().__init__("custom")
self.config_file = f"{PROVIDERS_DIR}/custom_servers.json"
self.load_servers()
def load_servers(self):
"""Load custom servers from config file"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r') as f:
self.servers = json.load(f)
except Exception as e:
logging.error(f"Failed to load custom servers: {e}")
self.servers = {}
else:
self.servers = {}
def save_servers(self):
"""Save custom servers to config file"""
os.makedirs(PROVIDERS_DIR, exist_ok=True)
with open(self.config_file, 'w') as f:
json.dump(self.servers, f, indent=2)
def add_server(self, name: str, config: Dict) -> bool:
"""Add a custom WireGuard server"""
try:
location = config.get('location', 'Custom')
if location not in self.servers:
self.servers[location] = {}
if 'Custom' not in self.servers[location]:
self.servers[location]['Custom'] = []
self.servers[location]['Custom'].append({
'hostname': name,
'endpoint': config['endpoint'],
'public_key': config['public_key'],
'private_key': config.get('private_key'),
'address': config.get('address', '10.0.0.2/32'),
'dns': config.get('dns', '1.1.1.1,1.0.0.1'),
'allowed_ips': config.get('allowed_ips', '0.0.0.0/0,::/0'),
'keepalive': config.get('keepalive', 25),
'type': 'WireGuard',
'provider': 'Custom'
})
self.save_servers()
return True
except Exception as e:
logging.error(f"Failed to add custom server: {e}")
return False
def remove_server(self, name: str) -> bool:
"""Remove a custom server"""
for location in self.servers.values():
for city in location.values():
for i, server in enumerate(city):
if server['hostname'] == name:
city.pop(i)
self.save_servers()
return True
return False
def generate_config(self, server_name: str) -> str:
server_info = self._find_server(server_name)
if not server_info:
raise ValueError(f"Server {server_name} not found")
private_key = server_info.get('private_key')
if not private_key:
private_key = self._get_or_generate_key(server_name)
dns_servers = server_info.get('dns', '1.1.1.1,1.0.0.1')
return f"""# Custom WireGuard Configuration
# Server: {server_name}
# Provider: Custom
[Interface]
PrivateKey = {private_key}
Address = {server_info.get('address', '10.0.0.2/32')}
DNS = {dns_servers}
# PERMANENT KILLSWITCH - CANNOT BE DISABLED
PreUp = iptables -F OUTPUT
PreUp = iptables -F FORWARD
PostUp = iptables -I OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT
PostUp = iptables -I FORWARD -i eth0 -o %i -j ACCEPT
PostUp = iptables -t nat -A POSTROUTING -o %i -j MASQUERADE
PreDown = iptables -D OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT
PostDown = iptables -t nat -D POSTROUTING -o %i -j MASQUERADE
[Peer]
PublicKey = {server_info['public_key']}
AllowedIPs = {server_info.get('allowed_ips', '0.0.0.0/0,::/0')}
Endpoint = {server_info['endpoint']}
PersistentKeepalive = {server_info.get('keepalive', 25)}
"""
def _find_server(self, name: str) -> Optional[Dict]:
for location in self.servers.values():
for city in location.values():
for server in city:
if server['hostname'] == name:
return server
return None
def _get_or_generate_key(self, name: str) -> str:
key_file = f"{WIREGUARD_DIR}/custom_{name}_private.key"
if os.path.exists(key_file):
with open(key_file, 'r') as f:
return f.read().strip()
else:
private_key = subprocess.check_output(['wg', 'genkey'], text=True).strip()
with open(key_file, 'w') as f:
f.write(private_key)
os.chmod(key_file, 0o600)
return private_key
class ImportedConfigProvider(VPNProvider):
"""Provider for imported WireGuard configs"""
def __init__(self):
super().__init__("imported")
self.configs_dir = f"{PROVIDERS_DIR}/imported"
os.makedirs(self.configs_dir, exist_ok=True)
self.load_configs()
def load_configs(self):
"""Load all imported configs"""
self.servers = {"Imported": {"Configs": []}}
for config_file in Path(self.configs_dir).glob("*.conf"):
name = config_file.stem
self.servers["Imported"]["Configs"].append({
'hostname': name,
'file': str(config_file),
'type': 'WireGuard',
'provider': 'Imported'
})
def import_config(self, name: str, config_content: str) -> bool:
"""Import a WireGuard config"""
try:
# Validate config
if '[Interface]' not in config_content or '[Peer]' not in config_content:
raise ValueError("Invalid WireGuard configuration")
# Add killswitch if not present
if 'PostUp' not in config_content:
config_content = self._add_killswitch(config_content)
# Save config
config_file = f"{self.configs_dir}/{name}.conf"
with open(config_file, 'w') as f:
f.write(config_content)
os.chmod(config_file, 0o600)
self.load_configs()
return True
except Exception as e:
logging.error(f"Failed to import config: {e}")
return False
def _add_killswitch(self, config: str) -> str:
"""Add killswitch rules to imported config"""
killswitch_rules = """
# PERMANENT KILLSWITCH - ADDED AUTOMATICALLY
PreUp = iptables -F OUTPUT
PreUp = iptables -F FORWARD
PostUp = iptables -I OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT
PostUp = iptables -I FORWARD -i eth0 -o %i -j ACCEPT
PostUp = iptables -t nat -A POSTROUTING -o %i -j MASQUERADE
PreDown = iptables -D OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT
PostDown = iptables -t nat -D POSTROUTING -o %i -j MASQUERADE
"""
# Insert after [Interface] section
lines = config.split('\n')
for i, line in enumerate(lines):
if line.strip() == '[Interface]':
# Find next section or end
for j in range(i+1, len(lines)):
if lines[j].strip().startswith('['):
lines.insert(j, killswitch_rules)
break
else:
lines.insert(len(lines), killswitch_rules)
break
return '\n'.join(lines)
def generate_config(self, name: str) -> str:
for server in self.servers["Imported"]["Configs"]:
if server['hostname'] == name:
with open(server['file'], 'r') as f:
return f.read()
raise ValueError(f"Config {name} not found")
# Global provider instances
PROVIDERS = {
'mullvad': MullvadProvider(),
'custom': CustomWireGuardProvider(),
'imported': ImportedConfigProvider()
}
# Current provider
CURRENT_PROVIDER = None
# VPN Status
VPN_STATUS = {
'connected': False,
'provider': None,
'server': None,
'ip': None,
'location': None,
'start_time': None
}
def load_config():
"""Load application configuration"""
global CURRENT_PROVIDER
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
provider_name = config.get('provider', 'mullvad')
CURRENT_PROVIDER = PROVIDERS.get(provider_name)
logging.info(f"Loaded provider: {provider_name}")
except Exception as e:
logging.error(f"Failed to load config: {e}")
CURRENT_PROVIDER = PROVIDERS['mullvad']
else:
CURRENT_PROVIDER = PROVIDERS['mullvad']
def save_config():
"""Save application configuration"""
try:
config = {
'provider': CURRENT_PROVIDER.name if CURRENT_PROVIDER else 'mullvad',
'timestamp': datetime.now().isoformat()
}
os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
except Exception as e:
logging.error(f"Failed to save config: {e}")
def check_vpn_status():
"""Check current VPN status"""
global VPN_STATUS
try:
result = subprocess.run(['wg', 'show', 'wg0'], capture_output=True, text=True)
if result.returncode == 0 and 'interface:' in result.stdout.lower():
VPN_STATUS['connected'] = True
# Get public IP and location
try:
# Try Mullvad API first
response = requests.get('https://am.i.mullvad.net/json', timeout=5)
data = response.json()
VPN_STATUS['ip'] = data.get('ip')
if data.get('mullvad_exit_ip'):
VPN_STATUS['location'] = f"{data.get('city')}, {data.get('country')}"
else:
# Fallback to ipinfo
response = requests.get('https://ipinfo.io/json', timeout=5)
data = response.json()
VPN_STATUS['ip'] = data.get('ip')
VPN_STATUS['location'] = f"{data.get('city')}, {data.get('country')}"
except:
pass
else:
VPN_STATUS['connected'] = False
VPN_STATUS['server'] = None
VPN_STATUS['ip'] = None
VPN_STATUS['location'] = None
VPN_STATUS['provider'] = None
except Exception as e:
logging.error(f"Status check error: {e}")
VPN_STATUS['connected'] = False
# Flask Routes
@app.route('/')
def index():
return send_from_directory('/opt/vpn-gateway/static', 'index.html')
@app.route('/api/providers')
def get_providers():
"""Get available providers"""
return jsonify({
'providers': list(PROVIDERS.keys()),
'current': CURRENT_PROVIDER.name if CURRENT_PROVIDER else None
})
@app.route('/api/provider/<provider_name>', methods=['POST'])
def set_provider(provider_name):
"""Switch provider"""
global CURRENT_PROVIDER
if provider_name not in PROVIDERS:
return jsonify({'success': False, 'error': 'Invalid provider'}), 400
# Disconnect if connected
if VPN_STATUS['connected']:
subprocess.run(['wg-quick', 'down', 'wg0'], capture_output=True)
CURRENT_PROVIDER = PROVIDERS[provider_name]
save_config()
return jsonify({'success': True, 'provider': provider_name})
@app.route('/api/servers')
def get_servers():
"""Get servers for current provider"""
if not CURRENT_PROVIDER:
return jsonify({'servers': {}})
servers = CURRENT_PROVIDER.get_servers()
return jsonify({
'servers': servers,
'provider': CURRENT_PROVIDER.name
})
@app.route('/api/custom/add', methods=['POST'])
def add_custom_server():
"""Add custom WireGuard server"""
if not isinstance(CURRENT_PROVIDER, CustomWireGuardProvider):
return jsonify({'success': False, 'error': 'Not in custom mode'}), 400
data = request.json
name = data.get('name')
config = {
'endpoint': data.get('endpoint'),
'public_key': data.get('public_key'),
'private_key': data.get('private_key'),
'address': data.get('address', '10.0.0.2/32'),
'dns': data.get('dns', '1.1.1.1,1.0.0.1'),
'allowed_ips': data.get('allowed_ips', '0.0.0.0/0,::/0'),
'location': data.get('location', 'Custom')
}
if CURRENT_PROVIDER.add_server(name, config):
return jsonify({'success': True})
else:
return jsonify({'success': False, 'error': 'Failed to add server'}), 500
@app.route('/api/custom/remove/<name>', methods=['DELETE'])
def remove_custom_server(name):
"""Remove custom server"""
if not isinstance(CURRENT_PROVIDER, CustomWireGuardProvider):
return jsonify({'success': False, 'error': 'Not in custom mode'}), 400
if CURRENT_PROVIDER.remove_server(name):
return jsonify({'success': True})
else:
return jsonify({'success': False, 'error': 'Server not found'}), 404
@app.route('/api/import', methods=['POST'])
def import_config():
"""Import WireGuard config"""
data = request.json
name = data.get('name')
config_content = data.get('config')
if not name or not config_content:
return jsonify({'success': False, 'error': 'Missing name or config'}), 400
provider = PROVIDERS['imported']
if provider.import_config(name, config_content):
return jsonify({'success': True})
else:
return jsonify({'success': False, 'error': 'Failed to import config'}), 500
@app.route('/api/status')
def get_status():
"""Get VPN status"""
check_vpn_status()
uptime = None
if VPN_STATUS['connected'] and VPN_STATUS['start_time']:
uptime_seconds = int(time.time() - VPN_STATUS['start_time'])
hours = uptime_seconds // 3600
minutes = (uptime_seconds % 3600) // 60
uptime = f"{hours}h {minutes}m"
return jsonify({
'connected': VPN_STATUS['connected'],
'provider': VPN_STATUS['provider'],
'server': VPN_STATUS['server'],
'ip': VPN_STATUS['ip'],
'location': VPN_STATUS['location'],
'uptime': uptime,
'killswitch_active': True # Always true
})
@app.route('/api/connect', methods=['POST'])
def connect_vpn():
"""Connect to VPN"""
data = request.json
server = data.get('server')
if not server or not CURRENT_PROVIDER:
return jsonify({'success': False, 'error': 'No server or provider selected'}), 400
try:
# Disconnect if connected
subprocess.run(['wg-quick', 'down', 'wg0'], capture_output=True)
time.sleep(1)
# Generate config
config = CURRENT_PROVIDER.generate_config(server)
# Write config
with open('/etc/wireguard/wg0.conf', 'w') as f:
f.write(config)
os.chmod('/etc/wireguard/wg0.conf', 0o600)
# Add firewall exception for endpoint
endpoint_match = re.search(r'Endpoint = ([\d.]+):', config)
if endpoint_match:
subprocess.run([
'iptables', '-I', 'OUTPUT', '1', '-p', 'udp',
'--dport', '51820', '-d', endpoint_match.group(1), '-j', 'ACCEPT'
])
# Connect
result = subprocess.run(['wg-quick', 'up', 'wg0'],
capture_output=True, text=True)
if result.returncode == 0:
VPN_STATUS['start_time'] = time.time()
VPN_STATUS['server'] = server
VPN_STATUS['provider'] = CURRENT_PROVIDER.name
logging.info(f"Connected to {server} via {CURRENT_PROVIDER.name}")
return jsonify({'success': True})
else:
logging.error(f"Connection failed: {result.stderr}")
return jsonify({'success': False, 'error': result.stderr}), 500
except Exception as e:
logging.error(f"Connect error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/disconnect', methods=['POST'])
def disconnect_vpn():
"""Disconnect VPN"""
try:
result = subprocess.run(['wg-quick', 'down', 'wg0'],
capture_output=True, text=True)
VPN_STATUS['start_time'] = None
VPN_STATUS['connected'] = False
VPN_STATUS['provider'] = None
return jsonify({
'success': result.returncode == 0,
'message': 'Disconnected - No internet (killswitch active)'
})
except Exception as e:
logging.error(f"Disconnect error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/keypair', methods=['GET'])
def generate_keypair():
"""Generate WireGuard keypair"""
try:
private_key = subprocess.check_output(['wg', 'genkey'], text=True).strip()
public_key = subprocess.check_output(['wg', 'pubkey'], input=private_key, text=True).strip()
return jsonify({
'private_key': private_key,
'public_key': public_key
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Load configuration
load_config()
# Create directories
os.makedirs('/opt/vpn-gateway/static', exist_ok=True)
os.makedirs(PROVIDERS_DIR, exist_ok=True)
# Start app
app.run(host='0.0.0.0', port=5000, debug=False)

5
backend/requirements.txt Normal file
View file

@ -0,0 +1,5 @@
flask==2.3.3
flask-cors==4.0.0
requests==2.31.0
gunicorn==21.2.0
pyyaml==6.0.1