Compare commits
2 Commits
90a7a8a72c
...
3552c251bf
Author | SHA1 | Date | |
---|---|---|---|
|
3552c251bf | ||
|
31480c5c55 |
@ -4,7 +4,8 @@ from flask_jwt_extended import JWTManager
|
||||
from jwt import ExpiredSignatureError
|
||||
from models import db
|
||||
import os
|
||||
from views import user_bp, init_db
|
||||
from utils import init_db
|
||||
from views import user_bp
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
# App initialization
|
||||
|
@ -15,24 +15,3 @@ class User(db.Model):
|
||||
@staticmethod
|
||||
def get_editable_fields():
|
||||
return {"username", "email", "role", "password"}
|
||||
|
||||
class Task(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
||||
title = db.Column(db.String(100), nullable=False)
|
||||
description = db.Column(db.Text)
|
||||
done = db.Column(db.Boolean, default=False)
|
||||
due_date = db.Column(db.DateTime)
|
||||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"id": self.id,
|
||||
"title": self.title,
|
||||
"description": self.description,
|
||||
"due_date": self.due_date,
|
||||
"done": self.done,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_editable_fields():
|
||||
return {"title", "description", "due_date", "done"}
|
||||
|
32
api/utils.py
Normal file
32
api/utils.py
Normal file
@ -0,0 +1,32 @@
|
||||
from flask import abort
|
||||
from flask_jwt_extended import get_jwt_identity
|
||||
from models import User, db
|
||||
import os
|
||||
from werkzeug.security import generate_password_hash
|
||||
|
||||
|
||||
def admin_required(user_id, message='Access denied.'):
|
||||
user = User.query.get(user_id)
|
||||
if user is None or user.role != "Administrator":
|
||||
abort(403, message)
|
||||
|
||||
|
||||
def validate_access(owner_id, message='Access denied.'):
|
||||
# Check if user try to access or edit resource that does not belong to them
|
||||
logged_user_id = int(get_jwt_identity())
|
||||
logged_user_role = User.query.get(logged_user_id).role
|
||||
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
||||
abort(403, message)
|
||||
|
||||
|
||||
def init_db():
|
||||
"""Create default admin account if database is empty"""
|
||||
with db.session.begin():
|
||||
if not User.query.first(): # Check if user table is empty
|
||||
admin_username = os.getenv("ADMIN_USERNAME", "admin")
|
||||
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
|
||||
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
|
||||
hashed_password = generate_password_hash(admin_password)
|
||||
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
||||
db.session.add(admin)
|
||||
db.session.commit()
|
35
api/views.py
35
api/views.py
@ -1,13 +1,13 @@
|
||||
from flask import Blueprint, jsonify, request, abort
|
||||
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
|
||||
from models import User, db
|
||||
import os
|
||||
from utils import admin_required, validate_access
|
||||
from werkzeug.security import check_password_hash, generate_password_hash
|
||||
|
||||
user_bp = Blueprint('user_bp', __name__)
|
||||
|
||||
# ============================================================
|
||||
# 🚀 1. API ENDPOINTS (ROUTES)
|
||||
# API ENDPOINTS (ROUTES)
|
||||
# ============================================================
|
||||
|
||||
@user_bp.route('/users', methods=['GET'])
|
||||
@ -105,34 +105,3 @@ def user_logout():
|
||||
response = jsonify({"msg": "User logged out successfully."})
|
||||
unset_jwt_cookies(response)
|
||||
return response
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 🔧 2. UTILITIES
|
||||
# ============================================================
|
||||
|
||||
def admin_required(user_id, message='Access denied.'):
|
||||
user = User.query.get(user_id)
|
||||
if user is None or user.role != "Administrator":
|
||||
abort(403, message)
|
||||
|
||||
|
||||
def validate_access(owner_id, message='Access denied.'):
|
||||
# Check if user try to access or edit resource that does not belong to them
|
||||
logged_user_id = int(get_jwt_identity())
|
||||
logged_user_role = User.query.get(logged_user_id).role
|
||||
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
||||
abort(403, message)
|
||||
|
||||
|
||||
def init_db():
|
||||
"""Create default admin account if database is empty"""
|
||||
with db.session.begin():
|
||||
if not User.query.first(): # Check if user table is empty
|
||||
admin_username = os.getenv("ADMIN_USERNAME", "admin")
|
||||
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
|
||||
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
|
||||
hashed_password = generate_password_hash(admin_password)
|
||||
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
||||
db.session.add(admin)
|
||||
db.session.commit()
|
||||
|
Loading…
x
Reference in New Issue
Block a user