diff --git a/api/app.py b/api/app.py index ec6c1c8..24abd20 100644 --- a/api/app.py +++ b/api/app.py @@ -1,8 +1,9 @@ +from datetime import timedelta from dotenv import load_dotenv from flask import Flask, jsonify from flask_jwt_extended import JWTManager from jwt import ExpiredSignatureError -from models import db +from models import db, revoked_tokens import os from task_views import task_bp from user_views import user_bp, init_db @@ -19,8 +20,9 @@ def create_app(config_name="default"): app.config["TESTING"] = True else: app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI") - app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + + # JWT settings app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme") # Blueprints registration @@ -31,6 +33,11 @@ def create_app(config_name="default"): db.init_app(app) jwt = JWTManager(app) + # Function to check if JWT token is revoked + @jwt.token_in_blocklist_loader + def check_if_token_revoked(jwt_header, jwt_payload): + return jwt_payload["jti"] in revoked_tokens + # Global error handler @app.errorhandler(Exception) def global_error_handler(error): diff --git a/api/models.py b/api/models.py index 35d10d8..1f46356 100644 --- a/api/models.py +++ b/api/models.py @@ -1,6 +1,7 @@ from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() +revoked_tokens = set() class User(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) diff --git a/api/tests/test_users.py b/api/tests/test_users.py index ae5d645..4b18305 100644 --- a/api/tests/test_users.py +++ b/api/tests/test_users.py @@ -81,6 +81,7 @@ def test_get_users(test_client): response = test_client.get("/users", headers=headers) assert response.status_code == 200 # Admin user should can get all users data + def test_get_user_with_token(test_client): """Test to get user data before and after auth using JWT token""" admin_pass = generate_password_hash("admin_pass") @@ -111,3 +112,19 @@ def test_get_user_with_token(test_client): assert response.status_code == 403 # Common user cannot get other user data response = test_client.get(f"/users/{user.id}", headers=admin_headers) assert response.status_code == 200 # Admin can access all user data + + +def test_user_logout(test_client): + """Test if logout work and JWT token is revoked""" + hashed_pass = generate_password_hash("testpass") + user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User") + db.session.add(user) + db.session.commit() + + access_token = create_access_token(identity=str(user.id)) + headers = {"Authorization": f"Bearer {access_token}"} + + response = test_client.get(f"/logout", headers=headers) + assert response.status_code == 200 # Logged user can logout + response = test_client.get(f"/logout", headers=headers) + assert response.status_code == 401 # Token should be revoked after logout diff --git a/api/user_views.py b/api/user_views.py index 8a1315b..4c2d518 100644 --- a/api/user_views.py +++ b/api/user_views.py @@ -1,6 +1,7 @@ from flask import Blueprint, jsonify, request, abort -from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies -from models import User, db +from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \ +verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt +from models import User, db, revoked_tokens import os from werkzeug.security import check_password_hash, generate_password_hash @@ -102,6 +103,8 @@ def user_login(): @user_bp.route('/logout', methods=['GET']) @jwt_required() def user_logout(): + jti = get_jwt()["jti"] + revoked_tokens.add(jti) response = jsonify({"msg": "User logged out successfully."}) unset_jwt_cookies(response) return response