30 Commits

Author SHA1 Message Date
c9b1dac864 Added endpoint to fetch app version 2025-07-03 22:47:21 +02:00
cd4ab3fd27 Handled more errors during db initialization 2025-06-12 18:42:07 +00:00
301cf5922e Changed docker image base to Alpine and added curl 2025-06-11 22:15:37 +00:00
479ec4f917 Added healthcheck 2025-06-11 22:04:35 +00:00
3f40a6126c Added more descriptions of functions 2025-06-11 20:04:04 +00:00
dd9e9ce110 Improved function body 2025-06-11 19:57:15 +00:00
d3d3c98f99 Moved wait_for_db function to utils module 2025-06-11 19:48:58 +00:00
9e010ed389 Implemented waiting for db readiness 2025-06-11 19:43:47 +00:00
636a382cf5 Deleted jenkins pipeline from main branch 2025-06-11 17:13:27 +00:00
76a351710f Added variable APP_PORT to customize application port 2025-05-04 16:42:43 +00:00
c1f0da4a9c Extended Goss sleep 2025-05-04 15:37:47 +00:00
eefc952ff0 Updated app port in Goss YAML config 2025-05-04 15:26:21 +00:00
8c35b3bd8c Changed development server to production 2025-05-04 15:23:05 +00:00
60011b1c72 Added GOSS_SLEEP flag to wait for container full start before tests 2025-05-04 11:03:12 +00:00
859a962c12 Corrected python command name in Goss config YAML 2025-05-04 11:02:09 +00:00
0e9df4f859 Corrected command to run tests in Goss 2025-05-04 10:14:36 +00:00
1554404657 Corrected port to check in Goss YAML config 2025-05-04 10:14:18 +00:00
925af7d314 Corrected commands to test python app 2025-05-04 06:55:10 +00:00
fb260a0f6d Corrected directory in jenkins pipeline 2025-05-03 20:21:02 +00:00
dcd9a39b46 Corrected shell commands in jenkins pipeline 2025-05-03 20:05:32 +00:00
8194e3e9fe Added Jenkins pipeline to test code and container 2025-05-03 19:47:27 +00:00
0006044ae4 Added goss tests 2025-05-03 19:45:40 +00:00
74a58879ce Refactored code responsible for finding user in database 2025-04-02 20:02:34 +00:00
d325a52222 Refactored test code 2025-04-02 19:28:20 +00:00
546d26ada0 Added test for user edit 2025-04-02 19:22:12 +00:00
acf4b1c26c Added test for user remove 2025-04-02 19:21:48 +00:00
37e89b60af Added test for user logout 2025-03-29 21:18:24 +00:00
2632bdf994 Changed deprecated query.get and query.get_or_404 methods to other equivalents 2025-03-29 21:12:00 +00:00
8637eaa96f Added revoking token during logout 2025-03-29 20:57:01 +00:00
99dd5148b1 Now fabric function creates app 2025-03-29 20:32:38 +00:00
11 changed files with 265 additions and 166 deletions

View File

@ -1,5 +1,6 @@
FROM python:3.11.7-slim-bookworm
FROM python:3.11.7-alpine
WORKDIR /app
COPY api .
RUN apk add --no-cache curl
RUN pip install -r requirements.txt
CMD python3 app.py

View File

