24 Commits

Author SHA1 Message Date
d3d3c98f99 Moved wait_for_db function to utils module 2025-06-11 19:48:58 +00:00
9e010ed389 Implemented waiting for db readiness 2025-06-11 19:43:47 +00:00
636a382cf5 Deleted jenkins pipeline from main branch 2025-06-11 17:13:27 +00:00
76a351710f Added variable APP_PORT to customize application port 2025-05-04 16:42:43 +00:00
c1f0da4a9c Extended Goss sleep 2025-05-04 15:37:47 +00:00
eefc952ff0 Updated app port in Goss YAML config 2025-05-04 15:26:21 +00:00
8c35b3bd8c Changed development server to production 2025-05-04 15:23:05 +00:00
60011b1c72 Added GOSS_SLEEP flag to wait for container full start before tests 2025-05-04 11:03:12 +00:00
859a962c12 Corrected python command name in Goss config YAML 2025-05-04 11:02:09 +00:00
0e9df4f859 Corrected command to run tests in Goss 2025-05-04 10:14:36 +00:00
1554404657 Corrected port to check in Goss YAML config 2025-05-04 10:14:18 +00:00
925af7d314 Corrected commands to test python app 2025-05-04 06:55:10 +00:00
fb260a0f6d Corrected directory in jenkins pipeline 2025-05-03 20:21:02 +00:00
dcd9a39b46 Corrected shell commands in jenkins pipeline 2025-05-03 20:05:32 +00:00
8194e3e9fe Added Jenkins pipeline to test code and container 2025-05-03 19:47:27 +00:00
0006044ae4 Added goss tests 2025-05-03 19:45:40 +00:00
74a58879ce Refactored code responsible for finding user in database 2025-04-02 20:02:34 +00:00
d325a52222 Refactored test code 2025-04-02 19:28:20 +00:00
546d26ada0 Added test for user edit 2025-04-02 19:22:12 +00:00
acf4b1c26c Added test for user remove 2025-04-02 19:21:48 +00:00
37e89b60af Added test for user logout 2025-03-29 21:18:24 +00:00
2632bdf994 Changed deprecated query.get and query.get_or_404 methods to other equivalents 2025-03-29 21:12:00 +00:00
8637eaa96f Added revoking token during logout 2025-03-29 20:57:01 +00:00
99dd5148b1 Now fabric function creates app 2025-03-29 20:32:38 +00:00
8 changed files with 217 additions and 164 deletions

View File

@ -2,46 +2,67 @@ from dotenv import load_dotenv
from flask import Flask, jsonify from flask import Flask, jsonify
from flask_jwt_extended import JWTManager from flask_jwt_extended import JWTManager
from jwt import ExpiredSignatureError from jwt import ExpiredSignatureError
from models import db from models import db, RevokedToken
import os import os
from utils import init_db from utils import init_db, wait_for_db
from views import user_bp from views import user_bp
from werkzeug.exceptions import HTTPException from werkzeug.exceptions import HTTPException
# App initialization def create_app(config_name="default"):
load_dotenv() """Creates and returns a new instance of Flask app."""
app = Flask(__name__) load_dotenv()
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('SQLALCHEMY_DATABASE_URI') app = Flask(__name__)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'changeme') # Database settings
if config_name == "testing":
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" # Database in memory
app.config["TESTING"] = True
else:
app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI")
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# Blueprint registration # JWT settings
app.register_blueprint(user_bp) app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme")
# Database and JWT initialization # Blueprints registration
db.init_app(app) app.register_blueprint(user_bp)
jwt = JWTManager(app)
# Global error handler # Database and JWT initialization
@app.errorhandler(Exception) db.init_app(app)
def global_error_handler(error): jwt = JWTManager(app)
if isinstance(error, HTTPException):
response = jsonify({"error": error.description}) # Function to check if JWT token is revoked
response.status_code = error.code @jwt.token_in_blocklist_loader
elif isinstance(error, ExpiredSignatureError): def check_if_token_revoked(jwt_header, jwt_payload):
response = jsonify({"error": "Token has expired"}) token = db.session.get(RevokedToken, jwt_payload["jti"])
response.status_code = 401 return token is not None
else: # Wszystkie inne błędy
response = jsonify({"error": str(error)}) # Global error handler
response.status_code = 500 @app.errorhandler(Exception)
return response def global_error_handler(error):
if isinstance(error, HTTPException):
response = jsonify({"error": error.description})
response.status_code = error.code
elif isinstance(error, ExpiredSignatureError):
response = jsonify({"error": "Token has expired"})
response.status_code = 401
else: # All other errors
response = jsonify({"error": str(error)})
response.status_code = 500
return response
# Fill database by initial values (only if we are not testing)
with app.app_context():
wait_for_db()
db.create_all()
if config_name != "testing":
init_db()
return app
# Fill database by initial values # Server start only if we run app directly
with app.app_context():
db.create_all()
init_db()
# Server start
if __name__ == "__main__": if __name__ == "__main__":
app.run(host='0.0.0.0') from waitress import serve
app = create_app()
port = os.getenv("APP_PORT", "80")
serve(app, host="0.0.0.0", port=port)

