#!/usr/bin/env python3 """ Multi-Provider VPN Gateway Backend Supports: Mullvad, Custom WireGuard, Imported Configs With permanent killswitch protection """ from flask import Flask, request, jsonify, send_from_directory from flask_cors import CORS import subprocess import json import os import re import requests import time import yaml import base64 from datetime import datetime import threading import logging from pathlib import Path from typing import Dict, List, Optional app = Flask(__name__) CORS(app) # Configuration CONFIG_FILE = '/opt/vpn-gateway/config.json' PROVIDERS_DIR = '/opt/vpn-gateway/providers' WIREGUARD_DIR = '/etc/wireguard' # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('/var/log/vpn-gateway.log'), logging.StreamHandler() ] ) class VPNProvider: """Base class for VPN providers""" def __init__(self, name: str): self.name = name self.servers = {} def get_servers(self) -> Dict: raise NotImplementedError def generate_config(self, server: str) -> str: raise NotImplementedError class MullvadProvider(VPNProvider): """Mullvad VPN provider""" def __init__(self): super().__init__("mullvad") self.api_url = "https://api.mullvad.net/www/relays/all/" self.public_key = "g+9JNZp3SvLPvBb+PzXHyOPHhqNiUdATrz1YdNEPvWo=" def get_servers(self) -> Dict: try: response = requests.get(self.api_url, timeout=10) servers = response.json() organized = {} for server in servers: if server.get('type') == 'wireguard' and server.get('active'): country = server.get('country_name', 'Unknown') city = server.get('city_name', 'Unknown') if country not in organized: organized[country] = {} if city not in organized[country]: organized[country][city] = [] organized[country][city].append({ 'hostname': server['hostname'], 'ipv4': server['ipv4_addr_in'], 'ipv6': server.get('ipv6_addr_in'), 'type': 'WireGuard', 'provider': 'Mullvad' }) self.servers = organized return organized except Exception as e: logging.error(f"Failed to fetch Mullvad servers: {e}") return {} def generate_config(self, server_hostname: str) -> str: server_info = self._find_server(server_hostname) if not server_info: raise ValueError(f"Server {server_hostname} not found") private_key = self._get_or_generate_key() return f"""# Mullvad WireGuard Configuration # Server: {server_hostname} # Provider: Mullvad [Interface] PrivateKey = {private_key} Address = 10.64.0.2/32,fc00:bbbb:bbbb:bb01::2/128 DNS = 100.64.0.1 # PERMANENT KILLSWITCH - CANNOT BE DISABLED PreUp = iptables -F OUTPUT PreUp = iptables -F FORWARD PostUp = iptables -I OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT PostUp = iptables -I FORWARD -i eth0 -o %i -j ACCEPT PostUp = iptables -t nat -A POSTROUTING -o %i -j MASQUERADE PreDown = iptables -D OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT PostDown = iptables -t nat -D POSTROUTING -o %i -j MASQUERADE [Peer] PublicKey = {self.public_key} AllowedIPs = 0.0.0.0/0,::/0 Endpoint = {server_info['ipv4']}:51820 PersistentKeepalive = 25 """ def _find_server(self, hostname: str) -> Optional[Dict]: for country in self.servers.values(): for city in country.values(): for server in city: if server['hostname'] == hostname: return server return None def _get_or_generate_key(self) -> str: key_file = f"{WIREGUARD_DIR}/mullvad_private.key" if os.path.exists(key_file): with open(key_file, 'r') as f: return f.read().strip() else: private_key = subprocess.check_output(['wg', 'genkey'], text=True).strip() with open(key_file, 'w') as f: f.write(private_key) os.chmod(key_file, 0o600) return private_key class CustomWireGuardProvider(VPNProvider): """Custom WireGuard servers provider""" def __init__(self): super().__init__("custom") self.config_file = f"{PROVIDERS_DIR}/custom_servers.json" self.load_servers() def load_servers(self): """Load custom servers from config file""" if os.path.exists(self.config_file): try: with open(self.config_file, 'r') as f: self.servers = json.load(f) except Exception as e: logging.error(f"Failed to load custom servers: {e}") self.servers = {} else: self.servers = {} def save_servers(self): """Save custom servers to config file""" os.makedirs(PROVIDERS_DIR, exist_ok=True) with open(self.config_file, 'w') as f: json.dump(self.servers, f, indent=2) def add_server(self, name: str, config: Dict) -> bool: """Add a custom WireGuard server""" try: location = config.get('location', 'Custom') if location not in self.servers: self.servers[location] = {} if 'Custom' not in self.servers[location]: self.servers[location]['Custom'] = [] self.servers[location]['Custom'].append({ 'hostname': name, 'endpoint': config['endpoint'], 'public_key': config['public_key'], 'private_key': config.get('private_key'), 'address': config.get('address', '10.0.0.2/32'), 'dns': config.get('dns', '1.1.1.1,1.0.0.1'), 'allowed_ips': config.get('allowed_ips', '0.0.0.0/0,::/0'), 'keepalive': config.get('keepalive', 25), 'type': 'WireGuard', 'provider': 'Custom' }) self.save_servers() return True except Exception as e: logging.error(f"Failed to add custom server: {e}") return False def remove_server(self, name: str) -> bool: """Remove a custom server""" for location in self.servers.values(): for city in location.values(): for i, server in enumerate(city): if server['hostname'] == name: city.pop(i) self.save_servers() return True return False def generate_config(self, server_name: str) -> str: server_info = self._find_server(server_name) if not server_info: raise ValueError(f"Server {server_name} not found") private_key = server_info.get('private_key') if not private_key: private_key = self._get_or_generate_key(server_name) dns_servers = server_info.get('dns', '1.1.1.1,1.0.0.1') return f"""# Custom WireGuard Configuration # Server: {server_name} # Provider: Custom [Interface] PrivateKey = {private_key} Address = {server_info.get('address', '10.0.0.2/32')} DNS = {dns_servers} # PERMANENT KILLSWITCH - CANNOT BE DISABLED PreUp = iptables -F OUTPUT PreUp = iptables -F FORWARD PostUp = iptables -I OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT PostUp = iptables -I FORWARD -i eth0 -o %i -j ACCEPT PostUp = iptables -t nat -A POSTROUTING -o %i -j MASQUERADE PreDown = iptables -D OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT PostDown = iptables -t nat -D POSTROUTING -o %i -j MASQUERADE [Peer] PublicKey = {server_info['public_key']} AllowedIPs = {server_info.get('allowed_ips', '0.0.0.0/0,::/0')} Endpoint = {server_info['endpoint']} PersistentKeepalive = {server_info.get('keepalive', 25)} """ def _find_server(self, name: str) -> Optional[Dict]: for location in self.servers.values(): for city in location.values(): for server in city: if server['hostname'] == name: return server return None def _get_or_generate_key(self, name: str) -> str: key_file = f"{WIREGUARD_DIR}/custom_{name}_private.key" if os.path.exists(key_file): with open(key_file, 'r') as f: return f.read().strip() else: private_key = subprocess.check_output(['wg', 'genkey'], text=True).strip() with open(key_file, 'w') as f: f.write(private_key) os.chmod(key_file, 0o600) return private_key class ImportedConfigProvider(VPNProvider): """Provider for imported WireGuard configs""" def __init__(self): super().__init__("imported") self.configs_dir = f"{PROVIDERS_DIR}/imported" os.makedirs(self.configs_dir, exist_ok=True) self.load_configs() def load_configs(self): """Load all imported configs""" self.servers = {"Imported": {"Configs": []}} for config_file in Path(self.configs_dir).glob("*.conf"): name = config_file.stem self.servers["Imported"]["Configs"].append({ 'hostname': name, 'file': str(config_file), 'type': 'WireGuard', 'provider': 'Imported' }) def import_config(self, name: str, config_content: str) -> bool: """Import a WireGuard config""" try: # Validate config if '[Interface]' not in config_content or '[Peer]' not in config_content: raise ValueError("Invalid WireGuard configuration") # Add killswitch if not present if 'PostUp' not in config_content: config_content = self._add_killswitch(config_content) # Save config config_file = f"{self.configs_dir}/{name}.conf" with open(config_file, 'w') as f: f.write(config_content) os.chmod(config_file, 0o600) self.load_configs() return True except Exception as e: logging.error(f"Failed to import config: {e}") return False def _add_killswitch(self, config: str) -> str: """Add killswitch rules to imported config""" killswitch_rules = """ # PERMANENT KILLSWITCH - ADDED AUTOMATICALLY PreUp = iptables -F OUTPUT PreUp = iptables -F FORWARD PostUp = iptables -I OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT PostUp = iptables -I FORWARD -i eth0 -o %i -j ACCEPT PostUp = iptables -t nat -A POSTROUTING -o %i -j MASQUERADE PreDown = iptables -D OUTPUT ! -o %i -m mark ! --mark $(wg show %i fwmark) -m addrtype ! --dst-type LOCAL -j REJECT PostDown = iptables -t nat -D POSTROUTING -o %i -j MASQUERADE """ # Insert after [Interface] section lines = config.split('\n') for i, line in enumerate(lines): if line.strip() == '[Interface]': # Find next section or end for j in range(i+1, len(lines)): if lines[j].strip().startswith('['): lines.insert(j, killswitch_rules) break else: lines.insert(len(lines), killswitch_rules) break return '\n'.join(lines) def generate_config(self, name: str) -> str: for server in self.servers["Imported"]["Configs"]: if server['hostname'] == name: with open(server['file'], 'r') as f: return f.read() raise ValueError(f"Config {name} not found") # Global provider instances PROVIDERS = { 'mullvad': MullvadProvider(), 'custom': CustomWireGuardProvider(), 'imported': ImportedConfigProvider() } # Current provider CURRENT_PROVIDER = None # VPN Status VPN_STATUS = { 'connected': False, 'provider': None, 'server': None, 'ip': None, 'location': None, 'start_time': None } def load_config(): """Load application configuration""" global CURRENT_PROVIDER if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, 'r') as f: config = json.load(f) provider_name = config.get('provider', 'mullvad') CURRENT_PROVIDER = PROVIDERS.get(provider_name) logging.info(f"Loaded provider: {provider_name}") except Exception as e: logging.error(f"Failed to load config: {e}") CURRENT_PROVIDER = PROVIDERS['mullvad'] else: CURRENT_PROVIDER = PROVIDERS['mullvad'] def save_config(): """Save application configuration""" try: config = { 'provider': CURRENT_PROVIDER.name if CURRENT_PROVIDER else 'mullvad', 'timestamp': datetime.now().isoformat() } os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True) with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) except Exception as e: logging.error(f"Failed to save config: {e}") def check_vpn_status(): """Check current VPN status""" global VPN_STATUS try: result = subprocess.run(['wg', 'show', 'wg0'], capture_output=True, text=True) if result.returncode == 0 and 'interface:' in result.stdout.lower(): VPN_STATUS['connected'] = True # Get public IP and location try: # Try Mullvad API first response = requests.get('https://am.i.mullvad.net/json', timeout=5) data = response.json() VPN_STATUS['ip'] = data.get('ip') if data.get('mullvad_exit_ip'): VPN_STATUS['location'] = f"{data.get('city')}, {data.get('country')}" else: # Fallback to ipinfo response = requests.get('https://ipinfo.io/json', timeout=5) data = response.json() VPN_STATUS['ip'] = data.get('ip') VPN_STATUS['location'] = f"{data.get('city')}, {data.get('country')}" except: pass else: VPN_STATUS['connected'] = False VPN_STATUS['server'] = None VPN_STATUS['ip'] = None VPN_STATUS['location'] = None VPN_STATUS['provider'] = None except Exception as e: logging.error(f"Status check error: {e}") VPN_STATUS['connected'] = False # Flask Routes @app.route('/') def index(): return send_from_directory('/opt/vpn-gateway/static', 'index.html') @app.route('/api/providers') def get_providers(): """Get available providers""" return jsonify({ 'providers': list(PROVIDERS.keys()), 'current': CURRENT_PROVIDER.name if CURRENT_PROVIDER else None }) @app.route('/api/provider/', methods=['POST']) def set_provider(provider_name): """Switch provider""" global CURRENT_PROVIDER if provider_name not in PROVIDERS: return jsonify({'success': False, 'error': 'Invalid provider'}), 400 # Disconnect if connected if VPN_STATUS['connected']: subprocess.run(['wg-quick', 'down', 'wg0'], capture_output=True) CURRENT_PROVIDER = PROVIDERS[provider_name] save_config() return jsonify({'success': True, 'provider': provider_name}) @app.route('/api/servers') def get_servers(): """Get servers for current provider""" if not CURRENT_PROVIDER: return jsonify({'servers': {}}) servers = CURRENT_PROVIDER.get_servers() return jsonify({ 'servers': servers, 'provider': CURRENT_PROVIDER.name }) @app.route('/api/custom/add', methods=['POST']) def add_custom_server(): """Add custom WireGuard server""" if not isinstance(CURRENT_PROVIDER, CustomWireGuardProvider): return jsonify({'success': False, 'error': 'Not in custom mode'}), 400 data = request.json name = data.get('name') config = { 'endpoint': data.get('endpoint'), 'public_key': data.get('public_key'), 'private_key': data.get('private_key'), 'address': data.get('address', '10.0.0.2/32'), 'dns': data.get('dns', '1.1.1.1,1.0.0.1'), 'allowed_ips': data.get('allowed_ips', '0.0.0.0/0,::/0'), 'location': data.get('location', 'Custom') } if CURRENT_PROVIDER.add_server(name, config): return jsonify({'success': True}) else: return jsonify({'success': False, 'error': 'Failed to add server'}), 500 @app.route('/api/custom/remove/', methods=['DELETE']) def remove_custom_server(name): """Remove custom server""" if not isinstance(CURRENT_PROVIDER, CustomWireGuardProvider): return jsonify({'success': False, 'error': 'Not in custom mode'}), 400 if CURRENT_PROVIDER.remove_server(name): return jsonify({'success': True}) else: return jsonify({'success': False, 'error': 'Server not found'}), 404 @app.route('/api/import', methods=['POST']) def import_config(): """Import WireGuard config""" data = request.json name = data.get('name') config_content = data.get('config') if not name or not config_content: return jsonify({'success': False, 'error': 'Missing name or config'}), 400 provider = PROVIDERS['imported'] if provider.import_config(name, config_content): return jsonify({'success': True}) else: return jsonify({'success': False, 'error': 'Failed to import config'}), 500 @app.route('/api/status') def get_status(): """Get VPN status""" check_vpn_status() uptime = None if VPN_STATUS['connected'] and VPN_STATUS['start_time']: uptime_seconds = int(time.time() - VPN_STATUS['start_time']) hours = uptime_seconds // 3600 minutes = (uptime_seconds % 3600) // 60 uptime = f"{hours}h {minutes}m" return jsonify({ 'connected': VPN_STATUS['connected'], 'provider': VPN_STATUS['provider'], 'server': VPN_STATUS['server'], 'ip': VPN_STATUS['ip'], 'location': VPN_STATUS['location'], 'uptime': uptime, 'killswitch_active': True # Always true }) @app.route('/api/connect', methods=['POST']) def connect_vpn(): """Connect to VPN""" data = request.json server = data.get('server') if not server or not CURRENT_PROVIDER: return jsonify({'success': False, 'error': 'No server or provider selected'}), 400 try: # Disconnect if connected subprocess.run(['wg-quick', 'down', 'wg0'], capture_output=True) time.sleep(1) # Generate config config = CURRENT_PROVIDER.generate_config(server) # Write config with open('/etc/wireguard/wg0.conf', 'w') as f: f.write(config) os.chmod('/etc/wireguard/wg0.conf', 0o600) # Add firewall exception for endpoint endpoint_match = re.search(r'Endpoint = ([\d.]+):', config) if endpoint_match: subprocess.run([ 'iptables', '-I', 'OUTPUT', '1', '-p', 'udp', '--dport', '51820', '-d', endpoint_match.group(1), '-j', 'ACCEPT' ]) # Connect result = subprocess.run(['wg-quick', 'up', 'wg0'], capture_output=True, text=True) if result.returncode == 0: VPN_STATUS['start_time'] = time.time() VPN_STATUS['server'] = server VPN_STATUS['provider'] = CURRENT_PROVIDER.name logging.info(f"Connected to {server} via {CURRENT_PROVIDER.name}") return jsonify({'success': True}) else: logging.error(f"Connection failed: {result.stderr}") return jsonify({'success': False, 'error': result.stderr}), 500 except Exception as e: logging.error(f"Connect error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/disconnect', methods=['POST']) def disconnect_vpn(): """Disconnect VPN""" try: result = subprocess.run(['wg-quick', 'down', 'wg0'], capture_output=True, text=True) VPN_STATUS['start_time'] = None VPN_STATUS['connected'] = False VPN_STATUS['provider'] = None return jsonify({ 'success': result.returncode == 0, 'message': 'Disconnected - No internet (killswitch active)' }) except Exception as e: logging.error(f"Disconnect error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/keypair', methods=['GET']) def generate_keypair(): """Generate WireGuard keypair""" try: private_key = subprocess.check_output(['wg', 'genkey'], text=True).strip() public_key = subprocess.check_output(['wg', 'pubkey'], input=private_key, text=True).strip() return jsonify({ 'private_key': private_key, 'public_key': public_key }) except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': # Load configuration load_config() # Create directories os.makedirs('/opt/vpn-gateway/static', exist_ok=True) os.makedirs(PROVIDERS_DIR, exist_ok=True) # Start app app.run(host='0.0.0.0', port=5000, debug=False)