@ -2,46 +2,69 @@ from dotenv import load_dotenv
from flask import Flask, jsonify
from flask_jwt_extended import JWTManager
from jwt import ExpiredSignatureError
from models import db
from models import db, RevokedToken
import os
from utils import init_db
from tech_views import tech_bp
from utils import init_db, wait_for_db
from views import user_bp
from werkzeug.exceptions import HTTPException
# App initialization
load_dotenv()
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('SQLALCHEMY_DATABASE_URI')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'changeme')
def create_app(config_name="default"):
"""Creates and returns a new instance of Flask app."""
load_dotenv()
app = Flask(__name__)
# Database settings
if config_name == "testing":
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" # Database in memory
app.config["TESTING"] = True
else:
app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI")
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# Blueprint registration
app.register_blueprint(user_bp)
# JWT settings
app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme")
# Database and JWT initialization
db.init_app(app)
jwt = JWTManager(app)
# Blueprints registration
app.register_blueprint(user_bp)
app.register_blueprint(tech_bp)
# Global error handler
@app.errorhandler(Exception)
def global_error_handler(error):
if isinstance(error, HTTPException):
response = jsonify({"error": error.description})
response.status_code = error.code
elif isinstance(error, ExpiredSignatureError):
response = jsonify({"error": "Token has expired"})
response.status_code = 401
else: # Wszystkie inne błędy
response = jsonify({"error": str(error)})
response.status_code = 500
return response
# Database and JWT initialization
db.init_app(app)
jwt = JWTManager(app)
# Function to check if JWT token is revoked
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
token = db.session.get(RevokedToken, jwt_payload["jti"])
return token is not None
# Global error handler
@app.errorhandler(Exception)
def global_error_handler(error):
if isinstance(error, HTTPException):
response = jsonify({"error": error.description})
response.status_code = error.code
elif isinstance(error, ExpiredSignatureError):
response = jsonify({"error": "Token has expired"})
response.status_code = 401
else: # All other errors
response = jsonify({"error": str(error)})
response.status_code = 500
return response
# Fill database by initial values (only if we are not testing)
with app.app_context():
wait_for_db(max_retries=100)
db.create_all()
if config_name != "testing":
init_db()
return app
# Fill database by initial values
with app.app_context():
db.create_all()
init_db()
# Server start
# Server start only if we run app directly
if __name__ == "__main__":
app.run(host='0.0.0.0')
from waitress import serve
app = create_app()
port = os.getenv("APP_PORT", "80")
serve(app, host="0.0.0.0", port=port)

View File

@ -15,3 +15,6 @@ class User(db.Model):
@staticmethod
def get_editable_fields():
return {"username", "email", "role", "password"}
class RevokedToken(db.Model):
jti = db.Column(db.String(100), primary_key=True)

View File

@ -11,4 +11,5 @@ mysql-connector-python==9.2.0
python-dotenv==1.0.0
SQLAlchemy==2.0.23
typing_extensions==4.8.0
waitress==3.0.2
Werkzeug==3.0.1

20
api/tech_views.py Normal file
View File

@ -0,0 +1,20 @@
from flask import Blueprint, jsonify
from models import db
from sqlalchemy import text
from utils import db_ready
# Blueprint with technical endpoints
tech_bp = Blueprint('tech_bp', __name__)
@tech_bp.route('/health', methods=['GET'])
def health_check():
"Check if service works and database is functional"
try:
with db.engine.connect() as connection:
connection.execute(text("SELECT 1"))
return jsonify(status="healthy"), 200
except Exception:
if db_ready:
return jsonify(status="unhealthy"), 500
else:
return jsonify(status="starting"), 503

View File

@ -1,6 +1,8 @@
import pytest
from app import create_app
from models import db
from flask_jwt_extended import create_access_token
from models import db, User
from werkzeug.security import generate_password_hash
@pytest.fixture
def test_client():
@ -13,3 +15,33 @@ def test_client():
yield client
db.session.remove()
db.drop_all()
@pytest.fixture
def test_user():
"""Create a new user for testing."""
user = User(username="testuser", email="test@example.com", password=generate_password_hash("testpass"), role="User")
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def test_user2():
"""Create a user nr 2 for testing."""
user2 = User(username="testuser2", email="test2@example.com", password=generate_password_hash("testpass2"), role="User")
db.session.add(user2)
db.session.commit()
return user2
@pytest.fixture
def test_admin():
"""Create a new admin user for testing."""
admin = User(username="adminuser", email="admin@example.com", password=generate_password_hash("adminpass"), role="Administrator")
db.session.add(admin)
db.session.commit()
return admin
def login_test_user(identity):
"""Return Bearer auth header for user identified by provided id"""
access_token = create_access_token(identity=str(identity))
auth_header = {"Authorization": f"Bearer {access_token}"}
return auth_header

