Moved utils functions to separate file

This commit is contained in:
Marcin-Ramotowski 2025-03-19 21:02:02 +01:00
parent 31480c5c55
commit 3552c251bf
3 changed files with 36 additions and 34 deletions

View File

@ -4,7 +4,8 @@ from flask_jwt_extended import JWTManager
from jwt import ExpiredSignatureError
from models import db
import os
from views import user_bp, init_db
from utils import init_db
from views import user_bp
from werkzeug.exceptions import HTTPException
# App initialization

32
api/utils.py Normal file
View File

@ -0,0 +1,32 @@
from flask import abort
from flask_jwt_extended import get_jwt_identity
from models import User, db
import os
from werkzeug.security import generate_password_hash
def admin_required(user_id, message='Access denied.'):
user = User.query.get(user_id)
if user is None or user.role != "Administrator":
abort(403, message)
def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them
logged_user_id = int(get_jwt_identity())
logged_user_role = User.query.get(logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, message)
def init_db():
"""Create default admin account if database is empty"""
with db.session.begin():
if not User.query.first(): # Check if user table is empty
admin_username = os.getenv("ADMIN_USERNAME", "admin")
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
hashed_password = generate_password_hash(admin_password)
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
db.session.add(admin)
db.session.commit()

View File

@ -1,13 +1,13 @@
from flask import Blueprint, jsonify, request, abort
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
from models import User, db
import os
from utils import admin_required, validate_access
from werkzeug.security import check_password_hash, generate_password_hash
user_bp = Blueprint('user_bp', __name__)
# ============================================================
# 🚀 1. API ENDPOINTS (ROUTES)
# API ENDPOINTS (ROUTES)
# ============================================================
@user_bp.route('/users', methods=['GET'])
@ -105,34 +105,3 @@ def user_logout():
response = jsonify({"msg": "User logged out successfully."})
unset_jwt_cookies(response)
return response
# ============================================================
# 🔧 2. UTILITIES
# ============================================================
def admin_required(user_id, message='Access denied.'):
user = User.query.get(user_id)
if user is None or user.role != "Administrator":
abort(403, message)
def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them
logged_user_id = int(get_jwt_identity())
logged_user_role = User.query.get(logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, message)
def init_db():
"""Create default admin account if database is empty"""
with db.session.begin():
if not User.query.first(): # Check if user table is empty
admin_username = os.getenv("ADMIN_USERNAME", "admin")
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
hashed_password = generate_password_hash(admin_password)
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
db.session.add(admin)
db.session.commit()