View File

@ -15,3 +15,6 @@ class User(db.Model):
@staticmethod @staticmethod
def get_editable_fields(): def get_editable_fields():
return {"username", "email", "role", "password"} return {"username", "email", "role", "password"}
class RevokedToken(db.Model):
jti = db.Column(db.String(100), primary_key=True)

View File

@ -11,4 +11,5 @@ mysql-connector-python==9.2.0
python-dotenv==1.0.0 python-dotenv==1.0.0
SQLAlchemy==2.0.23 SQLAlchemy==2.0.23
typing_extensions==4.8.0 typing_extensions==4.8.0
waitress==3.0.2
Werkzeug==3.0.1 Werkzeug==3.0.1

View File

@ -1,6 +1,8 @@
import pytest import pytest
from app import create_app from app import create_app
from models import db from flask_jwt_extended import create_access_token
from models import db, User
from werkzeug.security import generate_password_hash
@pytest.fixture @pytest.fixture
def test_client(): def test_client():
@ -13,3 +15,33 @@ def test_client():
yield client yield client
db.session.remove() db.session.remove()
db.drop_all() db.drop_all()
@pytest.fixture
def test_user():
"""Create a new user for testing."""
user = User(username="testuser", email="test@example.com", password=generate_password_hash("testpass"), role="User")
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def test_user2():
"""Create a user nr 2 for testing."""
user2 = User(username="testuser2", email="test2@example.com", password=generate_password_hash("testpass2"), role="User")
db.session.add(user2)
db.session.commit()
return user2
@pytest.fixture
def test_admin():
"""Create a new admin user for testing."""
admin = User(username="adminuser", email="admin@example.com", password=generate_password_hash("adminpass"), role="Administrator")
db.session.add(admin)
db.session.commit()
return admin
def login_test_user(identity):
"""Return Bearer auth header for user identified by provided id"""
access_token = create_access_token(identity=str(identity))
auth_header = {"Authorization": f"Bearer {access_token}"}
return auth_header

View File