View File

@ -1,113 +1,132 @@
from conftest import login_test_user
import json
from models import User, db
from flask_jwt_extended import create_access_token
from werkzeug.security import generate_password_hash
def test_create_user(test_client):
def test_create_user(test_client, test_user, test_admin):
"""New user registration test"""
# Anonymous try to create common user
test_user_data = {"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"}
test_user_data = {"username": "test", "email": "testemail@example.com", "password": "testpassword", "role": "User"}
response = test_client.post("/users", data=json.dumps(test_user_data), content_type="application/json")
assert response.status_code == 201 # User should be created successfully
assert response.status_code == 201, "Each should can register in the service"
data = response.get_json()
assert data["username"] == "testuser"
assert data["username"] == "test"
# Anonymous try to create admin user
admin_user_data = {"username": "testadmin", "email": "testadmin@example.com", "password": "adminpass", "role": "Administrator"}
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json")
assert response.status_code == 401 # Anonymous cannot create admin users
assert response.status_code == 401, "Anonymous should cannot create admin users"
# Login common user and try to create admin user
access_token = create_access_token(identity='1')
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_user.id)
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
assert response.status_code == 403 # Common user cannot create admin users
assert response.status_code == 403, "Common user should cannot create admin users"
# Try to create admin user using admin account
hashed_pass = generate_password_hash("adminpass")
user = User(username="admin", email="admin@example.com", password=hashed_pass, role="Administrator")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_admin.id)
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
assert response.status_code == 201 # Logged administrators can create new admin users
assert response.status_code == 201, "Logged administrators should can create new admin users"
def test_edit_user(test_client, test_user, test_admin):
"User edit test"
# Anonymous cannot edit any user
admin_data = test_admin.to_dict()
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}))
assert response.status_code == 401
def test_login(test_client):
# Login users (get dict with auth header and merge it with dict with rest of headers)
admin_headers = login_test_user(test_admin.id) | {"Content-Type": "application/json"}
user_headers = login_test_user(test_user.id) | {"Content-Type": "application/json"}
# Check if PUT request contains all editable fields
response = test_client.put(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
assert response.status_code == 400, "PUT request data must have all editable fields"
# Check if user can edit their own data
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
assert response.status_code == 200, "Common user should can edit own account data"
# Check if user cannot edit other user data
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}), headers=user_headers)
assert response.status_code == 403, "Common user should cannot edit other user data"
# Check if admin can edit other user data
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=admin_headers)
assert response.status_code == 200, "Admin user should can edit other user data"
def test_remove_user(test_client, test_user, test_user2, test_admin):
"User remove test"
# Anonymous try to remove user
response = test_client.delete(f"/users/{test_user.id}")
assert response.status_code == 401, "Anonymous should cannot remove user account"
# Logged user try to remove other user account
headers = login_test_user(test_user.id)
response = test_client.delete(f"/users/{test_admin.id}", headers=headers)
assert response.status_code == 403, "Common user should cannot remove other user account"
# Logged user try to remove own account
response = test_client.delete(f"/users/{test_user.id}", headers=headers)
assert response.status_code == 200, "Common user should can remove their own account"
# Logged admin can remove other user account
admin_headers = login_test_user(test_admin.id)
response = test_client.delete(f"/users/{test_user2.id}", headers=admin_headers)
assert response.status_code == 200, "Admin user should can remove other user account"
def test_login(test_client, test_user):
"""User login test"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
content_type="application/json",
)
assert response.status_code == 401 # User should not be logged - wrong password
assert response.status_code == 401, "User should not become logged if provided wrong password"
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "testpass"}),
content_type="application/json",
)
assert response.status_code == 200 # User should be logged - right password
assert response.status_code == 200, "User should become logged if provided right password"
def test_get_users(test_client):
def test_get_users(test_client, test_user, test_admin):
"""Get all users test"""
response = test_client.get("/users")
assert response.status_code == 401 # Anonymous cannot get all users data
assert response.status_code == 401, "Anonymous should cannot get all users data"
# Common user try to get all users data
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_user.id)
response = test_client.get("/users", headers=headers)
assert response.status_code == 403 # Common user cannot get all users data
assert response.status_code == 403, "Common user should cannot get all users data"
# Admin user try to get all users data
hashed_pass = generate_password_hash("adminpass")
user = User(username="testadmin", email="testadmin@example.com", password=hashed_pass, role="Administrator")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_admin.id)
response = test_client.get("/users", headers=headers)
assert response.status_code == 200 # Admin user should can get all users data
assert response.status_code == 200, "Admin user should be able to get all users data"
def test_get_user_with_token(test_client):
def test_get_user_with_token(test_client, test_user, test_admin):
"""Test to get user data before and after auth using JWT token"""
admin_pass = generate_password_hash("admin_pass")
admin = User(username="admin", email="admin@example.com", password=admin_pass, role="Administrator")
db.session.add(admin)
db.session.commit()
response = test_client.get(f"/users/{test_admin.id}")
assert response.status_code == 401, "Anonymous should cannot get user data without login"
response = test_client.get(f"/users/{admin.id}")
assert response.status_code == 401 # Try to get user data without login
access_token = create_access_token(identity=str(admin.id))
admin_headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{admin.id}", headers=admin_headers)
admin_headers = login_test_user(test_admin.id)
response = test_client.get(f"/users/{test_admin.id}", headers=admin_headers)
assert response.status_code == 200
data = response.get_json()
assert data["username"] == "admin"
assert data["username"] == "adminuser"
user_pass = generate_password_hash("test_pass")
user = User(username="testuser", email="test@example.com", password=user_pass, role="User")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{user.id}", headers=headers)
assert response.status_code == 200 # Common user can get own user data
response = test_client.get(f"/users/{admin.id}", headers=headers)
assert response.status_code == 403 # Common user cannot get other user data
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
assert response.status_code == 200 # Admin can access all user data
headers = login_test_user(test_user.id)
response = test_client.get(f"/users/{test_user.id}", headers=headers)
assert response.status_code == 200, "Common user should can get own user data"
response = test_client.get(f"/users/{test_admin.id}", headers=headers)
assert response.status_code == 403, "Common user should cannot get other user data"
response = test_client.get(f"/users/{test_user.id}", headers=admin_headers)
assert response.status_code == 200, "Admin should can access all user data"
def test_user_logout(test_client, test_user):
"""Test if logout works and JWT token is revoked"""
headers = login_test_user(test_user.id)
response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 200, "Logged user should can logout"
response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 401, "Token should be revoked after logout"

View File

@ -1,55 +0,0 @@
import json
from models import User, db
from flask_jwt_extended import create_access_token
from werkzeug.security import generate_password_hash
def test_create_user(test_client):
"""New user registration test"""
response = test_client.post(
"/users",
data=json.dumps({"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"}),
content_type="application/json",
)
assert response.status_code == 201 # User should be created successfully
data = response.get_json()
assert data["username"] == "testuser"
def test_login(test_client):
"""User login test"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
content_type="application/json",
)
assert response.status_code == 401 # User should not be logged - wrong password
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "testpass"}),
content_type="application/json",
)
assert response.status_code == 200 # User should be logged - right password
def test_get_users(test_client):
"""Get all users test - JWT required"""
response = test_client.get("/users")
assert response.status_code == 401
def test_get_user_with_token(test_client):
"""Test to get user data after auth using JWT token"""
user = User(username="admin", email="admin@example.com", password="hashed_pass", role="Administrator")
print(user.id)
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{user.id}", headers=headers)
assert response.status_code == 200
data = response.get_json()
assert data["username"] == "admin"

