GameKeyManager/steam-gift-manager/app.py

1338 lines
44 KiB
Python
Raw Permalink Normal View History

# Standard library imports
import atexit
import csv
2025-04-26 12:32:07 +00:00
import io
import locale
import logging
import os
import random
2025-04-26 12:32:07 +00:00
import re
import secrets
import sqlite3
import time
import traceback
from datetime import datetime, timedelta
from functools import wraps
from io import BytesIO
from time import sleep
from urllib.parse import urlparse
from zoneinfo import ZoneInfo
import warnings
# 3rd-Provider-Modules
import pytz
2025-04-26 12:32:07 +00:00
import requests
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
2025-04-26 12:32:07 +00:00
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
from flask import (
Flask,
Markup,
abort,
flash,
g,
jsonify,
make_response,
redirect,
render_template,
request,
send_file,
session,
url_for
)
from flask_login import (
LoginManager,
UserMixin,
current_user,
login_required,
login_user,
logout_user
)
2025-04-26 12:32:07 +00:00
from flask_migrate import Migrate
from flask_session import Session
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import CSRFProtect, FlaskForm
from redis import Redis
from reportlab.lib import colors
2025-04-26 12:32:07 +00:00
from reportlab.lib.pagesizes import A4, landscape, letter
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import cm, inch, mm
from reportlab.lib.utils import ImageReader
from reportlab.pdfgen import canvas
2025-04-26 12:32:07 +00:00
from reportlab.platypus import (
Image,
Paragraph,
SimpleDocTemplate,
Spacer,
Table,
TableStyle
2025-04-26 12:32:07 +00:00
)
from sqlalchemy import MetaData, UniqueConstraint, event
from sqlalchemy.engine import Engine
from sqlalchemy.exc import IntegrityError, LegacyAPIWarning
from sqlalchemy.orm import joinedload
from werkzeug.security import check_password_hash, generate_password_hash
2025-05-10 14:14:22 +00:00
from werkzeug.middleware.proxy_fix import ProxyFix
from wtforms import SelectField, StringField, TextAreaField, validators
# Config
load_dotenv(override=True)
warnings.simplefilter("ignore", category=LegacyAPIWarning)
# Logging-Config
logging.basicConfig(level=logging.INFO)
logging.getLogger('apscheduler').setLevel(logging.WARNING)
@event.listens_for(Engine, "connect")
def enable_foreign_keys(dbapi_connection, connection_record):
if isinstance(dbapi_connection, sqlite3.Connection):
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON;")
cursor.close()
ITAD_API_KEY_PLACEHOLDER = "your_api_key_here"
TZ = os.getenv('TZ', 'UTC')
os.environ['TZ'] = TZ
2025-04-21 11:56:29 +00:00
app = Flask(__name__)
app.jinja_env.globals['getattr'] = getattr
@app.errorhandler(404)
def not_found_error(error):
return render_template('404.html'), 404
2025-05-10 14:14:22 +00:00
app.wsgi_app = ProxyFix(
app.wsgi_app,
x_proto=1, # Trust X-Forwarded-Proto Header
x_host=1 # Trust X-Forwarded-Host Header
)
# UNIX-Systems (Linux, Docker)
try:
time.tzset()
except AttributeError:
pass # tzset not availabe on Windows
local_tz = pytz.timezone(TZ)
# Load Languages
import os
import json
TRANSLATION_DIR = os.path.join(os.getcwd(), 'translations')
SUPPORTED_LANGUAGES = ['de', 'en']
TRANSLATIONS = {}
for lang in SUPPORTED_LANGUAGES:
try:
with open(os.path.join(TRANSLATION_DIR, f'{lang}.json'), encoding='utf-8') as f:
TRANSLATIONS[lang] = json.load(f)
print(f"✅ Loaded {lang} translations")
except Exception:
print(f"❌ Failed loading {lang}.json: {str(e)}")
TRANSLATIONS[lang] = {}
def translate(key, lang=None, **kwargs):
lang = lang or session.get('lang', 'en')
fallback_lang = app.config.get('DEFAULT_LANGUAGE', 'en')
translations = TRANSLATIONS.get(lang, {})
fallback_translations = TRANSLATIONS.get(fallback_lang, {})
value = translations.get(key) or fallback_translations.get(key) or key
return value.format(**kwargs) if isinstance(value, str) else value
## DEBUG Translations
if app.debug:
print(f"Loaded translations for 'de': {TRANSLATIONS.get('de', {})}")
### Admin decorator
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
abort(403)
if not current_user.is_admin:
abort(403)
return f(*args, **kwargs)
return decorated_function
2025-04-26 12:32:07 +00:00
csrf = CSRFProtect(app)
convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=convention)
load_dotenv(override=True)
# load variables from .env with override
2025-04-26 12:32:07 +00:00
load_dotenv(override=True)
2025-04-29 13:19:59 +00:00
# App-Configuration
2025-04-26 12:32:07 +00:00
app.config.update(
# Most Important
2025-04-26 12:32:07 +00:00
SECRET_KEY=os.getenv('SECRET_KEY'),
SQLALCHEMY_DATABASE_URI = 'sqlite:////app/data/games.db',
SQLALCHEMY_TRACK_MODIFICATIONS = False,
DEFAULT_LANGUAGE='en',
ITAD_COUNTRY = os.getenv("ITAD_COUNTRY", "DE"),
# SESSION-HANDLING (In Production: Use Redis!)
SESSION_TYPE='redis',
SESSION_PERMANENT = False,
SESSION_USE_SIGNER = True,
SESSION_REDIS=Redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379/0")),
SESSION_FILE_DIR = '/app/data/flask-sessions',
SESSION_COOKIE_NAME = 'gamekeys_session',
SESSION_COOKIE_SECURE = os.getenv('SESSION_COOKIE_SECURE', 'False').lower() == 'true',
SESSION_COOKIE_HTTPONLY = True,
SESSION_COOKIE_SAMESITE = 'Lax',
PERMANENT_SESSION_LIFETIME = timedelta(days=30),
# LOGIN COOKIE STUFF
REMEMBER_COOKIE_DURATION=timedelta(days=30),
REMEMBER_COOKIE_HTTPONLY=True,
REMEMBER_COOKIE_SECURE=True if os.getenv('FORCE_HTTPS', 'False').lower() == 'true' else False,
REMEMBER_COOKIE_SAMESITE='Lax',
# CSRF-PROTECTION
WTF_CSRF_ENABLED = True,
WTF_CSRF_SECRET_KEY = os.getenv('CSRF_SECRET_KEY', os.urandom(32).hex()),
WTF_CSRF_TIME_LIMIT = 3600,
# SECURITYsa & PERFORMANCE
REGISTRATION_ENABLED = os.getenv('REGISTRATION_ENABLED', 'True').lower() == 'true',
SEND_FILE_MAX_AGE_DEFAULT = int(os.getenv('SEND_FILE_MAX_AGE_DEFAULT', 0)),
TEMPLATES_AUTO_RELOAD = os.getenv('TEMPLATES_AUTO_RELOAD', 'True').lower() == 'true',
PREFERRED_URL_SCHEME = 'https' if os.getenv('FORCE_HTTPS') else 'http'
2025-04-26 12:32:07 +00:00
)
Session(app)
2025-04-26 12:32:07 +00:00
interval_hours = int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12))
# Init
2025-04-26 12:32:07 +00:00
db = SQLAlchemy(app, metadata=metadata)
migrate = Migrate(app, db)
2025-04-21 11:56:29 +00:00
login_manager = LoginManager(app)
login_manager.login_view = 'login'
2025-04-26 12:32:07 +00:00
# Logging
app.logger.addHandler(logging.StreamHandler())
app.logger.setLevel(logging.DEBUG)
@app.errorhandler(403)
def forbidden_error(error):
return render_template('403.html'), 403
2025-04-26 12:32:07 +00:00
@app.before_request
def set_language():
if 'lang' not in session or not session['lang']:
session['lang'] = app.config.get('DEFAULT_LANGUAGE', 'en')
g.lang = session['lang']
def enforce_https():
if os.getenv('FORCE_HTTPS', 'False').lower() == 'true' and not app.debug:
proto = request.headers.get('X-Forwarded-Proto', 'http')
if proto != 'https' and not request.is_secure:
url = request.url.replace('http://', 'https://', 1)
return redirect(url, code=301)
def debug_translations():
if app.debug:
app.logger.debug(f"Lang: {session.get('lang')}")
app.before_request(enforce_https)
2025-04-21 11:56:29 +00:00
@app.context_processor
def inject_template_globals():
return {
'_': lambda key, **kwargs: translate(key, lang=session.get('lang', 'en'), **kwargs),
'now': datetime.now(local_tz),
'app_version': os.getenv('APP_VERSION', '1.0.0'),
'local_tz': local_tz
}
@app.template_filter('strftime')
def _jinja2_filter_datetime(date, fmt='%d.%m.%Y'):
if date is None:
return ''
return date.strftime(fmt)
@app.errorhandler(403)
def forbidden(e):
return render_template('403.html'), 403
2025-04-21 11:56:29 +00:00
2025-04-29 13:19:59 +00:00
# DB Models
class ActivityLog(db.Model):
__tablename__ = 'activity_logs'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
action = db.Column(db.String(100), nullable=False)
details = db.Column(db.Text)
timestamp = db.Column(db.DateTime, default=lambda: datetime.now(local_tz))
user = db.relationship('User', backref='activities')
class User(UserMixin, db.Model):
2025-04-26 12:32:07 +00:00
__tablename__ = 'users'
2025-04-21 11:56:29 +00:00
id = db.Column(db.Integer, primary_key=True)
2025-04-26 12:32:07 +00:00
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(256), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
games = db.relationship(
'Game',
back_populates='owner',
cascade='all, delete-orphan',
passive_deletes=True
)
2025-04-21 11:56:29 +00:00
class Game(db.Model):
__tablename__ = 'games'
__table_args__ = (
UniqueConstraint('steam_key', 'user_id', name='uq_steam_key_user'),
)
2025-04-21 11:56:29 +00:00
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
2025-04-26 12:32:07 +00:00
steam_key = db.Column(db.String(100), nullable=False, unique=True)
2025-04-21 11:56:29 +00:00
status = db.Column(db.String(50), nullable=False)
recipient = db.Column(db.String(100))
notes = db.Column(db.Text)
url = db.Column(db.String(200))
created_at = db.Column(db.DateTime, default=lambda: datetime.now(local_tz))
2025-04-21 11:56:29 +00:00
redeem_date = db.Column(db.DateTime)
steam_appid = db.Column(db.String(20))
platform = db.Column(db.String(50), default='pc')
current_price = db.Column(db.Float)
current_price_shop = db.Column(db.String(100))
historical_low = db.Column(db.Float)
release_date = db.Column(db.DateTime)
release_date = db.Column(db.DateTime)
itad_slug = db.Column(db.String(200))
steam_description_en = db.Column(db.Text)
steam_description_de = db.Column(db.Text)
# with users.id
user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
owner = db.relationship(
'User',
back_populates='games'
)
redeem_tokens = db.relationship(
'RedeemToken',
back_populates='game',
cascade='all, delete-orphan',
passive_deletes=True
)
2025-04-21 11:56:29 +00:00
2025-04-26 12:32:07 +00:00
class RedeemToken(db.Model):
__tablename__ = 'redeem_tokens'
2025-04-26 12:32:07 +00:00
id = db.Column(db.Integer, primary_key=True)
token = db.Column(db.String(17), unique=True, nullable=False)
expires = db.Column(db.DateTime(timezone=True), nullable=False)
2025-04-26 12:32:07 +00:00
total_hours = db.Column(db.Integer, nullable=False)
# ForeignKey with CASCADE
game_id = db.Column(
db.Integer,
db.ForeignKey('games.id', ondelete='CASCADE'),
nullable=False
)
game = db.relationship('Game', back_populates='redeem_tokens')
def is_expired(self):
# use timeszone (from .env)
local_tz = pytz.timezone(os.getenv('TZ', 'UTC'))
now = datetime.now(local_tz)
return now > self.expires.astimezone(local_tz)
class GameForm(FlaskForm):
name = StringField('Name', [validators.DataRequired()])
steam_key = StringField('Steam Key')
status = SelectField('Status', choices=[
('nicht eingelöst', 'Nicht eingelöst'),
('eingelöst', 'Eingelöst'),
('geschenkt', 'Geschenkt')
])
recipient = StringField('Empfänger')
notes = TextAreaField('Notizen')
url = StringField('Store URL')
redeem_date = StringField('Einlösedatum')
steam_appid = StringField('Steam App ID')
PLATFORM_CHOICES = [
('steam', 'Steam'),
('gog', 'GOG'),
('xbox', 'XBox'),
('playstation', 'PlayStation'),
('switch', 'Nintendo Switch'),
('other', 'Other'),
('pc', 'PC')
]
STATUS_CHOICES = [
('nicht eingelöst', 'Nicht eingelöst'),
('eingelöst', 'Eingelöst'),
('geschenkt', 'Geschenkt')
]
2025-04-26 12:32:07 +00:00
with app.app_context():
db.create_all()
2025-04-21 11:56:29 +00:00
@login_manager.user_loader
def load_user(user_id):
return db.session.get(User, int(user_id))
2025-04-26 12:32:07 +00:00
def extract_steam_appid(url):
match = re.search(r'store\.steampowered\.com/app/(\d+)', url or '')
return match.group(1) if match else ''
# 404
def get_or_404(model, id):
instance = db.session.get(model, id)
if not instance:
abort(404)
return instance
# Admin Audit Helper
def log_activity(user_id, action, details=None):
"""
Store an activity log entry for auditing purposes.
"""
log = ActivityLog(
user_id=user_id,
action=action,
details=details
)
db.session.add(log)
db.session.commit()
# Game Infos Helper
def fetch_steam_data(appid, lang='en'):
lang_map = {
'en': 'english',
'de': 'german'
}
steam_lang = lang_map.get(lang, 'english')
try:
response = requests.get(
"https://store.steampowered.com/api/appdetails",
params={"appids": appid, "l": steam_lang},
timeout=15
)
data = response.json().get(str(appid), {})
if data.get("success"):
return {
"name": data["data"].get("name"),
"detailed_description": data["data"].get("detailed_description"),
"release_date": data["data"].get("release_date", {}).get("date"),
}
except Exception as e:
app.logger.error(f"Steam API error: {str(e)}")
return None
def parse_steam_release_date(date_str):
"""Parsing Steam-Release-Date (the german us thingy, you know)"""
import locale
from datetime import datetime
# try german format
try:
locale.setlocale(locale.LC_TIME, "de_DE.UTF-8")
return datetime.strptime(date_str, "%d. %b. %Y")
except Exception:
pass
# Fallback: okay lets try the english one
try:
locale.setlocale(locale.LC_TIME, "en_US.UTF-8")
return datetime.strptime(date_str, "%d %b, %Y")
except Exception:
pass
return None
def fetch_itad_slug(steam_appid: int) -> str | None:
api_key = os.getenv("ITAD_API_KEY")
if not api_key or api_key.strip() == "your-secret-key-here":
app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.")
return None
try:
response = requests.get(
"https://api.isthereanydeal.com/games/lookup/v1",
params={"key": api_key, "appid": steam_appid, "platform": "steam"},
timeout=10
)
data = response.json()
return data.get("game", {}).get("slug")
except Exception as e:
app.logger.error(f"ITAD Error: {str(e)}")
return None
def fetch_itad_game_id(steam_appid: int) -> str | None:
api_key = os.getenv("ITAD_API_KEY")
if not api_key or api_key.strip() == "your-secret-key-here":
app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.")
return None
try:
response = requests.get(
"https://api.isthereanydeal.com/games/lookup/v1",
params={"key": api_key, "appid": steam_appid, "platform": "steam"},
timeout=10
)
response.raise_for_status()
data = response.json()
if data.get("found") and data.get("game") and data["game"].get("id"):
return data["game"]["id"]
app.logger.error(f"ITAD Response Error: {data}")
return None
except Exception as e:
app.logger.error(f"ITAD Error: {str(e)}")
return None
def fetch_itad_prices(game_id: str) -> dict | None:
api_key = os.getenv("ITAD_API_KEY")
country = os.getenv("ITAD_COUNTRY", "DE")
if not api_key or api_key.strip() == "your-secret-key-here":
app.logger.warning("ITAD-API-Key ist nicht gesetzt oder ist ein Platzhalter.")
return None
try:
response = requests.post(
"https://api.isthereanydeal.com/games/prices/v3",
params={
"key": api_key,
"country": country,
"shops": "steam",
"vouchers": "false"
},
json=[game_id],
headers={"Content-Type": "application/json"},
timeout=15
)
response.raise_for_status()
return response.json()[0]
except Exception as e:
app.logger.error(f"ITAD-Preisabfrage fehlgeschlagen: {str(e)}")
return None
2025-04-21 11:56:29 +00:00
@app.route('/')
@login_required
def index():
search_query = request.args.get('q', '')
2025-04-26 12:32:07 +00:00
query = Game.query.filter_by(user_id=current_user.id)
2025-04-21 11:56:29 +00:00
if search_query:
query = query.filter(Game.name.ilike(f'%{search_query}%'))
2025-04-26 12:32:07 +00:00
2025-04-21 11:56:29 +00:00
games = query.order_by(Game.created_at.desc()).all()
return render_template('index.html',
games=games,
format_date=lambda dt: dt.strftime('%d.%m.%Y') if dt else '',
search_query=search_query)
@app.route('/set-lang/<lang>')
def set_lang(lang):
if lang in SUPPORTED_LANGUAGES:
2025-04-21 11:56:29 +00:00
session['lang'] = lang
session.permanent = True
2025-04-21 11:56:29 +00:00
return redirect(request.referrer or url_for('index'))
2025-04-21 11:56:29 +00:00
@app.route('/set-theme/<theme>')
def set_theme(theme):
resp = make_response('', 204)
resp.set_cookie('theme', theme, max_age=60*60*24*365)
2025-04-21 11:56:29 +00:00
return resp
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated: # Prevent already logged-in users from accessing login page
return redirect(url_for('index'))
2025-04-21 11:56:29 +00:00
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
remember = request.form.get('remember_me') == 'true'
2025-04-21 11:56:29 +00:00
user = User.query.filter_by(username=username).first()
2025-04-21 11:56:29 +00:00
if user and check_password_hash(user.password, password):
# Pass remember=True to login_user and set duration
# The duration will be taken from app.config['REMEMBER_COOKIE_DURATION']
login_user(user, remember=remember)
# Log activity
log_activity(user.id, 'user_login', f"User '{user.username}' logged in.")
next_page = request.args.get('next')
# Add security check for next_page to prevent open redirect
if not next_page or urlparse(next_page).netloc != '':
next_page = url_for('index')
flash(translate('Logged in successfully.'), 'success')
return redirect(next_page)
else:
flash(translate('Invalid username or password.'), 'danger')
2025-04-21 11:56:29 +00:00
return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
2025-04-26 12:32:07 +00:00
if not app.config['REGISTRATION_ENABLED']:
abort(403)
2025-04-21 11:56:29 +00:00
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
existing_user = User.query.filter_by(username=username).first()
if existing_user:
flash(translate('Username already exists'), 'error')
2025-04-21 11:56:29 +00:00
return redirect(url_for('register'))
# make the first user admin
is_admin = User.query.count() == 0
new_user = User(
username=username,
password=generate_password_hash(password),
is_admin=is_admin
)
2025-04-21 11:56:29 +00:00
db.session.add(new_user)
db.session.commit()
login_user(new_user)
flash(translate('Registration successful'), 'success')
2025-04-21 11:56:29 +00:00
return redirect(url_for('index'))
2025-04-21 11:56:29 +00:00
return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
2025-04-26 12:32:07 +00:00
@app.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
if request.method == 'POST':
current_password = request.form['current_password']
new_password = request.form['new_password']
confirm_password = request.form['confirm_password']
2025-04-21 11:56:29 +00:00
2025-04-26 12:32:07 +00:00
if not check_password_hash(current_user.password, current_password):
flash(translate('Current passwort is wrong'), 'danger')
2025-04-26 12:32:07 +00:00
return redirect(url_for('change_password'))
if new_password != confirm_password:
flash(translate('New Passwords are not matching'), 'danger')
2025-04-26 12:32:07 +00:00
return redirect(url_for('change_password'))
current_user.password = generate_password_hash(new_password)
db.session.commit()
flash(translate('Password changed successfully', session.get('lang', 'en')), 'success')
2025-04-26 12:32:07 +00:00
return redirect(url_for('index'))
return render_template('change_password.html')
2025-04-21 11:56:29 +00:00
@app.route('/add', methods=['GET', 'POST'])
@login_required
def add_game():
if request.method == 'POST':
try:
url = request.form.get('url', '')
steam_appid = request.form.get('steam_appid', '').strip()
2025-04-26 12:32:07 +00:00
2025-04-21 11:56:29 +00:00
if not steam_appid:
steam_appid = extract_steam_appid(url)
steam_key = request.form['steam_key']
if Game.query.filter_by(steam_key=steam_key).first():
flash(translate('Steam Key already exists!'), 'error')
return redirect(url_for('add_game'))
2025-04-21 11:56:29 +00:00
new_game = Game(
name=request.form['name'],
steam_key=steam_key,
2025-04-21 11:56:29 +00:00
status=request.form['status'],
recipient=request.form.get('recipient', ''),
notes=request.form.get('notes', ''),
url=url,
2025-04-22 11:45:13 +00:00
steam_appid=steam_appid,
2025-04-21 11:56:29 +00:00
redeem_date=datetime.strptime(request.form['redeem_date'], '%Y-%m-%d') if request.form['redeem_date'] else None,
user_id=current_user.id
)
2025-04-26 12:32:07 +00:00
2025-04-21 11:56:29 +00:00
db.session.add(new_game)
db.session.commit()
flash(translate('Game added successfully!'), 'success')
2025-04-21 11:56:29 +00:00
return redirect(url_for('index'))
2025-04-26 12:32:07 +00:00
except IntegrityError as e:
2025-04-26 12:32:07 +00:00
db.session.rollback()
if "UNIQUE constraint failed: game.steam_key" in str(e):
flash(translate('Steam Key already exists!'), 'error')
else:
flash(translate('Database error: %(error)s', error=str(e)), 'error')
2025-04-21 11:56:29 +00:00
except Exception as e:
db.session.rollback()
flash(translate('Error: %(error)s', error=str(e)), 'error')
2025-04-26 12:32:07 +00:00
return render_template(
'add_game.html',
platforms=PLATFORM_CHOICES,
statuses=STATUS_CHOICES
)
2025-04-21 11:56:29 +00:00
@app.route('/edit/<int:game_id>', methods=['GET', 'POST'])
@login_required
def edit_game(game_id):
# Eager Loading für Tokens
game = Game.query.options(joinedload(Game.redeem_tokens)).get_or_404(game_id)
def safe_parse_date(date_str):
try:
naive = datetime.strptime(date_str, '%Y-%m-%d') if date_str else None
return local_tz.localize(naive) if naive else None
except ValueError:
return None
2025-04-22 11:45:13 +00:00
2025-04-21 11:56:29 +00:00
if request.method == 'POST':
try:
# Validation
if not request.form.get('name') or not request.form.get('steam_key'):
flash(translate('Name and Steam Key are required'), 'error')
return redirect(url_for('edit_game', game_id=game_id))
# Duplicate check
existing = Game.query.filter(
Game.steam_key == request.form['steam_key'],
Game.id != game.id,
Game.user_id == current_user.id
).first()
if existing:
flash(translate('Steam Key already exists'), 'error')
return redirect(url_for('edit_game', game_id=game_id))
# Update fields
2025-04-21 11:56:29 +00:00
game.name = request.form['name']
game.steam_key = request.form['steam_key']
game.status = request.form['status']
game.platform = request.form.get('platform', 'pc')
2025-04-21 11:56:29 +00:00
game.recipient = request.form.get('recipient', '')
game.notes = request.form.get('notes', '')
game.url = request.form.get('url', '')
game.steam_appid = request.form.get('steam_appid', '')
game.redeem_date = safe_parse_date(request.form.get('redeem_date', ''))
2025-04-26 12:32:07 +00:00
# Token-Logic
if game.status == 'geschenkt':
# Vorhandene Tokens löschen
RedeemToken.query.filter_by(game_id=game.id).delete()
# Generate new Token
token = secrets.token_urlsafe(12)[:17]
expires = datetime.now(local_tz) + timedelta(hours=24)
new_token = RedeemToken(
token=token,
game_id=game.id,
expires=expires,
total_hours=24
)
db.session.add(new_token)
2025-04-21 11:56:29 +00:00
db.session.commit()
flash(translate('Changes saved successfully'), 'success')
2025-04-21 11:56:29 +00:00
return redirect(url_for('index'))
except IntegrityError as e:
db.session.rollback()
app.logger.error(f"IntegrityError: {traceback.format_exc()}")
flash(translate('Database error: {error}', error=str(e.orig)), 'error')
2025-04-21 11:56:29 +00:00
except Exception as e:
db.session.rollback()
app.logger.error(f"Unexpected error: {traceback.format_exc()}")
flash(translate('Unexpected error: {error}', error=str(e)), 'error')
return render_template(
'edit_game.html',
game=game,
platforms=PLATFORM_CHOICES,
statuses=STATUS_CHOICES,
redeem_date=game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else ''
)
2025-04-26 12:32:07 +00:00
2025-04-21 11:56:29 +00:00
@app.route('/delete/<int:game_id>', methods=['POST'])
@login_required
def delete_game(game_id):
game = Game.query.get_or_404(game_id)
db.session.delete(game)
db.session.commit()
flash(translate('Game deleted successfully'), 'success')
2025-04-21 11:56:29 +00:00
return redirect(url_for('index'))
2025-04-22 11:45:13 +00:00
@app.route('/export', methods=['GET'])
@login_required
def export_games():
games = Game.query.filter_by(user_id=current_user.id).all()
output = io.StringIO()
writer = csv.writer(output)
2025-04-26 12:32:07 +00:00
2025-04-22 11:45:13 +00:00
writer.writerow(['Name', 'Steam Key', 'Status', 'Recipient', 'Notes', 'URL', 'Created', 'Redeem by', 'Steam AppID'])
2025-04-26 12:32:07 +00:00
2025-04-22 11:45:13 +00:00
for game in games:
writer.writerow([
2025-04-26 12:32:07 +00:00
game.name,
game.steam_key,
game.status,
game.recipient,
game.notes,
game.url,
game.created_at.strftime('%Y-%m-%d %H:%M:%S') if game.created_at else '',
2025-04-22 11:45:13 +00:00
game.redeem_date.strftime('%Y-%m-%d') if game.redeem_date else '',
game.steam_appid
])
2025-04-26 12:32:07 +00:00
2025-04-22 11:45:13 +00:00
output.seek(0)
return send_file(
io.BytesIO(output.getvalue().encode('utf-8')),
mimetype='text/csv',
as_attachment=True,
download_name='games_export.csv'
)
2025-04-26 12:32:07 +00:00
@app.route('/export_pdf')
@login_required
def export_pdf():
excluded_statuses = ['eingelöst', 'verschenkt']
games = Game.query.filter(
Game.user_id == current_user.id,
Game.status.notin_(excluded_statuses)
).order_by(Game.created_at.desc()).all()
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer,
pagesize=landscape(A4),
leftMargin=40,
rightMargin=40,
topMargin=40,
bottomMargin=40
)
styles = getSampleStyleSheet()
elements = []
img_height = 2*cm
# Title
elements.append(Paragraph(
translate("Game List (without Keys)", lang=session.get('lang', 'en')),
styles['Title']
))
2025-04-26 12:32:07 +00:00
elements.append(Spacer(1, 12))
# Table header
2025-04-26 12:32:07 +00:00
col_widths = [
5*cm, 10*cm, 6*cm, 3*cm
]
data = [[
Paragraph('<b>Cover</b>', styles['Normal']),
Paragraph('<b>Name</b>', styles['Normal']),
Paragraph('<b>Shop-Link</b>', styles['Normal']),
Paragraph('<b>Einlösen bis</b>', styles['Normal'])
]]
for game in games:
img = None
if game.steam_appid:
try:
img_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{game.steam_appid}/header.jpg"
img_data = io.BytesIO(requests.get(img_url, timeout=5).content)
img = Image(img_data, width=3*cm, height=img_height)
except Exception:
img = Paragraph('', styles['Normal'])
elif game.url and 'gog.com' in game.url:
try:
img_path = os.path.join(app.root_path, 'static', 'gog_logo.webp')
img = Image(img_path, width=3*cm, height=img_height)
except Exception:
img = Paragraph('', styles['Normal'])
2025-04-26 12:32:07 +00:00
data.append([
img or '',
Paragraph(game.name, styles['Normal']),
Paragraph(game.url or '', styles['Normal']),
game.redeem_date.strftime('%d.%m.%y') if game.redeem_date else ''
])
# Table format
2025-04-26 12:32:07 +00:00
table = Table(data, colWidths=col_widths, repeatRows=1)
table.setStyle(TableStyle([
('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
('FONTSIZE', (0,0), (-1,0), 8),
('FONTSIZE', (0,1), (-1,-1), 8),
('VALIGN', (0,0), (-1,-1), 'MIDDLE'),
('ALIGN', (0,0), (-1,-1), 'LEFT'),
('GRID', (0,0), (-1,-1), 0.5, colors.lightgrey),
('WORDWRAP', (1,1), (1,-1), 'CJK'),
]))
elements.append(table)
doc.build(elements)
buffer.seek(0)
return send_file(
2025-04-26 12:32:07 +00:00
buffer,
mimetype='application/pdf',
as_attachment=True,
download_name=f'game_export_{datetime.now().strftime("%Y%m%d")}.pdf'
)
2025-04-22 11:45:13 +00:00
@app.route('/import', methods=['GET', 'POST'])
@login_required
def import_games():
if request.method == 'POST':
file = request.files.get('file')
2025-04-26 12:32:07 +00:00
2025-04-22 11:45:13 +00:00
if file and file.filename.endswith('.csv'):
2025-04-26 12:32:07 +00:00
stream = io.StringIO(file.stream.read().decode("UTF8"))
2025-04-22 11:45:13 +00:00
reader = csv.DictReader(stream)
2025-04-26 12:32:07 +00:00
new_games = 0
duplicates = 0
try:
with db.session.begin_nested():
for row in reader:
steam_key = row['Steam Key'].strip()
if Game.query.filter_by(steam_key=steam_key).first():
duplicates += 1
continue
game = Game(
name=row['Name'],
steam_key=steam_key,
status=row['Status'],
recipient=row.get('Recipient', ''),
notes=row.get('Notes', ''),
url=row.get('URL', ''),
created_at=datetime.strptime(row['Created'], '%Y-%m-%d %H:%M:%S') if row.get('Created') else datetime.utcnow(),
redeem_date=datetime.strptime(row['Redeem by'], '%Y-%m-%d') if row.get('Redeem by') else None,
steam_appid=row.get('Steam AppID', ''),
user_id=current_user.id
)
db.session.add(game)
new_games += 1
db.session.commit()
flash(translate("new_games_imported", new=new_games, dup=duplicates), 'success')
2025-04-26 12:32:07 +00:00
except Exception as e:
db.session.rollback()
flash(translate('Import error: {error}', error=str(e)), 'danger')
2025-04-26 12:32:07 +00:00
2025-04-22 11:45:13 +00:00
return redirect(url_for('index'))
2025-04-26 12:32:07 +00:00
flash(translate('Please upload a valid CSV file.'), 'danger')
2025-04-26 12:32:07 +00:00
2025-04-22 11:45:13 +00:00
return render_template('import.html')
2025-04-21 11:56:29 +00:00
2025-04-26 12:32:07 +00:00
@app.route('/generate_redeem/<int:game_id>', methods=['POST'])
@login_required
def generate_redeem(game_id):
game = Game.query.get_or_404(game_id)
if game.user_id != current_user.id or game.status != 'geschenkt':
return jsonify({'error': translate('Forbidden')}), 403
2025-04-26 12:32:07 +00:00
try:
RedeemToken.query.filter_by(game_id=game_id).delete()
token = secrets.token_urlsafe(12)[:17]
expires = datetime.now(local_tz) + timedelta(hours=24)
2025-04-26 12:32:07 +00:00
new_token = RedeemToken(
token=token,
game_id=game_id,
expires=expires,
total_hours=24
)
db.session.add(new_token)
db.session.commit()
2025-05-10 14:14:22 +00:00
redeem_url = url_for('redeem', token=token, _external=True, _scheme='https')
message = translate(
'Redeem link generated: <a href="{url}" target="_blank">{url}</a>',
url=redeem_url
)
return jsonify({'url': redeem_url, 'message': message})
2025-04-26 12:32:07 +00:00
except Exception as e:
db.session.rollback()
2025-04-26 12:32:07 +00:00
return jsonify({'error': str(e)}), 500
@app.route('/redeem/<token>', endpoint='redeem')
2025-04-26 12:32:07 +00:00
def redeem_page(token):
redeem_token = RedeemToken.query.filter_by(token=token).first()
if not redeem_token:
abort(404)
expires_utc = redeem_token.expires.astimezone(pytz.UTC)
if datetime.now(pytz.UTC) > expires_utc:
2025-04-26 12:32:07 +00:00
db.session.delete(redeem_token)
db.session.commit()
abort(404)
game = Game.query.get(redeem_token.game_id)
redeem_token.used = True
db.session.commit()
2025-05-10 14:14:22 +00:00
if game.platform == 'steam':
platform_link = 'https://store.steampowered.com/account/registerkey?key='
2025-05-10 14:14:22 +00:00
platform_name = 'Steam'
elif game.platform == 'gog':
platform_link = 'https://www.gog.com/redeem/'
2025-05-10 14:14:22 +00:00
platform_name = 'GOG'
elif game.platform == 'xbox':
platform_link = 'https://redeem.microsoft.com/'
2025-05-10 14:14:22 +00:00
platform_name = 'Xbox'
elif game.platform == 'playstation':
platform_link = 'https://redeem.playstation.com/'
platform_name = 'PlayStation'
elif game.platform == 'switch':
platform_link = 'https://ec.nintendo.com/redeem/'
platform_name = 'Nintendo Switch'
else:
2025-05-10 14:14:22 +00:00
# Fallback für benutzerdefinierte Keys
platform_link = ''
platform_name = 'Key'
return render_template(
'redeem.html',
game=game,
redeem_token=redeem_token,
expires_timestamp=int(expires_utc.timestamp() * 1000),
platform_link=platform_link,
2025-05-10 14:14:22 +00:00
platform_name=platform_name
)
2025-05-10 14:14:22 +00:00
@app.route('/admin/users')
@login_required
@admin_required
def admin_users():
users = User.query.all()
return render_template('admin_users.html', users=users)
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def admin_delete_user(user_id):
if current_user.id == user_id:
flash(translate('You cannot delete yourself'), 'error')
return redirect(url_for('admin_users'))
user = User.query.get_or_404(user_id)
db.session.delete(user)
db.session.commit()
log_activity(
current_user.id,
'user_deleted',
f"Deleted user: {user.username} (ID: {user.id})"
)
flash(translate('User deleted successfully'), 'success')
return redirect(url_for('admin_users'))
@app.route('/admin/users/reset_password/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def admin_reset_password(user_id):
user = User.query.get_or_404(user_id)
new_password = secrets.token_urlsafe(8)
user.password = generate_password_hash(new_password)
db.session.commit()
log_activity(
current_user.id,
'user_newpassword',
f"New password for user: {user.username} (ID: {user.id})"
)
flash(
translate('New password for {username}: {password}',
username=user.username,
password=new_password),
'info'
)
return redirect(url_for('admin_users'))
@app.route('/admin/audit-logs')
@login_required
@admin_required
def admin_audit_logs():
page = request.args.get('page', 1, type=int)
logs = ActivityLog.query.order_by(ActivityLog.timestamp.desc()).paginate(page=page, per_page=20)
return render_template('admin_audit_logs.html', logs=logs)
@app.route('/game/<int:game_id>/update', methods=['POST'])
@login_required
def update_game_data(game_id):
game = Game.query.get_or_404(game_id)
# 1. Getting Steam AppID
steam_appid = request.form.get('steam_appid', '').strip()
app.logger.info(f"🚀 Update gestartet für Game {game_id} mit AppID: {steam_appid}")
# 2. Steam-Data (Multilingual)
if steam_appid:
try:
app.logger.debug(f"🔍 Fetching Steam data for AppID: {steam_appid}")
for lang in ['en', 'de']:
steam_data = fetch_steam_data(steam_appid, lang=lang)
if steam_data:
if lang == 'en' and steam_data.get("name"):
game.name = steam_data.get("name", game.name)
setattr(game, f'steam_description_{lang}', steam_data.get("detailed_description") or "No Infos available")
if lang == 'en':
date_str = steam_data.get("release_date", {})
if date_str:
parsed_date = parse_steam_release_date(date_str)
if parsed_date:
game.release_date = local_tz.localize(parsed_date)
else:
app.logger.warning(f"Could not parse Steam release date: {date_str}")
app.logger.info("✅ Steam data successfully updated")
except Exception as e:
app.logger.error(f"💥 Kritischer Steam-Fehler: {str(e)}", exc_info=True)
flash(translate('Error during Steam query'), 'danger')
else:
app.logger.warning("⚠️ Keine Steam-AppID vorhanden, Steam-Daten werden nicht aktualisiert")
flash(translate('Steam-AppID missing, no Steam Data transferred'), 'warning')
# ITAD-Slug doings and such
itad_slug = fetch_itad_slug(steam_appid)
if itad_slug:
game.itad_slug = itad_slug
# 4. ITAD-Prices
price_data = None
if steam_appid:
try:
app.logger.debug("🔄 Starte ITAD-Abfrage...")
game.itad_game_id = fetch_itad_game_id(steam_appid)
if game.itad_game_id:
app.logger.info(f"🔑 ITAD Game ID: {game.itad_game_id}")
price_data = fetch_itad_prices(game.itad_game_id)
if price_data:
# Best price right now
all_deals = price_data.get("deals", [])
if all_deals:
best_deal = min(
all_deals,
key=lambda deal: deal.get("price", {}).get("amount", float('inf'))
)
game.current_price = best_deal.get("price", {}).get("amount")
game.current_price_shop = best_deal.get("shop", {}).get("name")
app.logger.info(f"💶 Current Best: {game.current_price}€ at {game.current_price_shop}")
else:
game.current_price = None
game.current_price_shop = None
app.logger.info(f"💶 Current Best: {game.current_price}")
game.historical_low = price_data.get("historyLow", {}).get("all", {}).get("amount")
app.logger.info(f"📉 Historical Low: {game.historical_low}")
else:
app.logger.warning("⚠️ Keine ITAD-Preisdaten erhalten")
else:
app.logger.warning("⚠️ Keine ITAD Game ID erhalten")
except Exception as e:
app.logger.error(f"💥 ITAD-API-Fehler: {str(e)}", exc_info=True)
flash(translate('Fehler bei Preisabfrage'), 'danger')
try:
db.session.commit()
flash(translate('Externe Daten erfolgreich aktualisiert!'), 'success')
app.logger.info("💾 Datenbank-Update erfolgreich")
except Exception as e:
db.session.rollback()
app.logger.error(f"💥 Datenbank-Fehler: {str(e)}", exc_info=True)
flash(translate('Fehler beim Speichern der Daten'), 'danger')
return redirect(url_for('edit_game', game_id=game_id))
@app.route('/game/<int:game_id>')
@login_required
def game_details(game_id):
game = Game.query.get_or_404(game_id)
return render_template('game_details.html', game=game)
@app.route('/debug-session')
def debug_session():
return jsonify(dict(session))
2025-04-26 12:32:07 +00:00
# Apprise Notifications
import apprise
2025-04-26 12:32:07 +00:00
def send_apprise_notification(user, game):
apprise_urls = os.getenv('APPRISE_URLS', '').strip()
if not apprise_urls:
app.logger.error("No APPRISE_URLS configured")
2025-04-26 12:32:07 +00:00
return False
apobj = apprise.Apprise()
for url in apprise_urls.replace(',', '\n').splitlines():
if url.strip():
apobj.add(url.strip())
edit_url = url_for('edit_game', game_id=game.id, _external=True)
result = apobj.notify(
title="Steam-Key läuft ab!",
body=f"Dein Key für '{game.name}' läuft in weniger als 48 Stunden ab!\n\nLink: {edit_url}",
)
return result
2025-04-26 12:32:07 +00:00
def send_notification(user, game):
return send_apprise_notification(user, game)
2025-04-26 12:32:07 +00:00
def check_expiring_keys():
now = datetime.now(local_tz)
expiry_threshold = now + timedelta(hours=48)
stmt = select(Game).where(
Game.status != 'eingelöst',
Game.redeem_date <= expiry_threshold,
Game.redeem_date > now
)
expiring_games = db.session.execute(stmt).scalars().all()
for game in expiring_games:
user = User.query.get(game.user_id)
if user.notification_service and user.notification_service != 'none':
send_notification(user, game)
2025-04-26 12:32:07 +00:00
2025-04-29 13:19:59 +00:00
# Optional: cleaning up old tokens
2025-04-26 12:32:07 +00:00
def cleanup_expired_tokens():
with app.app_context():
try:
now = datetime.now(local_tz)
expired = RedeemToken.query.filter(RedeemToken.expires < now).all()
for token in expired:
db.session.delete(token)
db.session.commit()
app.logger.info(f"Cleaned up {len(expired)} expired tokens.")
except Exception as e:
app.logger.error(f"Error during cleanup_expired_tokens: {e}")
db.session.rollback()
2025-04-26 12:32:07 +00:00
2025-04-29 13:19:59 +00:00
# Scheduler start
scheduler = BackgroundScheduler(timezone=str(local_tz))
def check_expiring_keys_job():
with app.app_context():
check_expiring_keys()
def cleanup_expired_tokens_job():
with app.app_context():
cleanup_expired_tokens()
# Add Jobs
scheduler.add_job(
check_expiring_keys_job,
'interval',
hours=int(os.getenv('CHECK_EXPIRING_KEYS_INTERVAL_HOURS', 12)),
id='check_expiring_keys'
)
scheduler.add_job(
cleanup_expired_tokens_job,
'interval',
hours=1,
id='cleanup_expired_tokens'
)
# price updates
def update_prices_job():
with app.app_context():
games = Game.query.filter(Game.steam_appid.isnot(None)).all()
for game in games:
# just update prices
itad_data = fetch_itad_data(f"app/{game.steam_appid}")
if itad_data:
game.current_price = itad_data.get('price_new')
game.historical_low = itad_data.get('price_low', {}).get('amount')
db.session.commit()
scheduler.add_job(
update_prices_job,
'interval',
hours=12,
id='update_prices'
)
def update_missing_steam_descriptions_job():
with app.app_context():
games = Game.query.filter(
(Game.steam_description_en == None) | (Game.steam_description_en == '') |
(Game.steam_description_de == None) | (Game.steam_description_de == '')
).all()
for game in games:
for lang in ['en', 'de']:
if not getattr(game, f'steam_description_{lang}', None):
steam_data = fetch_steam_data(game.steam_appid, lang=lang)
if steam_data:
setattr(game, f'steam_description_{lang}', steam_data.get('detailed_description'))
db.session.commit()
scheduler.add_job(
update_missing_steam_descriptions_job,
'interval',
hours=24,
id='update_missing_steam_descriptions'
)
2025-04-26 12:32:07 +00:00
# start Scheduler
scheduler.start()
atexit.register(lambda: scheduler.shutdown(wait=False))
2025-04-26 12:32:07 +00:00
2025-04-21 11:56:29 +00:00
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=True, host='0.0.0.0', port=5000)
2025-04-26 12:32:07 +00:00