Improved task data validation and error handling
This commit is contained in:
parent
684b667f3a
commit
9d3ea22fe8
52
api/app.py
52
api/app.py
@ -1,22 +1,48 @@
|
|||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from flask import Flask
|
from flask import Flask, jsonify
|
||||||
from flask_jwt_extended import JWTManager
|
from flask_jwt_extended import JWTManager
|
||||||
|
from jwt import ExpiredSignatureError
|
||||||
from models import db
|
from models import db
|
||||||
import os
|
import os
|
||||||
from task_views import task_bp
|
from task_views import task_bp
|
||||||
from user_views import user_bp, init_db
|
from user_views import user_bp, init_db
|
||||||
|
from werkzeug.exceptions import HTTPException
|
||||||
|
|
||||||
|
# App initialization
|
||||||
|
load_dotenv()
|
||||||
|
app = Flask(__name__)
|
||||||
|
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('SQLALCHEMY_DATABASE_URI')
|
||||||
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
|
||||||
|
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'changeme')
|
||||||
|
|
||||||
|
# Blueprints registration
|
||||||
|
app.register_blueprint(user_bp)
|
||||||
|
app.register_blueprint(task_bp)
|
||||||
|
|
||||||
|
# Database and JWT initialization
|
||||||
|
db.init_app(app)
|
||||||
|
jwt = JWTManager(app)
|
||||||
|
|
||||||
|
# Global error handler
|
||||||
|
@app.errorhandler(Exception)
|
||||||
|
def global_error_handler(error):
|
||||||
|
if isinstance(error, HTTPException):
|
||||||
|
response = jsonify({"error": error.description})
|
||||||
|
response.status_code = error.code
|
||||||
|
elif isinstance(error, ExpiredSignatureError):
|
||||||
|
response = jsonify({"error": "Token has expired"})
|
||||||
|
response.status_code = 401
|
||||||
|
else: # Wszystkie inne błędy
|
||||||
|
response = jsonify({"error": str(error)})
|
||||||
|
response.status_code = 500
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
# Fill database by initial values
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
init_db()
|
||||||
|
|
||||||
|
# Server start
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
load_dotenv()
|
|
||||||
app = Flask(__name__)
|
|
||||||
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('SQLALCHEMY_DATABASE_URI')
|
|
||||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
|
|
||||||
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'changeme')
|
|
||||||
app.register_blueprint(user_bp)
|
|
||||||
app.register_blueprint(task_bp)
|
|
||||||
db.init_app(app)
|
|
||||||
jwt = JWTManager(app)
|
|
||||||
with app.app_context():
|
|
||||||
db.create_all()
|
|
||||||
init_db()
|
|
||||||
app.run(host='0.0.0.0')
|
app.run(host='0.0.0.0')
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
|
from datetime import datetime
|
||||||
from flask import Blueprint, jsonify, request, abort
|
from flask import Blueprint, jsonify, request, abort
|
||||||
from flask_jwt_extended import jwt_required, get_jwt_identity
|
from flask_jwt_extended import jwt_required, get_jwt_identity
|
||||||
from models import Task, db
|
from models import Task, db
|
||||||
from datetime import datetime
|
|
||||||
from user_views import admin_required, validate_access
|
from user_views import admin_required, validate_access
|
||||||
|
|
||||||
task_bp = Blueprint('task_bp', __name__)
|
task_bp = Blueprint('task_bp', __name__)
|
||||||
@ -39,6 +39,7 @@ def get_tasks_by_user(user_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def create_task():
|
def create_task():
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
|
validate_task_data(data)
|
||||||
due_date = datetime.strptime(data['due_date'], '%d-%m-%Y %H:%M')
|
due_date = datetime.strptime(data['due_date'], '%d-%m-%Y %H:%M')
|
||||||
task = Task(title=data['title'], description=data['description'], due_date=due_date,
|
task = Task(title=data['title'], description=data['description'], due_date=due_date,
|
||||||
done=data['done'], user_id=get_jwt_identity())
|
done=data['done'], user_id=get_jwt_identity())
|
||||||
@ -55,13 +56,14 @@ def update_task(task_id):
|
|||||||
check_if_task_exists(task)
|
check_if_task_exists(task)
|
||||||
|
|
||||||
request_data = request.get_json()
|
request_data = request.get_json()
|
||||||
|
validate_task_data(request_data)
|
||||||
request_fields = set(request_data.keys())
|
request_fields = set(request_data.keys())
|
||||||
editable_fields = Task.get_editable_fields()
|
editable_fields = Task.get_editable_fields()
|
||||||
|
|
||||||
# PUT requires all values
|
# PUT requires all values
|
||||||
if request.method == 'PUT':
|
if request.method == 'PUT':
|
||||||
if request_fields != editable_fields:
|
if request_fields != editable_fields:
|
||||||
return jsonify({'error': 'Invalid request data structure.'}), 400
|
abort(400, "Invalid request data structure.")
|
||||||
|
|
||||||
for field_name in editable_fields:
|
for field_name in editable_fields:
|
||||||
requested_value = request_data.get(field_name)
|
requested_value = request_data.get(field_name)
|
||||||
@ -91,24 +93,17 @@ def delete_task(task_id):
|
|||||||
def check_if_task_exists(task):
|
def check_if_task_exists(task):
|
||||||
# Check if task exists or user has permissions to see it
|
# Check if task exists or user has permissions to see it
|
||||||
if task is None:
|
if task is None:
|
||||||
abort(404, {'error': 'Task not found.'})
|
abort(404, "Task not found.")
|
||||||
user_id = task.user_id
|
user_id = task.user_id
|
||||||
validate_access(user_id)
|
validate_access(user_id)
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
def validate_task_data(task):
|
||||||
# ❌ 3. ERROR HANDLERS
|
due_date = task.get('due_date')
|
||||||
# ============================================================
|
try:
|
||||||
|
datetime.strptime(due_date, '%d-%m-%Y %H:%M')
|
||||||
@task_bp.errorhandler(403)
|
except ValueError:
|
||||||
def forbidden_error(error):
|
abort(400, "Incorrect datetime format. Expected DD-MM-YYYY HH:MM")
|
||||||
response = jsonify(error.description)
|
done = task.get('done')
|
||||||
response.status_code = 403
|
if done not in (0, 1):
|
||||||
return response
|
abort(400, "Incorrect done field value. Expected 0 or 1")
|
||||||
|
|
||||||
|
|
||||||
@task_bp.errorhandler(404)
|
|
||||||
def not_found_error(error):
|
|
||||||
response = jsonify(error.description)
|
|
||||||
response.status_code = 404
|
|
||||||
return response
|
|
||||||
|
@ -54,7 +54,7 @@ def edit_user(user_id):
|
|||||||
# PUT requires all values
|
# PUT requires all values
|
||||||
if request.method == 'PUT':
|
if request.method == 'PUT':
|
||||||
if request_fields != editable_fields:
|
if request_fields != editable_fields:
|
||||||
return jsonify({'error': 'Invalid request data structure.'}), 400
|
abort(400, "Invalid request data structure.")
|
||||||
|
|
||||||
user_to_update = User.query.get_or_404(user_id)
|
user_to_update = User.query.get_or_404(user_id)
|
||||||
for field_name in editable_fields:
|
for field_name in editable_fields:
|
||||||
@ -88,7 +88,7 @@ def user_login():
|
|||||||
if user_from_db is not None:
|
if user_from_db is not None:
|
||||||
password_hash = user_from_db.password
|
password_hash = user_from_db.password
|
||||||
else:
|
else:
|
||||||
return jsonify({"msg": "User failed login"}), 401
|
abort(401, "User failed login")
|
||||||
|
|
||||||
if password_hash and check_password_hash(password_hash, password):
|
if password_hash and check_password_hash(password_hash, password):
|
||||||
access_token = create_access_token(identity=str(user_from_db.id))
|
access_token = create_access_token(identity=str(user_from_db.id))
|
||||||
@ -96,7 +96,7 @@ def user_login():
|
|||||||
set_access_cookies(response, access_token)
|
set_access_cookies(response, access_token)
|
||||||
return response
|
return response
|
||||||
else:
|
else:
|
||||||
return jsonify({"msg": "User failed login."}), 401
|
abort(401, "User failed login")
|
||||||
|
|
||||||
|
|
||||||
@user_bp.route('/logout', methods=['GET'])
|
@user_bp.route('/logout', methods=['GET'])
|
||||||
@ -114,7 +114,7 @@ def user_logout():
|
|||||||
def admin_required(user_id, message='Access denied.'):
|
def admin_required(user_id, message='Access denied.'):
|
||||||
user = User.query.get(user_id)
|
user = User.query.get(user_id)
|
||||||
if user is None or user.role != "Administrator":
|
if user is None or user.role != "Administrator":
|
||||||
abort(403, {'error': message})
|
abort(403, message)
|
||||||
|
|
||||||
|
|
||||||
def validate_access(owner_id, message='Access denied.'):
|
def validate_access(owner_id, message='Access denied.'):
|
||||||
@ -122,7 +122,7 @@ def validate_access(owner_id, message='Access denied.'):
|
|||||||
logged_user_id = int(get_jwt_identity())
|
logged_user_id = int(get_jwt_identity())
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
logged_user_role = User.query.get(logged_user_id).role
|
||||||
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
||||||
abort(403, {'error': message})
|
abort(403, message)
|
||||||
|
|
||||||
|
|
||||||
def init_db():
|
def init_db():
|
||||||
@ -136,14 +136,3 @@ def init_db():
|
|||||||
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
||||||
db.session.add(admin)
|
db.session.add(admin)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# ❌ 3. ERROR HANDLERS
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
@user_bp.errorhandler(403)
|
|
||||||
def forbidden_error(error):
|
|
||||||
response = jsonify(error.description)
|
|
||||||
response.status_code = 403
|
|
||||||
return response
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user