Compare commits
2 Commits
90a7a8a72c
...
3552c251bf
Author | SHA1 | Date | |
---|---|---|---|
|
3552c251bf | ||
|
31480c5c55 |
@ -4,7 +4,8 @@ from flask_jwt_extended import JWTManager
|
|||||||
from jwt import ExpiredSignatureError
|
from jwt import ExpiredSignatureError
|
||||||
from models import db
|
from models import db
|
||||||
import os
|
import os
|
||||||
from views import user_bp, init_db
|
from utils import init_db
|
||||||
|
from views import user_bp
|
||||||
from werkzeug.exceptions import HTTPException
|
from werkzeug.exceptions import HTTPException
|
||||||
|
|
||||||
# App initialization
|
# App initialization
|
||||||
|
@ -15,24 +15,3 @@ class User(db.Model):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def get_editable_fields():
|
def get_editable_fields():
|
||||||
return {"username", "email", "role", "password"}
|
return {"username", "email", "role", "password"}
|
||||||
|
|
||||||
class Task(db.Model):
|
|
||||||
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
|
||||||
title = db.Column(db.String(100), nullable=False)
|
|
||||||
description = db.Column(db.Text)
|
|
||||||
done = db.Column(db.Boolean, default=False)
|
|
||||||
due_date = db.Column(db.DateTime)
|
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
|
||||||
|
|
||||||
def to_dict(self):
|
|
||||||
return {
|
|
||||||
"id": self.id,
|
|
||||||
"title": self.title,
|
|
||||||
"description": self.description,
|
|
||||||
"due_date": self.due_date,
|
|
||||||
"done": self.done,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_editable_fields():
|
|
||||||
return {"title", "description", "due_date", "done"}
|
|
||||||
|
32
api/utils.py
Normal file
32
api/utils.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from flask import abort
|
||||||
|
from flask_jwt_extended import get_jwt_identity
|
||||||
|
from models import User, db
|
||||||
|
import os
|
||||||
|
from werkzeug.security import generate_password_hash
|
||||||
|
|
||||||
|
|
||||||
|
def admin_required(user_id, message='Access denied.'):
|
||||||
|
user = User.query.get(user_id)
|
||||||
|
if user is None or user.role != "Administrator":
|
||||||
|
abort(403, message)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_access(owner_id, message='Access denied.'):
|
||||||
|
# Check if user try to access or edit resource that does not belong to them
|
||||||
|
logged_user_id = int(get_jwt_identity())
|
||||||
|
logged_user_role = User.query.get(logged_user_id).role
|
||||||
|
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
||||||
|
abort(403, message)
|
||||||
|
|
||||||
|
|
||||||
|
def init_db():
|
||||||
|
"""Create default admin account if database is empty"""
|
||||||
|
with db.session.begin():
|
||||||
|
if not User.query.first(): # Check if user table is empty
|
||||||
|
admin_username = os.getenv("ADMIN_USERNAME", "admin")
|
||||||
|
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
|
||||||
|
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
|
||||||
|
hashed_password = generate_password_hash(admin_password)
|
||||||
|
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
||||||
|
db.session.add(admin)
|
||||||
|
db.session.commit()
|
35
api/views.py
35
api/views.py
@ -1,13 +1,13 @@
|
|||||||
from flask import Blueprint, jsonify, request, abort
|
from flask import Blueprint, jsonify, request, abort
|
||||||
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
|
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
|
||||||
from models import User, db
|
from models import User, db
|
||||||
import os
|
from utils import admin_required, validate_access
|
||||||
from werkzeug.security import check_password_hash, generate_password_hash
|
from werkzeug.security import check_password_hash, generate_password_hash
|
||||||
|
|
||||||
user_bp = Blueprint('user_bp', __name__)
|
user_bp = Blueprint('user_bp', __name__)
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# 🚀 1. API ENDPOINTS (ROUTES)
|
# API ENDPOINTS (ROUTES)
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
@user_bp.route('/users', methods=['GET'])
|
@user_bp.route('/users', methods=['GET'])
|
||||||
@ -105,34 +105,3 @@ def user_logout():
|
|||||||
response = jsonify({"msg": "User logged out successfully."})
|
response = jsonify({"msg": "User logged out successfully."})
|
||||||
unset_jwt_cookies(response)
|
unset_jwt_cookies(response)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 🔧 2. UTILITIES
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
def admin_required(user_id, message='Access denied.'):
|
|
||||||
user = User.query.get(user_id)
|
|
||||||
if user is None or user.role != "Administrator":
|
|
||||||
abort(403, message)
|
|
||||||
|
|
||||||
|
|
||||||
def validate_access(owner_id, message='Access denied.'):
|
|
||||||
# Check if user try to access or edit resource that does not belong to them
|
|
||||||
logged_user_id = int(get_jwt_identity())
|
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
|
||||||
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
|
||||||
abort(403, message)
|
|
||||||
|
|
||||||
|
|
||||||
def init_db():
|
|
||||||
"""Create default admin account if database is empty"""
|
|
||||||
with db.session.begin():
|
|
||||||
if not User.query.first(): # Check if user table is empty
|
|
||||||
admin_username = os.getenv("ADMIN_USERNAME", "admin")
|
|
||||||
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
|
|
||||||
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
|
|
||||||
hashed_password = generate_password_hash(admin_password)
|
|
||||||
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
|
||||||
db.session.add(admin)
|
|
||||||
db.session.commit()
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user