Compare commits

..

No commits in common. "37e89b60afe03ec3fb98e127ff2b0b5bde7bd0da" and "3ad5f0ca94220a2a7f8fb7ae6a63c70b82b7dcbe" have entirely different histories.

6 changed files with 93 additions and 85 deletions

View File

@ -2,64 +2,46 @@ from dotenv import load_dotenv
from flask import Flask, jsonify
from flask_jwt_extended import JWTManager
from jwt import ExpiredSignatureError
from models import db, RevokedToken
from models import db
import os
from utils import init_db
from views import user_bp
from werkzeug.exceptions import HTTPException
def create_app(config_name="default"):
"""Creates and returns a new instance of Flask app."""
load_dotenv()
app = Flask(__name__)
# Database settings
if config_name == "testing":
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" # Database in memory
app.config["TESTING"] = True
else:
app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI")
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# App initialization
load_dotenv()
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('SQLALCHEMY_DATABASE_URI')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'changeme')
# JWT settings
app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme")
# Blueprint registration
app.register_blueprint(user_bp)
# Blueprints registration
app.register_blueprint(user_bp)
# Database and JWT initialization
db.init_app(app)
jwt = JWTManager(app)
# Database and JWT initialization
db.init_app(app)
jwt = JWTManager(app)
# Function to check if JWT token is revoked
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
token = db.session.get(RevokedToken, jwt_payload["jti"])
return token is not None
# Global error handler
@app.errorhandler(Exception)
def global_error_handler(error):
if isinstance(error, HTTPException):
response = jsonify({"error": error.description})
response.status_code = error.code
elif isinstance(error, ExpiredSignatureError):
response = jsonify({"error": "Token has expired"})
response.status_code = 401
else: # All other errors
response = jsonify({"error": str(error)})
response.status_code = 500
return response
# Fill database by initial values (only if we are not testing)
with app.app_context():
db.create_all()
if config_name != "testing":
init_db()
return app
# Global error handler
@app.errorhandler(Exception)
def global_error_handler(error):
if isinstance(error, HTTPException):
response = jsonify({"error": error.description})
response.status_code = error.code
elif isinstance(error, ExpiredSignatureError):
response = jsonify({"error": "Token has expired"})
response.status_code = 401
else: # Wszystkie inne błędy
response = jsonify({"error": str(error)})
response.status_code = 500
return response
# Server start only if we run app directly
# Fill database by initial values
with app.app_context():
db.create_all()
init_db()
# Server start
if __name__ == "__main__":
app = create_app()
app.run(host="0.0.0.0")
app.run(host='0.0.0.0')

View File

@ -15,6 +15,3 @@ class User(db.Model):
@staticmethod
def get_editable_fields():
return {"username", "email", "role", "password"}
class RevokedToken(db.Model):
jti = db.Column(db.String(100), primary_key=True)

View File

@ -81,7 +81,6 @@ def test_get_users(test_client):
response = test_client.get("/users", headers=headers)
assert response.status_code == 200 # Admin user should can get all users data
def test_get_user_with_token(test_client):
"""Test to get user data before and after auth using JWT token"""
admin_pass = generate_password_hash("admin_pass")
@ -112,19 +111,3 @@ def test_get_user_with_token(test_client):
assert response.status_code == 403 # Common user cannot get other user data
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
assert response.status_code == 200 # Admin can access all user data
def test_user_logout(test_client):
"""Test if logout work and JWT token is revoked"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 200 # Logged user can logout
response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 401 # Token should be revoked after logout

55
api/tests/tests.py Normal file
View File

@ -0,0 +1,55 @@
import json
from models import User, db
from flask_jwt_extended import create_access_token
from werkzeug.security import generate_password_hash
def test_create_user(test_client):
"""New user registration test"""
response = test_client.post(
"/users",
data=json.dumps({"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"}),
content_type="application/json",
)
assert response.status_code == 201 # User should be created successfully
data = response.get_json()
assert data["username"] == "testuser"
def test_login(test_client):
"""User login test"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
content_type="application/json",
)
assert response.status_code == 401 # User should not be logged - wrong password
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "testpass"}),
content_type="application/json",
)
assert response.status_code == 200 # User should be logged - right password
def test_get_users(test_client):
"""Get all users test - JWT required"""
response = test_client.get("/users")
assert response.status_code == 401
def test_get_user_with_token(test_client):
"""Test to get user data after auth using JWT token"""
user = User(username="admin", email="admin@example.com", password="hashed_pass", role="Administrator")
print(user.id)
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{user.id}", headers=headers)
assert response.status_code == 200
data = response.get_json()
assert data["username"] == "admin"

View File

@ -6,7 +6,7 @@ from werkzeug.security import generate_password_hash
def admin_required(user_id, message='Access denied.'):
user = db.session.get(User, user_id)
user = User.query.get(user_id)
if user is None or user.role != "Administrator":
abort(403, message)
@ -14,7 +14,7 @@ def admin_required(user_id, message='Access denied.'):
def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them
logged_user_id = int(get_jwt_identity())
logged_user_role = db.session.get(User, logged_user_id).role
logged_user_role = User.query.get(logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, message)

View File

@ -1,7 +1,6 @@
from flask import Blueprint, jsonify, request, abort
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
from models import db, RevokedToken, User
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
from models import User, db
from utils import admin_required, validate_access
from werkzeug.security import check_password_hash, generate_password_hash
@ -23,9 +22,7 @@ def get_all_users():
@jwt_required()
def get_user(user_id):
validate_access(user_id) # check if user tries to read other user account details
user = db.session.get(User, user_id)
if user is None:
abort(404, "User not found.")
user = User.query.get_or_404(user_id)
return jsonify(user.to_dict())
@ -75,9 +72,7 @@ def edit_user(user_id):
@jwt_required()
def remove_user(user_id):
validate_access(user_id) # Only admin can remove other users accounts
user_to_delete = db.session.get(User, user_id)
if user_to_delete is None:
abort(404, "User not found.")
user_to_delete = User.query.get_or_404(user_id)
db.session.delete(user_to_delete)
db.session.commit()
return jsonify({"msg": "User removed successfully."})
@ -107,10 +102,6 @@ def user_login():
@user_bp.route('/logout', methods=['GET'])
@jwt_required()
def user_logout():
jti = get_jwt()["jti"]
revoked_token = RevokedToken(jti=jti)
db.session.add(revoked_token)
db.session.commit()
response = jsonify({"msg": "User logged out successfully."})
unset_jwt_cookies(response)
return response