@ -1,113 +1,132 @@
from conftest import login_test_user
import json import json
from models import User, db
from flask_jwt_extended import create_access_token
from werkzeug.security import generate_password_hash
def test_create_user(test_client): def test_create_user(test_client, test_user, test_admin):
"""New user registration test""" """New user registration test"""
# Anonymous try to create common user # Anonymous try to create common user
test_user_data = {"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"} test_user_data = {"username": "test", "email": "testemail@example.com", "password": "testpassword", "role": "User"}
response = test_client.post("/users", data=json.dumps(test_user_data), content_type="application/json") response = test_client.post("/users", data=json.dumps(test_user_data), content_type="application/json")
assert response.status_code == 201 # User should be created successfully assert response.status_code == 201, "Each should can register in the service"
data = response.get_json() data = response.get_json()
assert data["username"] == "testuser" assert data["username"] == "test"
# Anonymous try to create admin user # Anonymous try to create admin user
admin_user_data = {"username": "testadmin", "email": "testadmin@example.com", "password": "adminpass", "role": "Administrator"} admin_user_data = {"username": "testadmin", "email": "testadmin@example.com", "password": "adminpass", "role": "Administrator"}
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json") response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json")
assert response.status_code == 401 # Anonymous cannot create admin users assert response.status_code == 401, "Anonymous should cannot create admin users"
# Login common user and try to create admin user # Login common user and try to create admin user
access_token = create_access_token(identity='1') headers = login_test_user(test_user.id)
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers) response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
assert response.status_code == 403 # Common user cannot create admin users assert response.status_code == 403, "Common user should cannot create admin users"
# Try to create admin user using admin account # Try to create admin user using admin account
hashed_pass = generate_password_hash("adminpass") headers = login_test_user(test_admin.id)
user = User(username="admin", email="admin@example.com", password=hashed_pass, role="Administrator")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers) response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
assert response.status_code == 201 # Logged administrators can create new admin users assert response.status_code == 201, "Logged administrators should can create new admin users"
def test_edit_user(test_client, test_user, test_admin):
"User edit test"
# Anonymous cannot edit any user
admin_data = test_admin.to_dict()
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}))
assert response.status_code == 401
def test_login(test_client): # Login users (get dict with auth header and merge it with dict with rest of headers)
admin_headers = login_test_user(test_admin.id) | {"Content-Type": "application/json"}
user_headers = login_test_user(test_user.id) | {"Content-Type": "application/json"}
# Check if PUT request contains all editable fields
response = test_client.put(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
assert response.status_code == 400, "PUT request data must have all editable fields"
# Check if user can edit their own data
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
assert response.status_code == 200, "Common user should can edit own account data"
# Check if user cannot edit other user data
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}), headers=user_headers)
assert response.status_code == 403, "Common user should cannot edit other user data"
# Check if admin can edit other user data
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=admin_headers)
assert response.status_code == 200, "Admin user should can edit other user data"
def test_remove_user(test_client, test_user, test_user2, test_admin):
"User remove test"
# Anonymous try to remove user
response = test_client.delete(f"/users/{test_user.id}")
assert response.status_code == 401, "Anonymous should cannot remove user account"
# Logged user try to remove other user account
headers = login_test_user(test_user.id)
response = test_client.delete(f"/users/{test_admin.id}", headers=headers)
assert response.status_code == 403, "Common user should cannot remove other user account"
# Logged user try to remove own account
response = test_client.delete(f"/users/{test_user.id}", headers=headers)
assert response.status_code == 200, "Common user should can remove their own account"
# Logged admin can remove other user account
admin_headers = login_test_user(test_admin.id)
response = test_client.delete(f"/users/{test_user2.id}", headers=admin_headers)
assert response.status_code == 200, "Admin user should can remove other user account"
def test_login(test_client, test_user):
"""User login test""" """User login test"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
response = test_client.post( response = test_client.post(
"/login", "/login",
data=json.dumps({"username": "testuser", "password": "wrongpass"}), data=json.dumps({"username": "testuser", "password": "wrongpass"}),
content_type="application/json", content_type="application/json",
) )
assert response.status_code == 401 # User should not be logged - wrong password assert response.status_code == 401, "User should not become logged if provided wrong password"
response = test_client.post( response = test_client.post(
"/login", "/login",
data=json.dumps({"username": "testuser", "password": "testpass"}), data=json.dumps({"username": "testuser", "password": "testpass"}),
content_type="application/json", content_type="application/json",
) )
assert response.status_code == 200 # User should be logged - right password assert response.status_code == 200, "User should become logged if provided right password"
def test_get_users(test_client, test_user, test_admin):
def test_get_users(test_client):
"""Get all users test""" """Get all users test"""
response = test_client.get("/users") response = test_client.get("/users")
assert response.status_code == 401 # Anonymous cannot get all users data assert response.status_code == 401, "Anonymous should cannot get all users data"
# Common user try to get all users data # Common user try to get all users data
hashed_pass = generate_password_hash("testpass") headers = login_test_user(test_user.id)
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get("/users", headers=headers) response = test_client.get("/users", headers=headers)
assert response.status_code == 403 # Common user cannot get all users data assert response.status_code == 403, "Common user should cannot get all users data"
# Admin user try to get all users data # Admin user try to get all users data
hashed_pass = generate_password_hash("adminpass") headers = login_test_user(test_admin.id)
user = User(username="testadmin", email="testadmin@example.com", password=hashed_pass, role="Administrator")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get("/users", headers=headers) response = test_client.get("/users", headers=headers)
assert response.status_code == 200 # Admin user should can get all users data assert response.status_code == 200, "Admin user should be able to get all users data"
def test_get_user_with_token(test_client): def test_get_user_with_token(test_client, test_user, test_admin):
"""Test to get user data before and after auth using JWT token""" """Test to get user data before and after auth using JWT token"""
admin_pass = generate_password_hash("admin_pass") response = test_client.get(f"/users/{test_admin.id}")
admin = User(username="admin", email="admin@example.com", password=admin_pass, role="Administrator") assert response.status_code == 401, "Anonymous should cannot get user data without login"
db.session.add(admin)
db.session.commit()
response = test_client.get(f"/users/{admin.id}") admin_headers = login_test_user(test_admin.id)
assert response.status_code == 401 # Try to get user data without login response = test_client.get(f"/users/{test_admin.id}", headers=admin_headers)
access_token = create_access_token(identity=str(admin.id))
admin_headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{admin.id}", headers=admin_headers)
assert response.status_code == 200 assert response.status_code == 200
data = response.get_json() data = response.get_json()
assert data["username"] == "admin" assert data["username"] == "adminuser"
user_pass = generate_password_hash("test_pass") headers = login_test_user(test_user.id)
user = User(username="testuser", email="test@example.com", password=user_pass, role="User") response = test_client.get(f"/users/{test_user.id}", headers=headers)
db.session.add(user) assert response.status_code == 200, "Common user should can get own user data"
db.session.commit() response = test_client.get(f"/users/{test_admin.id}", headers=headers)
access_token = create_access_token(identity=str(user.id)) assert response.status_code == 403, "Common user should cannot get other user data"
headers = {"Authorization": f"Bearer {access_token}"} response = test_client.get(f"/users/{test_user.id}", headers=admin_headers)
response = test_client.get(f"/users/{user.id}", headers=headers) assert response.status_code == 200, "Admin should can access all user data"
assert response.status_code == 200 # Common user can get own user data
response = test_client.get(f"/users/{admin.id}", headers=headers) def test_user_logout(test_client, test_user):
assert response.status_code == 403 # Common user cannot get other user data """Test if logout works and JWT token is revoked"""
response = test_client.get(f"/users/{user.id}", headers=admin_headers) headers = login_test_user(test_user.id)
assert response.status_code == 200 # Admin can access all user data response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 200, "Logged user should can logout"
response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 401, "Token should be revoked after logout"

View File

@ -1,55 +0,0 @@
import json
from models import User, db
from flask_jwt_extended import create_access_token
from werkzeug.security import generate_password_hash
def test_create_user(test_client):
"""New user registration test"""
response = test_client.post(
"/users",
data=json.dumps({"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"}),
content_type="application/json",
)
assert response.status_code == 201 # User should be created successfully
data = response.get_json()
assert data["username"] == "testuser"
def test_login(test_client):
"""User login test"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
content_type="application/json",
)
assert response.status_code == 401 # User should not be logged - wrong password
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "testpass"}),
content_type="application/json",
)
assert response.status_code == 200 # User should be logged - right password
def test_get_users(test_client):
"""Get all users test - JWT required"""
response = test_client.get("/users")
assert response.status_code == 401
def test_get_user_with_token(test_client):
"""Test to get user data after auth using JWT token"""
user = User(username="admin", email="admin@example.com", password="hashed_pass", role="Administrator")
print(user.id)
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{user.id}", headers=headers)
assert response.status_code == 200
data = response.get_json()
assert data["username"] == "admin"

