2025-06-12 18:42:07 +00:00

60 lines
2.1 KiB
Python

from flask import abort
from flask_jwt_extended import get_jwt_identity
from models import User, db
import os
from sqlalchemy import text
from sqlalchemy.exc import DatabaseError, InterfaceError
import time
from werkzeug.security import generate_password_hash
db_ready = False
def admin_required(user_id, message='Access denied.'):
"Check if common user try to make administrative action."
user = db.session.get(User, user_id)
if user is None or user.role != "Administrator":
abort(403, message)
def validate_access(owner_id, message='Access denied.'):
"Check if user try to access or edit resource that does not belong to them."
logged_user_id = int(get_jwt_identity())
logged_user_role = db.session.get(User, logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, message)
def get_user_or_404(user_id):
"Get user from database or abort 404"
user = db.session.get(User, user_id)
if user is None:
abort(404, "User not found")
return user
def wait_for_db(max_retries):
"Try to connect with database <max_retries> times."
global db_ready
for _ in range(max_retries):
try:
with db.engine.connect() as connection:
connection.execute(text("SELECT 1"))
db_ready = True
return
except DatabaseError | InterfaceError:
time.sleep(3)
raise Exception("Failed to connect to database.")
def init_db():
"""Create default admin account if database is empty"""
with db.session.begin():
if not User.query.first(): # Check if user table is empty
admin_username = os.getenv("ADMIN_USERNAME", "admin")
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
hashed_password = generate_password_hash(admin_password)
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
db.session.add(admin)
db.session.commit()