diff --git a/api/app.py b/api/app.py
index 58b9139..338e739 100644
--- a/api/app.py
+++ b/api/app.py
@@ -4,7 +4,8 @@ from flask_jwt_extended import JWTManager
 from jwt import ExpiredSignatureError
 from models import db
 import os
-from views import user_bp, init_db
+from utils import init_db
+from views import user_bp
 from werkzeug.exceptions import HTTPException
 
 # App initialization
diff --git a/api/utils.py b/api/utils.py
new file mode 100644
index 0000000..3efee09
--- /dev/null
+++ b/api/utils.py
@@ -0,0 +1,32 @@
+from flask import abort
+from flask_jwt_extended import get_jwt_identity
+from models import User, db
+import os
+from werkzeug.security import generate_password_hash
+
+
+def admin_required(user_id, message='Access denied.'):
+    user = User.query.get(user_id)
+    if user is None or user.role != "Administrator":
+        abort(403, message)
+
+
+def validate_access(owner_id, message='Access denied.'):
+    # Check if user try to access or edit resource that does not belong to them
+    logged_user_id = int(get_jwt_identity())
+    logged_user_role = User.query.get(logged_user_id).role
+    if logged_user_role != "Administrator" and logged_user_id != owner_id:
+        abort(403, message)
+
+
+def init_db():
+    """Create default admin account if database is empty"""
+    with db.session.begin():
+        if not User.query.first(): # Check if user table is empty
+            admin_username = os.getenv("ADMIN_USERNAME", "admin")
+            admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
+            admin_password = os.getenv("ADMIN_PASSWORD", "admin")
+            hashed_password = generate_password_hash(admin_password)
+            admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
+            db.session.add(admin)
+            db.session.commit()
diff --git a/api/views.py b/api/views.py
index 45f3dad..ac6bb9d 100644
--- a/api/views.py
+++ b/api/views.py
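Review bot: `init_db` in the new utils.py falls back to a fixed password when `ADMIN_PASSWORD` is unset (`os.getenv("ADMIN_PASSWORD", "admin")`), so any deployment that forgets the variable starts with a predictable Administrator account; the function is moved verbatim from views.py, but this is the place to make it fail closed by reading the variable without a default and raising when it is missing. In `validate_access`, `User.query.get(logged_user_id).role` raises `AttributeError` when the identity carried by a still-valid token no longer matches a row, turning what should be a denial into a 500; fetching the row and treating `None` as a denial keeps it a 403, which is how `admin_required` already behaves. `admin_required` also deserves a docstring stating that `user_id` must come from the verified token rather than from request data, since unlike `validate_access` it does not read the identity itself and I cannot see its callers in this diff. The explicit `db.session.commit()` inside `with db.session.begin():` looks redundant because the context manager commits on exit; presumably harmless, but worth confirming against the SQLAlchemy version in use.
```python
def validate_access(owner_id, message='Access denied.'):
    logged_user_id = int(get_jwt_identity())
    # Treat an identity with no matching row as a denial rather than an AttributeError.
    logged_user = User.query.get(logged_user_id)
    if logged_user is None or (logged_user.role != "Administrator" and logged_user_id != owner_id):
        abort(403, message)


def init_db():
    # … unchanged: session/empty-table check, username and email defaults …
            # Fail closed: no fixed fallback password for the first Administrator account.
            admin_password = os.getenv("ADMIN_PASSWORD")
            if not admin_password:
                raise RuntimeError("ADMIN_PASSWORD must be set before the default admin account is created")
            # … unchanged: hash the password, build and add the User row …
```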
@@ -1,13 +1,13 @@
 from flask import Blueprint, jsonify, request, abort
 from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
 from models import User, db
-import os
+from utils import admin_required, validate_access
 from werkzeug.security import check_password_hash, generate_password_hash
 
 user_bp = Blueprint('user_bp', __name__)
 
 # ============================================================
-# 🚀 1. API ENDPOINTS (ROUTES)
+# API ENDPOINTS (ROUTES)
 # ============================================================
 
 @user_bp.route('/users', methods=['GET'])
@@ -105,34 +105,3 @@ def user_logout():
     response = jsonify({"msg": "User logged out successfully."})
     unset_jwt_cookies(response)
     return response
-
-
-# ============================================================
-# 🔧 2. UTILITIES
-# ============================================================
-
-def admin_required(user_id, message='Access denied.'):
-    user = User.query.get(user_id)
-    if user is None or user.role != "Administrator":
-        abort(403, message)
-
-
-def validate_access(owner_id, message='Access denied.'):
-    # Check if user try to access or edit resource that does not belong to them
-    logged_user_id = int(get_jwt_identity())
-    logged_user_role = User.query.get(logged_user_id).role
-    if logged_user_role != "Administrator" and logged_user_id != owner_id:
-        abort(403, message)
-
-
-def init_db():
-    """Create default admin account if database is empty"""
-    with db.session.begin():
-        if not User.query.first(): # Check if user table is empty
-            admin_username = os.getenv("ADMIN_USERNAME", "admin")
-            admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
-            admin_password = os.getenv("ADMIN_PASSWORD", "admin")
-            hashed_password = generate_password_hash(admin_password)
-            admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
-            db.session.add(admin)
-            db.session.commit()