View File

@ -2,11 +2,14 @@ from flask import abort
from flask_jwt_extended import get_jwt_identity from flask_jwt_extended import get_jwt_identity
from models import User, db from models import User, db
import os import os
from sqlalchemy import text
from sqlalchemy.exc import DatabaseError
import time
from werkzeug.security import generate_password_hash from werkzeug.security import generate_password_hash
def admin_required(user_id, message='Access denied.'): def admin_required(user_id, message='Access denied.'):
user = User.query.get(user_id) user = db.session.get(User, user_id)
if user is None or user.role != "Administrator": if user is None or user.role != "Administrator":
abort(403, message) abort(403, message)
@ -14,11 +17,35 @@ def admin_required(user_id, message='Access denied.'):
def validate_access(owner_id, message='Access denied.'): def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them # Check if user try to access or edit resource that does not belong to them
logged_user_id = int(get_jwt_identity()) logged_user_id = int(get_jwt_identity())
logged_user_role = User.query.get(logged_user_id).role logged_user_role = db.session.get(User, logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id: if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, message) abort(403, message)
def get_user_or_404(user_id):
"Get user from database or abort 404"
user = db.session.get(User, user_id)
if user is None:
abort(404, "User not found")
return user
MAX_RETRIES = 100
def wait_for_db():
for retries in range(MAX_RETRIES):
try:
with db.engine.connect() as connection:
connection.execute(text("SELECT 1"))
print("Successfully connected with database.")
return
except DatabaseError:
print(f"Waiting for database... (retry {retries + 1})")
time.sleep(3)
print("Failed to connect to database.")
raise Exception("Database not ready after multiple retries.")
def init_db(): def init_db():
"""Create default admin account if database is empty""" """Create default admin account if database is empty"""
with db.session.begin(): with db.session.begin():