View File

@ -2,23 +2,50 @@ from flask import abort
from flask_jwt_extended import get_jwt_identity
from models import User, db
import os
from sqlalchemy import text
from sqlalchemy.exc import DatabaseError, InterfaceError
import time
from werkzeug.security import generate_password_hash
db_ready = False
def admin_required(user_id, message='Access denied.'):
user = User.query.get(user_id)
"Check if common user try to make administrative action."
user = db.session.get(User, user_id)
if user is None or user.role != "Administrator":
abort(403, message)
def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them
"Check if user try to access or edit resource that does not belong to them."
logged_user_id = int(get_jwt_identity())
logged_user_role = User.query.get(logged_user_id).role
logged_user_role = db.session.get(User, logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, message)
def get_user_or_404(user_id):
"Get user from database or abort 404"
user = db.session.get(User, user_id)
if user is None:
abort(404, "User not found")
return user
def wait_for_db(max_retries):
"Try to connect with database <max_retries> times."
global db_ready
for _ in range(max_retries):
try:
with db.engine.connect() as connection:
connection.execute(text("SELECT 1"))
db_ready = True
return
except DatabaseError | InterfaceError:
time.sleep(3)
raise Exception("Failed to connect to database.")
def init_db():
"""Create default admin account if database is empty"""
with db.session.begin():

