Compare commits
4 Commits
3ad5f0ca94
...
37e89b60af
Author | SHA1 | Date | |
---|---|---|---|
|
37e89b60af | ||
|
2632bdf994 | ||
|
8637eaa96f | ||
|
99dd5148b1 |
82
api/app.py
82
api/app.py
@ -2,46 +2,64 @@ from dotenv import load_dotenv
|
|||||||
from flask import Flask, jsonify
|
from flask import Flask, jsonify
|
||||||
from flask_jwt_extended import JWTManager
|
from flask_jwt_extended import JWTManager
|
||||||
from jwt import ExpiredSignatureError
|
from jwt import ExpiredSignatureError
|
||||||
from models import db
|
from models import db, RevokedToken
|
||||||
import os
|
import os
|
||||||
from utils import init_db
|
from utils import init_db
|
||||||
from views import user_bp
|
from views import user_bp
|
||||||
from werkzeug.exceptions import HTTPException
|
from werkzeug.exceptions import HTTPException
|
||||||
|
|
||||||
# App initialization
|
def create_app(config_name="default"):
|
||||||
load_dotenv()
|
"""Creates and returns a new instance of Flask app."""
|
||||||
app = Flask(__name__)
|
load_dotenv()
|
||||||
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('SQLALCHEMY_DATABASE_URI')
|
app = Flask(__name__)
|
||||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
|
|
||||||
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'changeme')
|
# Database settings
|
||||||
|
if config_name == "testing":
|
||||||
|
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" # Database in memory
|
||||||
|
app.config["TESTING"] = True
|
||||||
|
else:
|
||||||
|
app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI")
|
||||||
|
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
||||||
|
|
||||||
# Blueprint registration
|
# JWT settings
|
||||||
app.register_blueprint(user_bp)
|
app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme")
|
||||||
|
|
||||||
# Database and JWT initialization
|
# Blueprints registration
|
||||||
db.init_app(app)
|
app.register_blueprint(user_bp)
|
||||||
jwt = JWTManager(app)
|
|
||||||
|
|
||||||
# Global error handler
|
# Database and JWT initialization
|
||||||
@app.errorhandler(Exception)
|
db.init_app(app)
|
||||||
def global_error_handler(error):
|
jwt = JWTManager(app)
|
||||||
if isinstance(error, HTTPException):
|
|
||||||
response = jsonify({"error": error.description})
|
# Function to check if JWT token is revoked
|
||||||
response.status_code = error.code
|
@jwt.token_in_blocklist_loader
|
||||||
elif isinstance(error, ExpiredSignatureError):
|
def check_if_token_revoked(jwt_header, jwt_payload):
|
||||||
response = jsonify({"error": "Token has expired"})
|
token = db.session.get(RevokedToken, jwt_payload["jti"])
|
||||||
response.status_code = 401
|
return token is not None
|
||||||
else: # Wszystkie inne błędy
|
|
||||||
response = jsonify({"error": str(error)})
|
# Global error handler
|
||||||
response.status_code = 500
|
@app.errorhandler(Exception)
|
||||||
return response
|
def global_error_handler(error):
|
||||||
|
if isinstance(error, HTTPException):
|
||||||
|
response = jsonify({"error": error.description})
|
||||||
|
response.status_code = error.code
|
||||||
|
elif isinstance(error, ExpiredSignatureError):
|
||||||
|
response = jsonify({"error": "Token has expired"})
|
||||||
|
response.status_code = 401
|
||||||
|
else: # All other errors
|
||||||
|
response = jsonify({"error": str(error)})
|
||||||
|
response.status_code = 500
|
||||||
|
return response
|
||||||
|
|
||||||
|
# Fill database by initial values (only if we are not testing)
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
if config_name != "testing":
|
||||||
|
init_db()
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
# Fill database by initial values
|
# Server start only if we run app directly
|
||||||
with app.app_context():
|
|
||||||
db.create_all()
|
|
||||||
init_db()
|
|
||||||
|
|
||||||
# Server start
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
app.run(host='0.0.0.0')
|
app = create_app()
|
||||||
|
app.run(host="0.0.0.0")
|
||||||
|
@ -15,3 +15,6 @@ class User(db.Model):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def get_editable_fields():
|
def get_editable_fields():
|
||||||
return {"username", "email", "role", "password"}
|
return {"username", "email", "role", "password"}
|
||||||
|
|
||||||
|
class RevokedToken(db.Model):
|
||||||
|
jti = db.Column(db.String(100), primary_key=True)
|
||||||
|
@ -81,6 +81,7 @@ def test_get_users(test_client):
|
|||||||
response = test_client.get("/users", headers=headers)
|
response = test_client.get("/users", headers=headers)
|
||||||
assert response.status_code == 200 # Admin user should can get all users data
|
assert response.status_code == 200 # Admin user should can get all users data
|
||||||
|
|
||||||
|
|
||||||
def test_get_user_with_token(test_client):
|
def test_get_user_with_token(test_client):
|
||||||
"""Test to get user data before and after auth using JWT token"""
|
"""Test to get user data before and after auth using JWT token"""
|
||||||
admin_pass = generate_password_hash("admin_pass")
|
admin_pass = generate_password_hash("admin_pass")
|
||||||
@ -111,3 +112,19 @@ def test_get_user_with_token(test_client):
|
|||||||
assert response.status_code == 403 # Common user cannot get other user data
|
assert response.status_code == 403 # Common user cannot get other user data
|
||||||
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
|
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
|
||||||
assert response.status_code == 200 # Admin can access all user data
|
assert response.status_code == 200 # Admin can access all user data
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_logout(test_client):
|
||||||
|
"""Test if logout work and JWT token is revoked"""
|
||||||
|
hashed_pass = generate_password_hash("testpass")
|
||||||
|
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
|
||||||
|
db.session.add(user)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
access_token = create_access_token(identity=str(user.id))
|
||||||
|
headers = {"Authorization": f"Bearer {access_token}"}
|
||||||
|
|
||||||
|
response = test_client.get(f"/logout", headers=headers)
|
||||||
|
assert response.status_code == 200 # Logged user can logout
|
||||||
|
response = test_client.get(f"/logout", headers=headers)
|
||||||
|
assert response.status_code == 401 # Token should be revoked after logout
|
||||||
|
@ -1,55 +0,0 @@
|
|||||||
import json
|
|
||||||
from models import User, db
|
|
||||||
from flask_jwt_extended import create_access_token
|
|
||||||
from werkzeug.security import generate_password_hash
|
|
||||||
|
|
||||||
def test_create_user(test_client):
|
|
||||||
"""New user registration test"""
|
|
||||||
response = test_client.post(
|
|
||||||
"/users",
|
|
||||||
data=json.dumps({"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"}),
|
|
||||||
content_type="application/json",
|
|
||||||
)
|
|
||||||
assert response.status_code == 201 # User should be created successfully
|
|
||||||
data = response.get_json()
|
|
||||||
assert data["username"] == "testuser"
|
|
||||||
|
|
||||||
def test_login(test_client):
|
|
||||||
"""User login test"""
|
|
||||||
hashed_pass = generate_password_hash("testpass")
|
|
||||||
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
|
|
||||||
db.session.add(user)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
response = test_client.post(
|
|
||||||
"/login",
|
|
||||||
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
|
|
||||||
content_type="application/json",
|
|
||||||
)
|
|
||||||
assert response.status_code == 401 # User should not be logged - wrong password
|
|
||||||
response = test_client.post(
|
|
||||||
"/login",
|
|
||||||
data=json.dumps({"username": "testuser", "password": "testpass"}),
|
|
||||||
content_type="application/json",
|
|
||||||
)
|
|
||||||
assert response.status_code == 200 # User should be logged - right password
|
|
||||||
|
|
||||||
def test_get_users(test_client):
|
|
||||||
"""Get all users test - JWT required"""
|
|
||||||
response = test_client.get("/users")
|
|
||||||
assert response.status_code == 401
|
|
||||||
|
|
||||||
def test_get_user_with_token(test_client):
|
|
||||||
"""Test to get user data after auth using JWT token"""
|
|
||||||
user = User(username="admin", email="admin@example.com", password="hashed_pass", role="Administrator")
|
|
||||||
print(user.id)
|
|
||||||
db.session.add(user)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
access_token = create_access_token(identity=str(user.id))
|
|
||||||
headers = {"Authorization": f"Bearer {access_token}"}
|
|
||||||
|
|
||||||
response = test_client.get(f"/users/{user.id}", headers=headers)
|
|
||||||
assert response.status_code == 200
|
|
||||||
data = response.get_json()
|
|
||||||
assert data["username"] == "admin"
|
|
@ -6,7 +6,7 @@ from werkzeug.security import generate_password_hash
|
|||||||
|
|
||||||
|
|
||||||
def admin_required(user_id, message='Access denied.'):
|
def admin_required(user_id, message='Access denied.'):
|
||||||
user = User.query.get(user_id)
|
user = db.session.get(User, user_id)
|
||||||
if user is None or user.role != "Administrator":
|
if user is None or user.role != "Administrator":
|
||||||
abort(403, message)
|
abort(403, message)
|
||||||
|
|
||||||
@ -14,7 +14,7 @@ def admin_required(user_id, message='Access denied.'):
|
|||||||
def validate_access(owner_id, message='Access denied.'):
|
def validate_access(owner_id, message='Access denied.'):
|
||||||
# Check if user try to access or edit resource that does not belong to them
|
# Check if user try to access or edit resource that does not belong to them
|
||||||
logged_user_id = int(get_jwt_identity())
|
logged_user_id = int(get_jwt_identity())
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
logged_user_role = db.session.get(User, logged_user_id).role
|
||||||
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
||||||
abort(403, message)
|
abort(403, message)
|
||||||
|
|
||||||
|
17
api/views.py
17
api/views.py
@ -1,6 +1,7 @@
|
|||||||
from flask import Blueprint, jsonify, request, abort
|
from flask import Blueprint, jsonify, request, abort
|
||||||
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
|
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
|
||||||
from models import User, db
|
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
|
||||||
|
from models import db, RevokedToken, User
|
||||||
from utils import admin_required, validate_access
|
from utils import admin_required, validate_access
|
||||||
from werkzeug.security import check_password_hash, generate_password_hash
|
from werkzeug.security import check_password_hash, generate_password_hash
|
||||||
|
|
||||||
@ -22,7 +23,9 @@ def get_all_users():
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_user(user_id):
|
def get_user(user_id):
|
||||||
validate_access(user_id) # check if user tries to read other user account details
|
validate_access(user_id) # check if user tries to read other user account details
|
||||||
user = User.query.get_or_404(user_id)
|
user = db.session.get(User, user_id)
|
||||||
|
if user is None:
|
||||||
|
abort(404, "User not found.")
|
||||||
return jsonify(user.to_dict())
|
return jsonify(user.to_dict())
|
||||||
|
|
||||||
|
|
||||||
@ -72,7 +75,9 @@ def edit_user(user_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def remove_user(user_id):
|
def remove_user(user_id):
|
||||||
validate_access(user_id) # Only admin can remove other users accounts
|
validate_access(user_id) # Only admin can remove other users accounts
|
||||||
user_to_delete = User.query.get_or_404(user_id)
|
user_to_delete = db.session.get(User, user_id)
|
||||||
|
if user_to_delete is None:
|
||||||
|
abort(404, "User not found.")
|
||||||
db.session.delete(user_to_delete)
|
db.session.delete(user_to_delete)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return jsonify({"msg": "User removed successfully."})
|
return jsonify({"msg": "User removed successfully."})
|
||||||
@ -102,6 +107,10 @@ def user_login():
|
|||||||
@user_bp.route('/logout', methods=['GET'])
|
@user_bp.route('/logout', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def user_logout():
|
def user_logout():
|
||||||
|
jti = get_jwt()["jti"]
|
||||||
|
revoked_token = RevokedToken(jti=jti)
|
||||||
|
db.session.add(revoked_token)
|
||||||
|
db.session.commit()
|
||||||
response = jsonify({"msg": "User logged out successfully."})
|
response = jsonify({"msg": "User logged out successfully."})
|
||||||
unset_jwt_cookies(response)
|
unset_jwt_cookies(response)
|
||||||
return response
|
return response
|
||||||
|
Loading…
x
Reference in New Issue
Block a user