Improved code readability

This commit is contained in:
Marcin-Ramotowski 2025-03-16 11:26:01 +00:00
parent 0e28559149
commit 0cd57c0973
4 changed files with 88 additions and 56 deletions

View File

@ -1,10 +1,10 @@
from dotenv import load_dotenv
from flask import Flask
from flask_jwt_extended import JWTManager
from models import db, init_db
from user_views import user_bp
from task_views import task_bp
from dotenv import load_dotenv
from models import db
import os
from task_views import task_bp
from user_views import user_bp, init_db
if __name__ == "__main__":
load_dotenv()

View File

@ -1,19 +1,7 @@
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash
import os
db = SQLAlchemy()
def init_db():
"""Create default admin account if database is empty"""
with db.session.begin():
if not User.query.first(): # Check if user table is empty
admin_password = os.getenv("TODOLIST_ADMIN_PASSWORD", "admin")
hashed_password = generate_password_hash(admin_password)
admin = User(username='admin', email='admin@example.pl', password=hashed_password, role='Administrator')
db.session.add(admin)
db.session.commit()
class User(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(20), unique=True, nullable=False)

View File

@ -6,28 +6,9 @@ from user_views import admin_required, validate_access
task_bp = Blueprint('task_bp', __name__)
@task_bp.errorhandler(403)
def forbidden_error(error):
response = jsonify(error.description)
response.status_code = 403
return response
@task_bp.errorhandler(404)
def not_found_error(error):
response = jsonify(error.description)
response.status_code = 404
return response
def check_if_task_exists(task):
# Check if task exists or user has permissions to see it
if task is None:
abort(404, {'error': 'Task not found.'})
user_id = task.user_id
validate_access(user_id)
# ============================================================
# 🚀 1⃣ API ENDPOINTS (ROUTES)
# ============================================================
@task_bp.route('/tasks', methods=['GET'])
@jwt_required()
@ -101,3 +82,33 @@ def delete_task(task_id):
db.session.delete(task)
db.session.commit()
return jsonify({})
# ============================================================
# 🔧 2⃣ UTILITIES
# ============================================================
def check_if_task_exists(task):
# Check if task exists or user has permissions to see it
if task is None:
abort(404, {'error': 'Task not found.'})
user_id = task.user_id
validate_access(user_id)
# ============================================================
# ❌ 3⃣ ERROR HANDLERS
# ============================================================
@task_bp.errorhandler(403)
def forbidden_error(error):
response = jsonify(error.description)
response.status_code = 403
return response
@task_bp.errorhandler(404)
def not_found_error(error):
response = jsonify(error.description)
response.status_code = 404
return response

View File

@ -1,27 +1,14 @@
from flask import Blueprint, jsonify, request, abort
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
from models import User, db
import os
from werkzeug.security import check_password_hash, generate_password_hash
user_bp = Blueprint('user_bp', __name__)
def admin_required(user_id, message='Access denied.'):
user = User.query.get(user_id)
if user is None or user.role != "Administrator":
abort(403, {'error': message})
def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them
logged_user_id = int(get_jwt_identity())
logged_user_role = User.query.get(logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, {'error': message})
@user_bp.errorhandler(403)
def forbidden_error(error):
response = jsonify(error.description)
response.status_code = 403
return response
# ============================================================
# 🚀 1. API ENDPOINTS (ROUTES)
# ============================================================
@user_bp.route('/users', methods=['GET'])
@jwt_required()
@ -30,6 +17,7 @@ def get_all_users():
users = User.query.all()
return jsonify([user.to_dict() for user in users])
@user_bp.route('/users/<int:user_id>', methods=['GET'])
@jwt_required()
def get_user(user_id):
@ -37,6 +25,7 @@ def get_user(user_id):
user = User.query.get_or_404(user_id)
return jsonify(user.to_dict())
@user_bp.route('/users', methods=['POST'])
def create_user():
data = request.get_json()
@ -50,6 +39,7 @@ def create_user():
db.session.commit()
return jsonify(user.to_dict()), 201
@user_bp.route('/users/<int:user_id>', methods=['PUT'])
@jwt_required()
def edit_user(user_id):
@ -66,6 +56,7 @@ def edit_user(user_id):
else:
return abort(400, {'error': 'Incomplete user data.'})
@user_bp.route('/users/<int:user_id>', methods=['DELETE'])
@jwt_required()
def remove_user(user_id):
@ -75,6 +66,7 @@ def remove_user(user_id):
db.session.commit()
return jsonify({"msg": "User removed successfully."})
@user_bp.route('/login', methods=['POST'])
def user_login():
request_data = request.get_json()
@ -95,9 +87,50 @@ def user_login():
else:
return jsonify({"msg": "User failed login."}), 401
@user_bp.route('/logout', methods=['GET'])
@jwt_required()
def user_logout():
response = jsonify({"msg": "User logged out successfully."})
unset_jwt_cookies(response)
return response
return response
# ============================================================
# 🔧 2. UTILITIES
# ============================================================
def admin_required(user_id, message='Access denied.'):
user = User.query.get(user_id)
if user is None or user.role != "Administrator":
abort(403, {'error': message})
def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them
logged_user_id = int(get_jwt_identity())
logged_user_role = User.query.get(logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, {'error': message})
def init_db():
"""Create default admin account if database is empty"""
with db.session.begin():
if not User.query.first(): # Check if user table is empty
admin_password = os.getenv("TODOLIST_ADMIN_PASSWORD", "admin")
hashed_password = generate_password_hash(admin_password)
admin = User(username='admin', email='admin@example.pl', password=hashed_password, role='Administrator')
db.session.add(admin)
db.session.commit()
# ============================================================
# ❌ 3. ERROR HANDLERS
# ============================================================
@user_bp.errorhandler(403)
def forbidden_error(error):
response = jsonify(error.description)
response.status_code = 403
return response