Compare commits
41 Commits
69cd246946
...
woodpecker
Author | SHA1 | Date | |
---|---|---|---|
cefb8eba4d | |||
1e54ba614a | |||
7469609600 | |||
b7a1fcfe49 | |||
a9743ecfbe | |||
76f33f50f5 | |||
203a81573d | |||
383e906102 | |||
e7330f07ee | |||
c996be5953 | |||
bf60011948 | |||
82d5962020 | |||
1254b036f5 | |||
6d84bf694e | |||
c2df9d136e | |||
76a351710f | |||
c1f0da4a9c | |||
eefc952ff0 | |||
8c35b3bd8c | |||
60011b1c72 | |||
859a962c12 | |||
0e9df4f859 | |||
1554404657 | |||
925af7d314 | |||
fb260a0f6d | |||
dcd9a39b46 | |||
8194e3e9fe | |||
0006044ae4 | |||
74a58879ce | |||
d325a52222 | |||
546d26ada0 | |||
acf4b1c26c | |||
37e89b60af | |||
2632bdf994 | |||
8637eaa96f | |||
99dd5148b1 | |||
3ad5f0ca94 | |||
0891ba7d87 | |||
3552c251bf | |||
31480c5c55 | |||
90a7a8a72c |
49
.woodpecker/build.yaml
Normal file
49
.woodpecker/build.yaml
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
when:
|
||||||
|
- event: push
|
||||||
|
branch: woodpecker
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: code-tests
|
||||||
|
image: python:3.11.7-alpine
|
||||||
|
commands:
|
||||||
|
- cd api
|
||||||
|
- python3 -m venv env
|
||||||
|
- source env/bin/activate
|
||||||
|
- pip install -r requirements.txt pytest
|
||||||
|
- python3 -m pytest --junit-xml=pytest_junit.xml
|
||||||
|
|
||||||
|
- name: build
|
||||||
|
image: docker:dind
|
||||||
|
privileged: true
|
||||||
|
environment:
|
||||||
|
ACR_USERNAME: marcin00
|
||||||
|
ACR_PASSWORD:
|
||||||
|
from_secret: acr_password
|
||||||
|
commands:
|
||||||
|
- dockerd-entrypoint.sh &
|
||||||
|
- sleep 10 # czas na uruchomienie usługi Docker
|
||||||
|
|
||||||
|
- DOCKER_IMAGE=marcin00.azurecr.io/user-microservice:$CI_COMMIT_SHA
|
||||||
|
|
||||||
|
- echo "===> Building Docker image"
|
||||||
|
- docker build -t $DOCKER_IMAGE .
|
||||||
|
|
||||||
|
- echo "===> Installing bash"
|
||||||
|
- apk add --no-cache bash
|
||||||
|
|
||||||
|
- echo "===> Installing goss"
|
||||||
|
- wget https://github.com/aelsabbahy/goss/releases/latest/download/goss-linux-amd64 -O goss
|
||||||
|
- wget https://github.com/aelsabbahy/goss/releases/latest/download/dgoss -O dgoss
|
||||||
|
- chmod +x *goss
|
||||||
|
|
||||||
|
- echo "===> Starting container for test"
|
||||||
|
- export GOSS_OPTS="-f junit"
|
||||||
|
- export GOSS_PATH=./goss
|
||||||
|
- export GOSS_SLEEP=3
|
||||||
|
- "./dgoss run -e SQLALCHEMY_DATABASE_URI=sqlite:///:memory: $DOCKER_IMAGE > goss_junit.xml"
|
||||||
|
|
||||||
|
- echo "===> Logging in to ACR"
|
||||||
|
- echo "$ACR_PASSWORD" | docker login marcin00.azurecr.io -u $ACR_USERNAME --password-stdin
|
||||||
|
|
||||||
|
- echo "===> Pushing image to ACR"
|
||||||
|
- docker push $DOCKER_IMAGE
|
72
Jenkinsfile
vendored
Normal file
72
Jenkinsfile
vendored
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
pipeline {
|
||||||
|
agent any
|
||||||
|
environment {
|
||||||
|
DOCKER_REGISTRY_URL = 'marcin00.azurecr.io'
|
||||||
|
DOCKER_IMAGE = "${DOCKER_REGISTRY_URL}/user-microservice:${GIT_COMMIT}"
|
||||||
|
ACR_NAME = 'marcin00'
|
||||||
|
}
|
||||||
|
stages {
|
||||||
|
stage('Checkout') {
|
||||||
|
steps {
|
||||||
|
checkout scm
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stage('Test python app') {
|
||||||
|
steps {
|
||||||
|
script {
|
||||||
|
dir('api') {
|
||||||
|
sh '''
|
||||||
|
python3 -m venv env
|
||||||
|
source env/bin/activate
|
||||||
|
pip install -r requirements.txt pytest
|
||||||
|
python3 -m pytest --junit-xml=pytest_junit.xml
|
||||||
|
'''
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
post {
|
||||||
|
always {
|
||||||
|
junit testResults: '**/*pytest_junit.xml'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stage('Build & test docker image') {
|
||||||
|
steps {
|
||||||
|
script {
|
||||||
|
appImage = docker.build("${DOCKER_IMAGE}")
|
||||||
|
|
||||||
|
sh label: 'Install dgoss', script: '''
|
||||||
|
curl -s -L https://github.com/aelsabbahy/goss/releases/latest/download/goss-linux-amd64 -o goss
|
||||||
|
curl -s -L https://github.com/aelsabbahy/goss/releases/latest/download/dgoss -o dgoss
|
||||||
|
chmod +rx *goss
|
||||||
|
'''
|
||||||
|
|
||||||
|
withEnv(['GOSS_OPTS=-f junit', 'GOSS_PATH=./goss', 'GOSS_SLEEP=3', 'SQLALCHEMY_DATABASE_URI=sqlite:///:memory:']) {
|
||||||
|
sh label: 'run image tests', script: './dgoss run -e SQLALCHEMY_DATABASE_URI=sqlite:///:memory: ${DOCKER_IMAGE} > goss_junit.xml'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
post {
|
||||||
|
always {
|
||||||
|
junit testResults: '**/*goss_junit.xml'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stage('Deploy') {
|
||||||
|
steps {
|
||||||
|
script {
|
||||||
|
sh '''
|
||||||
|
az login --identity
|
||||||
|
az acr login --name ${ACR_NAME}
|
||||||
|
docker push ${DOCKER_IMAGE}
|
||||||
|
'''
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
post {
|
||||||
|
cleanup {
|
||||||
|
script { cleanWs() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
89
api/app.py
89
api/app.py
@ -2,47 +2,66 @@ from dotenv import load_dotenv
|
|||||||
from flask import Flask, jsonify
|
from flask import Flask, jsonify
|
||||||
from flask_jwt_extended import JWTManager
|
from flask_jwt_extended import JWTManager
|
||||||
from jwt import ExpiredSignatureError
|
from jwt import ExpiredSignatureError
|
||||||
from models import db
|
from models import db, RevokedToken
|
||||||
import os
|
import os
|
||||||
from task_views import task_bp
|
from utils import init_db
|
||||||
from user_views import user_bp, init_db
|
from views import user_bp
|
||||||
from werkzeug.exceptions import HTTPException
|
from werkzeug.exceptions import HTTPException
|
||||||
|
|
||||||
# App initialization
|
def create_app(config_name="default"):
|
||||||
load_dotenv()
|
"""Creates and returns a new instance of Flask app."""
|
||||||
app = Flask(__name__)
|
load_dotenv()
|
||||||
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('SQLALCHEMY_DATABASE_URI')
|
app = Flask(__name__)
|
||||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
|
|
||||||
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'changeme')
|
|
||||||
|
|
||||||
# Blueprints registration
|
# Database settings
|
||||||
app.register_blueprint(user_bp)
|
if config_name == "testing":
|
||||||
app.register_blueprint(task_bp)
|
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" # Database in memory
|
||||||
|
app.config["TESTING"] = True
|
||||||
|
else:
|
||||||
|
app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv("SQLALCHEMY_DATABASE_URI")
|
||||||
|
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
||||||
|
|
||||||
# Database and JWT initialization
|
# JWT settings
|
||||||
db.init_app(app)
|
app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY", "changeme")
|
||||||
jwt = JWTManager(app)
|
|
||||||
|
|
||||||
# Global error handler
|
# Blueprints registration
|
||||||
@app.errorhandler(Exception)
|
app.register_blueprint(user_bp)
|
||||||
def global_error_handler(error):
|
|
||||||
if isinstance(error, HTTPException):
|
# Database and JWT initialization
|
||||||
response = jsonify({"error": error.description})
|
db.init_app(app)
|
||||||
response.status_code = error.code
|
jwt = JWTManager(app)
|
||||||
elif isinstance(error, ExpiredSignatureError):
|
|
||||||
response = jsonify({"error": "Token has expired"})
|
# Function to check if JWT token is revoked
|
||||||
response.status_code = 401
|
@jwt.token_in_blocklist_loader
|
||||||
else: # Wszystkie inne błędy
|
def check_if_token_revoked(jwt_header, jwt_payload):
|
||||||
response = jsonify({"error": str(error)})
|
token = db.session.get(RevokedToken, jwt_payload["jti"])
|
||||||
response.status_code = 500
|
return token is not None
|
||||||
return response
|
|
||||||
|
# Global error handler
|
||||||
|
@app.errorhandler(Exception)
|
||||||
|
def global_error_handler(error):
|
||||||
|
if isinstance(error, HTTPException):
|
||||||
|
response = jsonify({"error": error.description})
|
||||||
|
response.status_code = error.code
|
||||||
|
elif isinstance(error, ExpiredSignatureError):
|
||||||
|
response = jsonify({"error": "Token has expired"})
|
||||||
|
response.status_code = 401
|
||||||
|
else: # All other errors
|
||||||
|
response = jsonify({"error": str(error)})
|
||||||
|
response.status_code = 500
|
||||||
|
return response
|
||||||
|
|
||||||
|
# Fill database by initial values (only if we are not testing)
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
if config_name != "testing":
|
||||||
|
init_db()
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
# Fill database by initial values
|
# Server start only if we run app directly
|
||||||
with app.app_context():
|
|
||||||
db.create_all()
|
|
||||||
init_db()
|
|
||||||
|
|
||||||
# Server start
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
app.run(host='0.0.0.0')
|
from waitress import serve
|
||||||
|
app = create_app()
|
||||||
|
port = os.getenv("APP_PORT", "80")
|
||||||
|
serve(app, host="0.0.0.0", port=port)
|
||||||
|
@ -16,23 +16,5 @@ class User(db.Model):
|
|||||||
def get_editable_fields():
|
def get_editable_fields():
|
||||||
return {"username", "email", "role", "password"}
|
return {"username", "email", "role", "password"}
|
||||||
|
|
||||||
class Task(db.Model):
|
class RevokedToken(db.Model):
|
||||||
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
jti = db.Column(db.String(100), primary_key=True)
|
||||||
title = db.Column(db.String(100), nullable=False)
|
|
||||||
description = db.Column(db.Text)
|
|
||||||
done = db.Column(db.Boolean, default=False)
|
|
||||||
due_date = db.Column(db.DateTime)
|
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
|
||||||
|
|
||||||
def to_dict(self):
|
|
||||||
return {
|
|
||||||
"id": self.id,
|
|
||||||
"title": self.title,
|
|
||||||
"description": self.description,
|
|
||||||
"due_date": self.due_date,
|
|
||||||
"done": self.done,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_editable_fields():
|
|
||||||
return {"title", "description", "due_date", "done"}
|
|
||||||
|
@ -11,4 +11,5 @@ mysql-connector-python==9.2.0
|
|||||||
python-dotenv==1.0.0
|
python-dotenv==1.0.0
|
||||||
SQLAlchemy==2.0.23
|
SQLAlchemy==2.0.23
|
||||||
typing_extensions==4.8.0
|
typing_extensions==4.8.0
|
||||||
|
waitress==3.0.2
|
||||||
Werkzeug==3.0.1
|
Werkzeug==3.0.1
|
||||||
|
@ -1,109 +0,0 @@
|
|||||||
from datetime import datetime
|
|
||||||
from flask import Blueprint, jsonify, request, abort
|
|
||||||
from flask_jwt_extended import jwt_required, get_jwt_identity
|
|
||||||
from models import Task, db
|
|
||||||
from user_views import admin_required, validate_access
|
|
||||||
|
|
||||||
task_bp = Blueprint('task_bp', __name__)
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 🚀 1. API ENDPOINTS (ROUTES)
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
@task_bp.route('/tasks', methods=['GET'])
|
|
||||||
@jwt_required()
|
|
||||||
def get_all_tasks():
|
|
||||||
admin_required(get_jwt_identity()) # only admin can get all tasks
|
|
||||||
tasks = Task.query.all()
|
|
||||||
return jsonify([task.to_dict() for task in tasks])
|
|
||||||
|
|
||||||
|
|
||||||
@task_bp.route('/tasks/<int:task_id>', methods=['GET'])
|
|
||||||
@jwt_required()
|
|
||||||
def get_task(task_id):
|
|
||||||
task = Task.query.get(task_id) # return task or None if task not found
|
|
||||||
check_if_task_exists(task)
|
|
||||||
return jsonify(task.to_dict())
|
|
||||||
|
|
||||||
|
|
||||||
@task_bp.route('/tasks/user/<int:user_id>', methods=['GET'])
|
|
||||||
@jwt_required()
|
|
||||||
def get_tasks_by_user(user_id):
|
|
||||||
validate_access(user_id)
|
|
||||||
tasks = Task.query.filter_by(user_id=user_id).all()
|
|
||||||
tasks = [task.to_dict() for task in tasks]
|
|
||||||
return jsonify(tasks)
|
|
||||||
|
|
||||||
|
|
||||||
@task_bp.route('/tasks', methods=['POST'])
|
|
||||||
@jwt_required()
|
|
||||||
def create_task():
|
|
||||||
data = request.get_json()
|
|
||||||
validate_task_data(data)
|
|
||||||
due_date = datetime.strptime(data['due_date'], '%d-%m-%Y %H:%M')
|
|
||||||
task = Task(title=data['title'], description=data['description'], due_date=due_date,
|
|
||||||
done=data['done'], user_id=get_jwt_identity())
|
|
||||||
|
|
||||||
db.session.add(task)
|
|
||||||
db.session.commit()
|
|
||||||
return jsonify(task.to_dict())
|
|
||||||
|
|
||||||
|
|
||||||
@task_bp.route('/tasks/<int:task_id>', methods=['PUT', 'PATCH'])
|
|
||||||
@jwt_required()
|
|
||||||
def update_task(task_id):
|
|
||||||
task = Task.query.get(task_id)
|
|
||||||
check_if_task_exists(task)
|
|
||||||
|
|
||||||
request_data = request.get_json()
|
|
||||||
validate_task_data(request_data)
|
|
||||||
request_fields = set(request_data.keys())
|
|
||||||
editable_fields = Task.get_editable_fields()
|
|
||||||
|
|
||||||
# PUT requires all values
|
|
||||||
if request.method == 'PUT':
|
|
||||||
if request_fields != editable_fields:
|
|
||||||
abort(400, "Invalid request data structure.")
|
|
||||||
|
|
||||||
for field_name in editable_fields:
|
|
||||||
requested_value = request_data.get(field_name)
|
|
||||||
if requested_value is None:
|
|
||||||
continue
|
|
||||||
new_value = datetime.strptime(requested_value, '%d-%m-%Y %H:%M') \
|
|
||||||
if field_name == 'due_date' else requested_value
|
|
||||||
setattr(task, field_name, new_value)
|
|
||||||
db.session.commit()
|
|
||||||
return jsonify(task.to_dict())
|
|
||||||
|
|
||||||
|
|
||||||
@task_bp.route('/tasks/<int:task_id>', methods=['DELETE'])
|
|
||||||
@jwt_required()
|
|
||||||
def delete_task(task_id):
|
|
||||||
task = Task.query.get(task_id)
|
|
||||||
check_if_task_exists(task)
|
|
||||||
db.session.delete(task)
|
|
||||||
db.session.commit()
|
|
||||||
return jsonify({})
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 🔧 2. UTILITIES
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
def check_if_task_exists(task):
|
|
||||||
# Check if task exists or user has permissions to see it
|
|
||||||
if task is None:
|
|
||||||
abort(404, "Task not found.")
|
|
||||||
user_id = task.user_id
|
|
||||||
validate_access(user_id)
|
|
||||||
|
|
||||||
|
|
||||||
def validate_task_data(task):
|
|
||||||
due_date = task.get('due_date')
|
|
||||||
try:
|
|
||||||
datetime.strptime(due_date, '%d-%m-%Y %H:%M')
|
|
||||||
except ValueError:
|
|
||||||
abort(400, "Incorrect datetime format. Expected DD-MM-YYYY HH:MM")
|
|
||||||
done = task.get('done')
|
|
||||||
if done not in (0, 1):
|
|
||||||
abort(400, "Incorrect done field value. Expected 0 or 1")
|
|
47
api/tests/conftest.py
Normal file
47
api/tests/conftest.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
import pytest
|
||||||
|
from app import create_app
|
||||||
|
from flask_jwt_extended import create_access_token
|
||||||
|
from models import db, User
|
||||||
|
from werkzeug.security import generate_password_hash
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_client():
|
||||||
|
"""Creates a new instance of test app."""
|
||||||
|
app = create_app("testing")
|
||||||
|
|
||||||
|
with app.test_client() as client:
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
yield client
|
||||||
|
db.session.remove()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_user():
|
||||||
|
"""Create a new user for testing."""
|
||||||
|
user = User(username="testuser", email="test@example.com", password=generate_password_hash("testpass"), role="User")
|
||||||
|
db.session.add(user)
|
||||||
|
db.session.commit()
|
||||||
|
return user
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_user2():
|
||||||
|
"""Create a user nr 2 for testing."""
|
||||||
|
user2 = User(username="testuser2", email="test2@example.com", password=generate_password_hash("testpass2"), role="User")
|
||||||
|
db.session.add(user2)
|
||||||
|
db.session.commit()
|
||||||
|
return user2
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_admin():
|
||||||
|
"""Create a new admin user for testing."""
|
||||||
|
admin = User(username="adminuser", email="admin@example.com", password=generate_password_hash("adminpass"), role="Administrator")
|
||||||
|
db.session.add(admin)
|
||||||
|
db.session.commit()
|
||||||
|
return admin
|
||||||
|
|
||||||
|
def login_test_user(identity):
|
||||||
|
"""Return Bearer auth header for user identified by provided id"""
|
||||||
|
access_token = create_access_token(identity=str(identity))
|
||||||
|
auth_header = {"Authorization": f"Bearer {access_token}"}
|
||||||
|
return auth_header
|
132
api/tests/test_users.py
Normal file
132
api/tests/test_users.py
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
from conftest import login_test_user
|
||||||
|
import json
|
||||||
|
|
||||||
|
def test_create_user(test_client, test_user, test_admin):
|
||||||
|
"""New user registration test"""
|
||||||
|
|
||||||
|
# Anonymous try to create common user
|
||||||
|
test_user_data = {"username": "test", "email": "testemail@example.com", "password": "testpassword", "role": "User"}
|
||||||
|
response = test_client.post("/users", data=json.dumps(test_user_data), content_type="application/json")
|
||||||
|
assert response.status_code == 201, "Each should can register in the service"
|
||||||
|
data = response.get_json()
|
||||||
|
assert data["username"] == "test"
|
||||||
|
|
||||||
|
# Anonymous try to create admin user
|
||||||
|
admin_user_data = {"username": "testadmin", "email": "testadmin@example.com", "password": "adminpass", "role": "Administrator"}
|
||||||
|
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json")
|
||||||
|
assert response.status_code == 401, "Anonymous should cannot create admin users"
|
||||||
|
|
||||||
|
# Login common user and try to create admin user
|
||||||
|
headers = login_test_user(test_user.id)
|
||||||
|
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
|
||||||
|
assert response.status_code == 403, "Common user should cannot create admin users"
|
||||||
|
|
||||||
|
# Try to create admin user using admin account
|
||||||
|
headers = login_test_user(test_admin.id)
|
||||||
|
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
|
||||||
|
assert response.status_code == 201, "Logged administrators should can create new admin users"
|
||||||
|
|
||||||
|
def test_edit_user(test_client, test_user, test_admin):
|
||||||
|
"User edit test"
|
||||||
|
# Anonymous cannot edit any user
|
||||||
|
admin_data = test_admin.to_dict()
|
||||||
|
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}))
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
# Login users (get dict with auth header and merge it with dict with rest of headers)
|
||||||
|
admin_headers = login_test_user(test_admin.id) | {"Content-Type": "application/json"}
|
||||||
|
user_headers = login_test_user(test_user.id) | {"Content-Type": "application/json"}
|
||||||
|
|
||||||
|
# Check if PUT request contains all editable fields
|
||||||
|
response = test_client.put(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
|
||||||
|
assert response.status_code == 400, "PUT request data must have all editable fields"
|
||||||
|
|
||||||
|
# Check if user can edit their own data
|
||||||
|
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
|
||||||
|
assert response.status_code == 200, "Common user should can edit own account data"
|
||||||
|
|
||||||
|
# Check if user cannot edit other user data
|
||||||
|
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}), headers=user_headers)
|
||||||
|
assert response.status_code == 403, "Common user should cannot edit other user data"
|
||||||
|
|
||||||
|
# Check if admin can edit other user data
|
||||||
|
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=admin_headers)
|
||||||
|
assert response.status_code == 200, "Admin user should can edit other user data"
|
||||||
|
|
||||||
|
def test_remove_user(test_client, test_user, test_user2, test_admin):
|
||||||
|
"User remove test"
|
||||||
|
# Anonymous try to remove user
|
||||||
|
response = test_client.delete(f"/users/{test_user.id}")
|
||||||
|
assert response.status_code == 401, "Anonymous should cannot remove user account"
|
||||||
|
|
||||||
|
# Logged user try to remove other user account
|
||||||
|
headers = login_test_user(test_user.id)
|
||||||
|
response = test_client.delete(f"/users/{test_admin.id}", headers=headers)
|
||||||
|
assert response.status_code == 403, "Common user should cannot remove other user account"
|
||||||
|
|
||||||
|
# Logged user try to remove own account
|
||||||
|
response = test_client.delete(f"/users/{test_user.id}", headers=headers)
|
||||||
|
assert response.status_code == 200, "Common user should can remove their own account"
|
||||||
|
|
||||||
|
# Logged admin can remove other user account
|
||||||
|
admin_headers = login_test_user(test_admin.id)
|
||||||
|
response = test_client.delete(f"/users/{test_user2.id}", headers=admin_headers)
|
||||||
|
assert response.status_code == 200, "Admin user should can remove other user account"
|
||||||
|
|
||||||
|
def test_login(test_client, test_user):
|
||||||
|
"""User login test"""
|
||||||
|
|
||||||
|
response = test_client.post(
|
||||||
|
"/login",
|
||||||
|
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
|
||||||
|
content_type="application/json",
|
||||||
|
)
|
||||||
|
assert response.status_code == 401, "User should not become logged if provided wrong password"
|
||||||
|
response = test_client.post(
|
||||||
|
"/login",
|
||||||
|
data=json.dumps({"username": "testuser", "password": "testpass"}),
|
||||||
|
content_type="application/json",
|
||||||
|
)
|
||||||
|
assert response.status_code == 200, "User should become logged if provided right password"
|
||||||
|
|
||||||
|
def test_get_users(test_client, test_user, test_admin):
|
||||||
|
"""Get all users test"""
|
||||||
|
response = test_client.get("/users")
|
||||||
|
assert response.status_code == 401, "Anonymous should cannot get all users data"
|
||||||
|
|
||||||
|
# Common user try to get all users data
|
||||||
|
headers = login_test_user(test_user.id)
|
||||||
|
response = test_client.get("/users", headers=headers)
|
||||||
|
assert response.status_code == 403, "Common user should cannot get all users data"
|
||||||
|
|
||||||
|
# Admin user try to get all users data
|
||||||
|
headers = login_test_user(test_admin.id)
|
||||||
|
response = test_client.get("/users", headers=headers)
|
||||||
|
assert response.status_code == 200, "Admin user should be able to get all users data"
|
||||||
|
|
||||||
|
def test_get_user_with_token(test_client, test_user, test_admin):
|
||||||
|
"""Test to get user data before and after auth using JWT token"""
|
||||||
|
response = test_client.get(f"/users/{test_admin.id}")
|
||||||
|
assert response.status_code == 401, "Anonymous should cannot get user data without login"
|
||||||
|
|
||||||
|
admin_headers = login_test_user(test_admin.id)
|
||||||
|
response = test_client.get(f"/users/{test_admin.id}", headers=admin_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.get_json()
|
||||||
|
assert data["username"] == "adminuser"
|
||||||
|
|
||||||
|
headers = login_test_user(test_user.id)
|
||||||
|
response = test_client.get(f"/users/{test_user.id}", headers=headers)
|
||||||
|
assert response.status_code == 200, "Common user should can get own user data"
|
||||||
|
response = test_client.get(f"/users/{test_admin.id}", headers=headers)
|
||||||
|
assert response.status_code == 403, "Common user should cannot get other user data"
|
||||||
|
response = test_client.get(f"/users/{test_user.id}", headers=admin_headers)
|
||||||
|
assert response.status_code == 200, "Admin should can access all user data"
|
||||||
|
|
||||||
|
def test_user_logout(test_client, test_user):
|
||||||
|
"""Test if logout works and JWT token is revoked"""
|
||||||
|
headers = login_test_user(test_user.id)
|
||||||
|
response = test_client.get(f"/logout", headers=headers)
|
||||||
|
assert response.status_code == 200, "Logged user should can logout"
|
||||||
|
response = test_client.get(f"/logout", headers=headers)
|
||||||
|
assert response.status_code == 401, "Token should be revoked after logout"
|
40
api/utils.py
Normal file
40
api/utils.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
from flask import abort
|
||||||
|
from flask_jwt_extended import get_jwt_identity
|
||||||
|
from models import User, db
|
||||||
|
import os
|
||||||
|
from werkzeug.security import generate_password_hash
|
||||||
|
|
||||||
|
|
||||||
|
def admin_required(user_id, message='Access denied.'):
|
||||||
|
user = db.session.get(User, user_id)
|
||||||
|
if user is None or user.role != "Administrator":
|
||||||
|
abort(403, message)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_access(owner_id, message='Access denied.'):
|
||||||
|
# Check if user try to access or edit resource that does not belong to them
|
||||||
|
logged_user_id = int(get_jwt_identity())
|
||||||
|
logged_user_role = db.session.get(User, logged_user_id).role
|
||||||
|
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
||||||
|
abort(403, message)
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_or_404(user_id):
|
||||||
|
"Get user from database or abort 404"
|
||||||
|
user = db.session.get(User, user_id)
|
||||||
|
if user is None:
|
||||||
|
abort(404, "User not found")
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
def init_db():
|
||||||
|
"""Create default admin account if database is empty"""
|
||||||
|
with db.session.begin():
|
||||||
|
if not User.query.first(): # Check if user table is empty
|
||||||
|
admin_username = os.getenv("ADMIN_USERNAME", "admin")
|
||||||
|
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.pl")
|
||||||
|
admin_password = os.getenv("ADMIN_PASSWORD", "admin")
|
||||||
|
hashed_password = generate_password_hash(admin_password)
|
||||||
|
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
||||||
|
db.session.add(admin)
|
||||||
|
db.session.commit()
|
@ -1,13 +1,14 @@
|
|||||||
from flask import Blueprint, jsonify, request, abort
|
from flask import Blueprint, jsonify, request, abort
|
||||||
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies
|
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
|
||||||
from models import User, db
|
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
|
||||||
import os
|
from models import db, RevokedToken, User
|
||||||
|
from utils import admin_required, validate_access, get_user_or_404
|
||||||
from werkzeug.security import check_password_hash, generate_password_hash
|
from werkzeug.security import check_password_hash, generate_password_hash
|
||||||
|
|
||||||
user_bp = Blueprint('user_bp', __name__)
|
user_bp = Blueprint('user_bp', __name__)
|
||||||
|
|
||||||
# ============================================================
|
# ============================================================
|
||||||
# 🚀 1. API ENDPOINTS (ROUTES)
|
# API ENDPOINTS (ROUTES)
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
@user_bp.route('/users', methods=['GET'])
|
@user_bp.route('/users', methods=['GET'])
|
||||||
@ -22,7 +23,7 @@ def get_all_users():
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_user(user_id):
|
def get_user(user_id):
|
||||||
validate_access(user_id) # check if user tries to read other user account details
|
validate_access(user_id) # check if user tries to read other user account details
|
||||||
user = User.query.get_or_404(user_id)
|
user = get_user_or_404(user_id)
|
||||||
return jsonify(user.to_dict())
|
return jsonify(user.to_dict())
|
||||||
|
|
||||||
|
|
||||||
@ -56,7 +57,7 @@ def edit_user(user_id):
|
|||||||
if request_fields != editable_fields:
|
if request_fields != editable_fields:
|
||||||
abort(400, "Invalid request data structure.")
|
abort(400, "Invalid request data structure.")
|
||||||
|
|
||||||
user_to_update = User.query.get_or_404(user_id)
|
user_to_update = get_user_or_404(user_id)
|
||||||
for field_name in editable_fields:
|
for field_name in editable_fields:
|
||||||
requested_value = request_data.get(field_name)
|
requested_value = request_data.get(field_name)
|
||||||
if requested_value is None:
|
if requested_value is None:
|
||||||
@ -72,7 +73,7 @@ def edit_user(user_id):
|
|||||||
@jwt_required()
|
@jwt_required()
|
||||||
def remove_user(user_id):
|
def remove_user(user_id):
|
||||||
validate_access(user_id) # Only admin can remove other users accounts
|
validate_access(user_id) # Only admin can remove other users accounts
|
||||||
user_to_delete = User.query.get_or_404(user_id)
|
user_to_delete = get_user_or_404(user_id)
|
||||||
db.session.delete(user_to_delete)
|
db.session.delete(user_to_delete)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return jsonify({"msg": "User removed successfully."})
|
return jsonify({"msg": "User removed successfully."})
|
||||||
@ -102,37 +103,10 @@ def user_login():
|
|||||||
@user_bp.route('/logout', methods=['GET'])
|
@user_bp.route('/logout', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def user_logout():
|
def user_logout():
|
||||||
|
jti = get_jwt()["jti"]
|
||||||
|
revoked_token = RevokedToken(jti=jti)
|
||||||
|
db.session.add(revoked_token)
|
||||||
|
db.session.commit()
|
||||||
response = jsonify({"msg": "User logged out successfully."})
|
response = jsonify({"msg": "User logged out successfully."})
|
||||||
unset_jwt_cookies(response)
|
unset_jwt_cookies(response)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 🔧 2. UTILITIES
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
def admin_required(user_id, message='Access denied.'):
|
|
||||||
user = User.query.get(user_id)
|
|
||||||
if user is None or user.role != "Administrator":
|
|
||||||
abort(403, message)
|
|
||||||
|
|
||||||
|
|
||||||
def validate_access(owner_id, message='Access denied.'):
|
|
||||||
# Check if user try to access or edit resource that does not belong to them
|
|
||||||
logged_user_id = int(get_jwt_identity())
|
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
|
||||||
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
|
||||||
abort(403, message)
|
|
||||||
|
|
||||||
|
|
||||||
def init_db():
|
|
||||||
"""Create default admin account if database is empty"""
|
|
||||||
with db.session.begin():
|
|
||||||
if not User.query.first(): # Check if user table is empty
|
|
||||||
admin_username = os.getenv("TODOLIST_ADMIN_USERNAME", "admin")
|
|
||||||
admin_email = os.getenv("TODOLIST_ADMIN_EMAIL", "admin@example.pl")
|
|
||||||
admin_password = os.getenv("TODOLIST_ADMIN_PASSWORD", "admin")
|
|
||||||
hashed_password = generate_password_hash(admin_password)
|
|
||||||
admin = User(username=admin_username, email=admin_email, password=hashed_password, role='Administrator')
|
|
||||||
db.session.add(admin)
|
|
||||||
db.session.commit()
|
|
Reference in New Issue
Block a user