View File

@ -1,7 +1,8 @@
from flask import Blueprint, jsonify, request, abort from flask import Blueprint, jsonify, request, abort
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
from models import User, db verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
from utils import admin_required, validate_access from models import db, RevokedToken, User
from utils import admin_required, validate_access, get_user_or_404
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
user_bp = Blueprint('user_bp', __name__) user_bp = Blueprint('user_bp', __name__)
@ -22,7 +23,7 @@ def get_all_users():
@jwt_required() @jwt_required()
def get_user(user_id): def get_user(user_id):
validate_access(user_id) # check if user tries to read other user account details validate_access(user_id) # check if user tries to read other user account details
user = User.query.get_or_404(user_id) user = get_user_or_404(user_id)
return jsonify(user.to_dict()) return jsonify(user.to_dict())
@ -56,7 +57,7 @@ def edit_user(user_id):
if request_fields != editable_fields: if request_fields != editable_fields:
abort(400, "Invalid request data structure.") abort(400, "Invalid request data structure.")
user_to_update = User.query.get_or_404(user_id) user_to_update = get_user_or_404(user_id)
for field_name in editable_fields: for field_name in editable_fields:
requested_value = request_data.get(field_name) requested_value = request_data.get(field_name)
if requested_value is None: if requested_value is None:
@ -72,7 +73,7 @@ def edit_user(user_id):
@jwt_required() @jwt_required()
def remove_user(user_id): def remove_user(user_id):
validate_access(user_id) # Only admin can remove other users accounts validate_access(user_id) # Only admin can remove other users accounts
user_to_delete = User.query.get_or_404(user_id) user_to_delete = get_user_or_404(user_id)
db.session.delete(user_to_delete) db.session.delete(user_to_delete)
db.session.commit() db.session.commit()
return jsonify({"msg": "User removed successfully."}) return jsonify({"msg": "User removed successfully."})
@ -102,6 +103,10 @@ def user_login():
@user_bp.route('/logout', methods=['GET']) @user_bp.route('/logout', methods=['GET'])
@jwt_required() @jwt_required()
def user_logout(): def user_logout():
jti = get_jwt()["jti"]
revoked_token = RevokedToken(jti=jti)
db.session.add(revoked_token)
db.session.commit()
response = jsonify({"msg": "User logged out successfully."}) response = jsonify({"msg": "User logged out successfully."})
unset_jwt_cookies(response) unset_jwt_cookies(response)
return response return response