Added revoking token during logout
This commit is contained in:
parent
e0c8924384
commit
5d45fcf932
11
api/app.py
11
api/app.py
@ -1,8 +1,9 @@
|
|||||||
|
from datetime import timedelta
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from flask import Flask, jsonify
|
from flask import Flask, jsonify
|
||||||
from flask_jwt_extended import JWTManager
|
from flask_jwt_extended import JWTManager
|
||||||
from jwt import ExpiredSignatureError
|
from jwt import ExpiredSignatureError
|
||||||
from models import db
|
from models import db, revoked_tokens
|
||||||
import os
|
import os
|
||||||
from task_views import task_bp
|
from task_views import task_bp
|
||||||
from user_views import user_bp, init_db
|
from user_views import user_bp, init_db
|
||||||
@ -19,8 +20,9 @@ def create_app(config_name="default"):
|
|||||||
app.config["TESTING"] = True
|
app.config["TESTING"] = True
|
||||||
else:
|
else:
|
||||||
app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI")
|
app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI")
|
||||||
|
|
||||||
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
||||||
|
|
||||||
|
# JWT settings
|
||||||
app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme")
|
app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme")
|
||||||
|
|
||||||
# Blueprints registration
|
# Blueprints registration
|
||||||
@ -31,6 +33,11 @@ def create_app(config_name="default"):
|
|||||||
db.init_app(app)
|
db.init_app(app)
|
||||||
jwt = JWTManager(app)
|
jwt = JWTManager(app)
|
||||||
|
|
||||||
|
# Function to check if JWT token is revoked
|
||||||
|
@jwt.token_in_blocklist_loader
|
||||||
|
def check_if_token_revoked(jwt_header, jwt_payload):
|
||||||
|
return jwt_payload["jti"] in revoked_tokens
|
||||||
|
|
||||||
# Global error handler
|
# Global error handler
|
||||||
@app.errorhandler(Exception)
|
@app.errorhandler(Exception)
|
||||||
def global_error_handler(error):
|
def global_error_handler(error):
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
from flask_sqlalchemy import SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
|
|
||||||
db = SQLAlchemy()
|
db = SQLAlchemy()
|
||||||
|
revoked_tokens = set()
|
||||||
|
|
||||||
class User(db.Model):
|
class User(db.Model):
|
||||||
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
||||||
|
@ -81,6 +81,7 @@ def test_get_users(test_client):
|
|||||||
response = test_client.get("/users", headers=headers)
|
response = test_client.get("/users", headers=headers)
|
||||||
assert response.status_code == 200 # Admin user should can get all users data
|
assert response.status_code == 200 # Admin user should can get all users data
|
||||||
|
|
||||||
|
|
||||||
def test_get_user_with_token(test_client):
|
def test_get_user_with_token(test_client):
|
||||||
"""Test to get user data before and after auth using JWT token"""
|
"""Test to get user data before and after auth using JWT token"""
|
||||||
admin_pass = generate_password_hash("admin_pass")
|
admin_pass = generate_password_hash("admin_pass")
|
||||||
@ -111,3 +112,19 @@ def test_get_user_with_token(test_client):
|
|||||||
assert response.status_code == 403 # Common user cannot get other user data
|
assert response.status_code == 403 # Common user cannot get other user data
|
||||||
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
|
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
|
||||||
assert response.status_code == 200 # Admin can access all user data
|
assert response.status_code == 200 # Admin can access all user data
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_logout(test_client):
|
||||||
|
"""Test if logout work and JWT token is revoked"""
|
||||||
|
hashed_pass = generate_password_hash("testpass")
|
||||||
|
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
|
||||||
|
db.session.add(user)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
access_token = create_access_token(identity=str(user.id))
|
||||||
|
headers = {"Authorization": f"Bearer {access_token}"}
|
||||||
|
|
||||||
|
response = test_client.get(f"/logout", headers=headers)
|
||||||
|
assert response.status_code == 200 # Logged user can logout
|
||||||
|
response = test_client.get(f"/logout", headers=headers)
|
||||||
|
assert response.status_code == 401 # Token should be revoked after logout
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
from flask import Blueprint, jsonify, request, abort
|
from flask import Blueprint, jsonify, request, abort
|
||||||
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
|
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
|
||||||
from models import User, db
|
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
|
||||||
|
from models import User, db, revoked_tokens
|
||||||
import os
|
import os
|
||||||
from werkzeug.security import check_password_hash, generate_password_hash
|
from werkzeug.security import check_password_hash, generate_password_hash
|
||||||
|
|
||||||
@ -102,6 +103,8 @@ def user_login():
|
|||||||
@user_bp.route('/logout', methods=['GET'])
|
@user_bp.route('/logout', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def user_logout():
|
def user_logout():
|
||||||
|
jti = get_jwt()["jti"]
|
||||||
|
revoked_tokens.add(jti)
|
||||||
response = jsonify({"msg": "User logged out successfully."})
|
response = jsonify({"msg": "User logged out successfully."})
|
||||||
unset_jwt_cookies(response)
|
unset_jwt_cookies(response)
|
||||||
return response
|
return response
|
||||||
|
Loading…
x
Reference in New Issue
Block a user