View File

@ -1,7 +1,9 @@
from flask import Blueprint, jsonify, request, abort
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
from models import User, db
from utils import admin_required, validate_access
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
from models import db, RevokedToken, User
import os
from utils import admin_required, validate_access, get_user_or_404
from werkzeug.security import check_password_hash, generate_password_hash
user_bp = Blueprint('user_bp', __name__)
@ -22,7 +24,7 @@ def get_all_users():
@jwt_required()
def get_user(user_id):
validate_access(user_id) # check if user tries to read other user account details
user = User.query.get_or_404(user_id)
user = get_user_or_404(user_id)
return jsonify(user.to_dict())
@ -56,7 +58,7 @@ def edit_user(user_id):
if request_fields != editable_fields:
abort(400, "Invalid request data structure.")
user_to_update = User.query.get_or_404(user_id)
user_to_update = get_user_or_404(user_id)
for field_name in editable_fields:
requested_value = request_data.get(field_name)
if requested_value is None:
@ -72,7 +74,7 @@ def edit_user(user_id):
@jwt_required()
def remove_user(user_id):
validate_access(user_id) # Only admin can remove other users accounts
user_to_delete = User.query.get_or_404(user_id)
user_to_delete = get_user_or_404(user_id)
db.session.delete(user_to_delete)
db.session.commit()
return jsonify({"msg": "User removed successfully."})
@ -102,6 +104,17 @@ def user_login():
@user_bp.route('/logout', methods=['GET'])
@jwt_required()
def user_logout():
jti = get_jwt()["jti"]
revoked_token = RevokedToken(jti=jti)
db.session.add(revoked_token)
db.session.commit()
response = jsonify({"msg": "User logged out successfully."})
unset_jwt_cookies(response)
return response
@user_bp.route('/version', methods=['GET'])
def version():
return jsonify({
"version": os.getenv("APP_VERSION", "unknown"),
"build_time": os.getenv("BUILD_DATE", "unknown")
})

View File

@ -7,9 +7,24 @@ services:
build: .
env_file:
- api/.env
ports:
- 80:80
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost/health"]
interval: 10s
timeout: 5s
retries: 5
start_period: 15s
db:
container_name: db
hostname: db
image: mysql:latest
env_file:
- db/.env
ports:
- 3306:3306
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 5