diff --git a/api/app.py b/api/app.py index 753751e..4a63dae 100644 --- a/api/app.py +++ b/api/app.py @@ -1,10 +1,10 @@ +from dotenv import load_dotenv from flask import Flask from flask_jwt_extended import JWTManager -from models import db, init_db -from user_views import user_bp -from task_views import task_bp -from dotenv import load_dotenv +from models import db import os +from task_views import task_bp +from user_views import user_bp, init_db if __name__ == "__main__": load_dotenv() diff --git a/api/models.py b/api/models.py index a44cb26..0ad9319 100644 --- a/api/models.py +++ b/api/models.py @@ -1,19 +1,7 @@ from flask_sqlalchemy import SQLAlchemy -from werkzeug.security import generate_password_hash -import os db = SQLAlchemy() -def init_db(): - """Create default admin account if database is empty""" - with db.session.begin(): - if not User.query.first(): # Check if user table is empty - admin_password = os.getenv("TODOLIST_ADMIN_PASSWORD", "admin") - hashed_password = generate_password_hash(admin_password) - admin = User(username='admin', email='admin@example.pl', password=hashed_password, role='Administrator') - db.session.add(admin) - db.session.commit() - class User(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) username = db.Column(db.String(20), unique=True, nullable=False) diff --git a/api/task_views.py b/api/task_views.py index b67054f..37a665d 100644 --- a/api/task_views.py +++ b/api/task_views.py @@ -6,28 +6,9 @@ from user_views import admin_required, validate_access task_bp = Blueprint('task_bp', __name__) - -@task_bp.errorhandler(403) -def forbidden_error(error): - response = jsonify(error.description) - response.status_code = 403 - return response - - -@task_bp.errorhandler(404) -def not_found_error(error): - response = jsonify(error.description) - response.status_code = 404 - return response - - -def check_if_task_exists(task): - # Check if task exists or user has permissions to see it - if task is None: - abort(404, {'error': 'Task not found.'}) - user_id = task.user_id - validate_access(user_id) - +# ============================================================ +# 🚀 1️⃣ API ENDPOINTS (ROUTES) +# ============================================================ @task_bp.route('/tasks', methods=['GET']) @jwt_required() @@ -101,3 +82,33 @@ def delete_task(task_id): db.session.delete(task) db.session.commit() return jsonify({}) + + +# ============================================================ +# 🔧 2️⃣ UTILITIES +# ============================================================ + +def check_if_task_exists(task): + # Check if task exists or user has permissions to see it + if task is None: + abort(404, {'error': 'Task not found.'}) + user_id = task.user_id + validate_access(user_id) + + +# ============================================================ +# ❌ 3️⃣ ERROR HANDLERS +# ============================================================ + +@task_bp.errorhandler(403) +def forbidden_error(error): + response = jsonify(error.description) + response.status_code = 403 + return response + + +@task_bp.errorhandler(404) +def not_found_error(error): + response = jsonify(error.description) + response.status_code = 404 + return response \ No newline at end of file diff --git a/api/user_views.py b/api/user_views.py index 24f2d8d..ec4d639 100644 --- a/api/user_views.py +++ b/api/user_views.py @@ -1,27 +1,14 @@ from flask import Blueprint, jsonify, request, abort from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies from models import User, db +import os from werkzeug.security import check_password_hash, generate_password_hash user_bp = Blueprint('user_bp', __name__) -def admin_required(user_id, message='Access denied.'): - user = User.query.get(user_id) - if user is None or user.role != "Administrator": - abort(403, {'error': message}) - -def validate_access(owner_id, message='Access denied.'): - # Check if user try to access or edit resource that does not belong to them - logged_user_id = int(get_jwt_identity()) - logged_user_role = User.query.get(logged_user_id).role - if logged_user_role != "Administrator" and logged_user_id != owner_id: - abort(403, {'error': message}) - -@user_bp.errorhandler(403) -def forbidden_error(error): - response = jsonify(error.description) - response.status_code = 403 - return response +# ============================================================ +# 🚀 1. API ENDPOINTS (ROUTES) +# ============================================================ @user_bp.route('/users', methods=['GET']) @jwt_required() @@ -30,6 +17,7 @@ def get_all_users(): users = User.query.all() return jsonify([user.to_dict() for user in users]) + @user_bp.route('/users/', methods=['GET']) @jwt_required() def get_user(user_id): @@ -37,6 +25,7 @@ def get_user(user_id): user = User.query.get_or_404(user_id) return jsonify(user.to_dict()) + @user_bp.route('/users', methods=['POST']) def create_user(): data = request.get_json() @@ -50,6 +39,7 @@ def create_user(): db.session.commit() return jsonify(user.to_dict()), 201 + @user_bp.route('/users/', methods=['PUT']) @jwt_required() def edit_user(user_id): @@ -66,6 +56,7 @@ def edit_user(user_id): else: return abort(400, {'error': 'Incomplete user data.'}) + @user_bp.route('/users/', methods=['DELETE']) @jwt_required() def remove_user(user_id): @@ -75,6 +66,7 @@ def remove_user(user_id): db.session.commit() return jsonify({"msg": "User removed successfully."}) + @user_bp.route('/login', methods=['POST']) def user_login(): request_data = request.get_json() @@ -95,9 +87,50 @@ def user_login(): else: return jsonify({"msg": "User failed login."}), 401 + @user_bp.route('/logout', methods=['GET']) @jwt_required() def user_logout(): response = jsonify({"msg": "User logged out successfully."}) unset_jwt_cookies(response) - return response \ No newline at end of file + return response + + +# ============================================================ +# 🔧 2. UTILITIES +# ============================================================ + +def admin_required(user_id, message='Access denied.'): + user = User.query.get(user_id) + if user is None or user.role != "Administrator": + abort(403, {'error': message}) + + +def validate_access(owner_id, message='Access denied.'): + # Check if user try to access or edit resource that does not belong to them + logged_user_id = int(get_jwt_identity()) + logged_user_role = User.query.get(logged_user_id).role + if logged_user_role != "Administrator" and logged_user_id != owner_id: + abort(403, {'error': message}) + + +def init_db(): + """Create default admin account if database is empty""" + with db.session.begin(): + if not User.query.first(): # Check if user table is empty + admin_password = os.getenv("TODOLIST_ADMIN_PASSWORD", "admin") + hashed_password = generate_password_hash(admin_password) + admin = User(username='admin', email='admin@example.pl', password=hashed_password, role='Administrator') + db.session.add(admin) + db.session.commit() + + +# ============================================================ +# ❌ 3. ERROR HANDLERS +# ============================================================ + +@user_bp.errorhandler(403) +def forbidden_error(error): + response = jsonify(error.description) + response.status_code = 403 + return response