43 Commits

Author SHA1 Message Date
a79ae2d50f Apply new features from branch 'dev' into jenkins-pipeline 2025-06-12 18:42:28 +00:00
cd4ab3fd27 Handled more errors during db initialization 2025-06-12 18:42:07 +00:00
301cf5922e Changed docker image base to Alpine and added curl 2025-06-11 22:15:37 +00:00
479ec4f917 Added healthcheck 2025-06-11 22:04:35 +00:00
3f40a6126c Added more descriptions of functions 2025-06-11 20:04:04 +00:00
dd9e9ce110 Improved function body 2025-06-11 19:57:15 +00:00
d3d3c98f99 Moved wait_for_db function to utils module 2025-06-11 19:48:58 +00:00
9e010ed389 Implemented waiting for db readiness 2025-06-11 19:43:47 +00:00
d9fe927832 Removed deprecated label option from pipeline 2025-06-11 17:51:55 +00:00
636a382cf5 Deleted jenkins pipeline from main branch 2025-06-11 17:13:27 +00:00
99cfdfddd0 Added annotation required to run Sysbox on pod 2025-06-11 16:52:16 +00:00
f579e440f8 Updated path to pod template yaml file 2025-06-11 16:43:22 +00:00
ba69728c81 Changed pod runtime to sysbox 2025-06-11 16:32:12 +00:00
5366e313c5 Moved Jenkinsfile and pod agent template to separate directory 2025-06-11 16:29:03 +00:00
283be1a1ec Deleted Goss 2025-06-11 16:28:08 +00:00
1b7204c2ba Changed name of variable to store ACR name instead of ACR username 2025-06-10 20:17:25 +00:00
8f9aed299d Added managed identity client id 2025-06-10 19:32:40 +00:00
6522977280 Changed basic auth to managed identity 2025-06-10 18:50:37 +00:00
c707974a2e Corrected agent declaration in Jenkinsfile 2025-06-08 16:56:35 +00:00
cc2f224d60 Moved pod agent code from Jenkins master to YAML file in repo 2025-06-08 16:45:05 +00:00
b14e6cf873 Restored dind container usage 2025-06-07 21:44:24 +00:00
87e3c0df80 Removed Goss tests 2025-06-07 21:14:40 +00:00
aea09a6081 Added bash installation 2025-06-07 15:08:58 +00:00
d05cede409 The command curl is replaced with wget 2025-06-07 15:06:19 +00:00
17162027b6 Removed unnecessary post cleanup 2025-06-07 14:57:35 +00:00
8887f1b2bd Updated Jenkins pipeline to use in Kubernetes 2025-06-07 13:31:03 +00:00
76a351710f Added variable APP_PORT to customize application port 2025-05-04 16:42:43 +00:00
c1f0da4a9c Extended Goss sleep 2025-05-04 15:37:47 +00:00
eefc952ff0 Updated app port in Goss YAML config 2025-05-04 15:26:21 +00:00
8c35b3bd8c Changed development server to production 2025-05-04 15:23:05 +00:00
60011b1c72 Added GOSS_SLEEP flag to wait for container full start before tests 2025-05-04 11:03:12 +00:00
859a962c12 Corrected python command name in Goss config YAML 2025-05-04 11:02:09 +00:00
0e9df4f859 Corrected command to run tests in Goss 2025-05-04 10:14:36 +00:00
1554404657 Corrected port to check in Goss YAML config 2025-05-04 10:14:18 +00:00
925af7d314 Corrected commands to test python app 2025-05-04 06:55:10 +00:00
fb260a0f6d Corrected directory in jenkins pipeline 2025-05-03 20:21:02 +00:00
dcd9a39b46 Corrected shell commands in jenkins pipeline 2025-05-03 20:05:32 +00:00
8194e3e9fe Added Jenkins pipeline to test code and container 2025-05-03 19:47:27 +00:00
0006044ae4 Added goss tests 2025-05-03 19:45:40 +00:00
74a58879ce Refactored code responsible for finding user in database 2025-04-02 20:02:34 +00:00
d325a52222 Refactored test code 2025-04-02 19:28:20 +00:00
546d26ada0 Added test for user edit 2025-04-02 19:22:12 +00:00
acf4b1c26c Added test for user remove 2025-04-02 19:21:48 +00:00
11 changed files with 292 additions and 94 deletions

49
.jenkins/Jenkinsfile vendored Normal file
View File

@ -0,0 +1,49 @@
pipeline {
agent {
kubernetes {
yamlFile '.jenkins/podTemplate.yaml'
}
}
environment {
ACR_NAME = 'marcin00'
CLIENT_ID = 'c302726f-fafb-4143-94c1-67a70975574a'
DOCKER_REGISTRY_URL = 'marcin00.azurecr.io'
DOCKER_IMAGE = "${DOCKER_REGISTRY_URL}/user-microservice:${GIT_COMMIT}"
}
stages {
stage('Code Tests') {
steps {
container('python') {
dir('api') {
sh '''
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt pytest
python3 -m pytest --junit-xml=pytest_junit.xml
'''
}
}
}
post {
always {
junit testResults: '**/*pytest_junit.xml'
}
}
}
stage('Build & Push Docker') {
steps {
container('docker') {
sh '''
docker build -t ${DOCKER_IMAGE} .
az login --identity --client-id ${CLIENT_ID}
az acr login --name ${ACR_NAME}
docker push ${DOCKER_IMAGE}
'''
}
}
}
}
}

50
.jenkins/podTemplate.yaml Normal file
View File

@ -0,0 +1,50 @@
apiVersion: v1
kind: Pod
metadata:
annotations:
io.kubernetes.cri-o.userns-mode: "auto:size=65536"
labels:
jenkins: "slave"
jenkins/label: "kubernetes-agent"
spec:
runtimeClassName: sysbox-runc
containers:
- name: jnlp
image: jenkins/inbound-agent:alpine
tty: false
workingDir: /home/jenkins/agent
volumeMounts:
- name: workspace-volume
mountPath: /home/jenkins/agent
env:
- name: JENKINS_WEB_SOCKET
value: "true"
- name: REMOTING_OPTS
value: "-noReconnectAfter 1d"
- name: python
image: python:3.11.7-alpine
command:
- cat
tty: true
workingDir: /home/jenkins/agent
volumeMounts:
- name: workspace-volume
mountPath: /home/jenkins/agent
- name: docker
image: marcin00.azurecr.io/azure-cli-docker:slim-bookworm
tty: true
workingDir: /home/jenkins/agent
volumeMounts:
- name: workspace-volume
mountPath: /home/jenkins/agent
nodeSelector:
kubernetes.io/os: linux
restartPolicy: Never
volumes:
- name: workspace-volume
emptyDir: {}

View File

@ -1,5 +1,6 @@
FROM python:3.11.7-slim-bookworm
FROM python:3.11.7-alpine
WORKDIR /app
COPY api .
RUN apk add --no-cache curl
RUN pip install -r requirements.txt
CMD python3 app.py

View File

@ -4,7 +4,8 @@ from flask_jwt_extended import JWTManager
from jwt import ExpiredSignatureError
from models import db, RevokedToken
import os
from utils import init_db
from tech_views import tech_bp
from utils import init_db, wait_for_db
from views import user_bp
from werkzeug.exceptions import HTTPException
@ -26,6 +27,7 @@ def create_app(config_name="default"):
# Blueprints registration
app.register_blueprint(user_bp)
app.register_blueprint(tech_bp)
# Database and JWT initialization
db.init_app(app)
@ -53,6 +55,7 @@ def create_app(config_name="default"):
# Fill database by initial values (only if we are not testing)
with app.app_context():
wait_for_db(max_retries=100)
db.create_all()
if config_name != "testing":
init_db()
@ -61,5 +64,7 @@ def create_app(config_name="default"):
# Server start only if we run app directly
if __name__ == "__main__":
from waitress import serve
app = create_app()
app.run(host="0.0.0.0")
port = os.getenv("APP_PORT", "80")
serve(app, host="0.0.0.0", port=port)

View File

@ -11,4 +11,5 @@ mysql-connector-python==9.2.0
python-dotenv==1.0.0
SQLAlchemy==2.0.23
typing_extensions==4.8.0
waitress==3.0.2
Werkzeug==3.0.1

20
api/tech_views.py Normal file
View File

@ -0,0 +1,20 @@
from flask import Blueprint, jsonify
from models import db
from sqlalchemy import text
from utils import db_ready
# Blueprint with technical endpoints
tech_bp = Blueprint('tech_bp', __name__)
@tech_bp.route('/health', methods=['GET'])
def health_check():
"Check if service works and database is functional"
try:
with db.engine.connect() as connection:
connection.execute(text("SELECT 1"))
return jsonify(status="healthy"), 200
except Exception:
if db_ready:
return jsonify(status="unhealthy"), 500
else:
return jsonify(status="starting"), 503

View File

@ -1,6 +1,8 @@
import pytest
from app import create_app
from models import db
from flask_jwt_extended import create_access_token
from models import db, User
from werkzeug.security import generate_password_hash
@pytest.fixture
def test_client():
@ -13,3 +15,33 @@ def test_client():
yield client
db.session.remove()
db.drop_all()
@pytest.fixture
def test_user():
"""Create a new user for testing."""
user = User(username="testuser", email="test@example.com", password=generate_password_hash("testpass"), role="User")
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def test_user2():
"""Create a user nr 2 for testing."""
user2 = User(username="testuser2", email="test2@example.com", password=generate_password_hash("testpass2"), role="User")
db.session.add(user2)
db.session.commit()
return user2
@pytest.fixture
def test_admin():
"""Create a new admin user for testing."""
admin = User(username="adminuser", email="admin@example.com", password=generate_password_hash("adminpass"), role="Administrator")
db.session.add(admin)
db.session.commit()
return admin
def login_test_user(identity):
"""Return Bearer auth header for user identified by provided id"""
access_token = create_access_token(identity=str(identity))
auth_header = {"Authorization": f"Bearer {access_token}"}
return auth_header

View File

@ -1,130 +1,132 @@
from conftest import login_test_user
import json
from models import User, db
from flask_jwt_extended import create_access_token
from werkzeug.security import generate_password_hash
def test_create_user(test_client):
def test_create_user(test_client, test_user, test_admin):
"""New user registration test"""
# Anonymous try to create common user
test_user_data = {"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"}
test_user_data = {"username": "test", "email": "testemail@example.com", "password": "testpassword", "role": "User"}
response = test_client.post("/users", data=json.dumps(test_user_data), content_type="application/json")
assert response.status_code == 201 # User should be created successfully
assert response.status_code == 201, "Each should can register in the service"
data = response.get_json()
assert data["username"] == "testuser"
assert data["username"] == "test"
# Anonymous try to create admin user
admin_user_data = {"username": "testadmin", "email": "testadmin@example.com", "password": "adminpass", "role": "Administrator"}
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json")
assert response.status_code == 401 # Anonymous cannot create admin users
assert response.status_code == 401, "Anonymous should cannot create admin users"
# Login common user and try to create admin user
access_token = create_access_token(identity='1')
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_user.id)
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
assert response.status_code == 403 # Common user cannot create admin users
assert response.status_code == 403, "Common user should cannot create admin users"
# Try to create admin user using admin account
hashed_pass = generate_password_hash("adminpass")
user = User(username="admin", email="admin@example.com", password=hashed_pass, role="Administrator")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_admin.id)
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
assert response.status_code == 201 # Logged administrators can create new admin users
assert response.status_code == 201, "Logged administrators should can create new admin users"
def test_edit_user(test_client, test_user, test_admin):
"User edit test"
# Anonymous cannot edit any user
admin_data = test_admin.to_dict()
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}))
assert response.status_code == 401
def test_login(test_client):
# Login users (get dict with auth header and merge it with dict with rest of headers)
admin_headers = login_test_user(test_admin.id) | {"Content-Type": "application/json"}
user_headers = login_test_user(test_user.id) | {"Content-Type": "application/json"}
# Check if PUT request contains all editable fields
response = test_client.put(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
assert response.status_code == 400, "PUT request data must have all editable fields"
# Check if user can edit their own data
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
assert response.status_code == 200, "Common user should can edit own account data"
# Check if user cannot edit other user data
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}), headers=user_headers)
assert response.status_code == 403, "Common user should cannot edit other user data"
# Check if admin can edit other user data
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=admin_headers)
assert response.status_code == 200, "Admin user should can edit other user data"
def test_remove_user(test_client, test_user, test_user2, test_admin):
"User remove test"
# Anonymous try to remove user
response = test_client.delete(f"/users/{test_user.id}")
assert response.status_code == 401, "Anonymous should cannot remove user account"
# Logged user try to remove other user account
headers = login_test_user(test_user.id)
response = test_client.delete(f"/users/{test_admin.id}", headers=headers)
assert response.status_code == 403, "Common user should cannot remove other user account"
# Logged user try to remove own account
response = test_client.delete(f"/users/{test_user.id}", headers=headers)
assert response.status_code == 200, "Common user should can remove their own account"
# Logged admin can remove other user account
admin_headers = login_test_user(test_admin.id)
response = test_client.delete(f"/users/{test_user2.id}", headers=admin_headers)
assert response.status_code == 200, "Admin user should can remove other user account"
def test_login(test_client, test_user):
"""User login test"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
content_type="application/json",
)
assert response.status_code == 401 # User should not be logged - wrong password
assert response.status_code == 401, "User should not become logged if provided wrong password"
response = test_client.post(
"/login",
data=json.dumps({"username": "testuser", "password": "testpass"}),
content_type="application/json",
)
assert response.status_code == 200 # User should be logged - right password
assert response.status_code == 200, "User should become logged if provided right password"
def test_get_users(test_client):
def test_get_users(test_client, test_user, test_admin):
"""Get all users test"""
response = test_client.get("/users")
assert response.status_code == 401 # Anonymous cannot get all users data
assert response.status_code == 401, "Anonymous should cannot get all users data"
# Common user try to get all users data
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_user.id)
response = test_client.get("/users", headers=headers)
assert response.status_code == 403 # Common user cannot get all users data
assert response.status_code == 403, "Common user should cannot get all users data"
# Admin user try to get all users data
hashed_pass = generate_password_hash("adminpass")
user = User(username="testadmin", email="testadmin@example.com", password=hashed_pass, role="Administrator")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_admin.id)
response = test_client.get("/users", headers=headers)
assert response.status_code == 200 # Admin user should can get all users data
assert response.status_code == 200, "Admin user should be able to get all users data"
def test_get_user_with_token(test_client):
def test_get_user_with_token(test_client, test_user, test_admin):
"""Test to get user data before and after auth using JWT token"""
admin_pass = generate_password_hash("admin_pass")
admin = User(username="admin", email="admin@example.com", password=admin_pass, role="Administrator")
db.session.add(admin)
db.session.commit()
response = test_client.get(f"/users/{test_admin.id}")
assert response.status_code == 401, "Anonymous should cannot get user data without login"
response = test_client.get(f"/users/{admin.id}")
assert response.status_code == 401 # Try to get user data without login
access_token = create_access_token(identity=str(admin.id))
admin_headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{admin.id}", headers=admin_headers)
admin_headers = login_test_user(test_admin.id)
response = test_client.get(f"/users/{test_admin.id}", headers=admin_headers)
assert response.status_code == 200
data = response.get_json()
assert data["username"] == "admin"
assert data["username"] == "adminuser"
user_pass = generate_password_hash("test_pass")
user = User(username="testuser", email="test@example.com", password=user_pass, role="User")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
response = test_client.get(f"/users/{user.id}", headers=headers)
assert response.status_code == 200 # Common user can get own user data
response = test_client.get(f"/users/{admin.id}", headers=headers)
assert response.status_code == 403 # Common user cannot get other user data
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
assert response.status_code == 200 # Admin can access all user data
def test_user_logout(test_client):
"""Test if logout work and JWT token is revoked"""
hashed_pass = generate_password_hash("testpass")
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
db.session.add(user)
db.session.commit()
access_token = create_access_token(identity=str(user.id))
headers = {"Authorization": f"Bearer {access_token}"}
headers = login_test_user(test_user.id)
response = test_client.get(f"/users/{test_user.id}", headers=headers)
assert response.status_code == 200, "Common user should can get own user data"
response = test_client.get(f"/users/{test_admin.id}", headers=headers)
assert response.status_code == 403, "Common user should cannot get other user data"
response = test_client.get(f"/users/{test_user.id}", headers=admin_headers)
assert response.status_code == 200, "Admin should can access all user data"
def test_user_logout(test_client, test_user):
"""Test if logout works and JWT token is revoked"""
headers = login_test_user(test_user.id)
response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 200 # Logged user can logout
assert response.status_code == 200, "Logged user should can logout"
response = test_client.get(f"/logout", headers=headers)
assert response.status_code == 401 # Token should be revoked after logout
assert response.status_code == 401, "Token should be revoked after logout"

View File

@ -2,23 +2,50 @@ from flask import abort
from flask_jwt_extended import get_jwt_identity
from models import User, db
import os
from sqlalchemy import text
from sqlalchemy.exc import DatabaseError, InterfaceError
import time
from werkzeug.security import generate_password_hash
db_ready = False
def admin_required(user_id, message='Access denied.'):
"Check if common user try to make administrative action."
user = db.session.get(User, user_id)
if user is None or user.role != "Administrator":
abort(403, message)
def validate_access(owner_id, message='Access denied.'):
# Check if user try to access or edit resource that does not belong to them
"Check if user try to access or edit resource that does not belong to them."
logged_user_id = int(get_jwt_identity())
logged_user_role = db.session.get(User, logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != owner_id:
abort(403, message)
def get_user_or_404(user_id):
"Get user from database or abort 404"
user = db.session.get(User, user_id)
if user is None:
abort(404, "User not found")
return user
def wait_for_db(max_retries):
"Try to connect with database <max_retries> times."
global db_ready
for _ in range(max_retries):
try:
with db.engine.connect() as connection:
connection.execute(text("SELECT 1"))
db_ready = True
return
except DatabaseError | InterfaceError:
time.sleep(3)
raise Exception("Failed to connect to database.")
def init_db():
"""Create default admin account if database is empty"""
with db.session.begin():

View File

@ -2,7 +2,7 @@ from flask import Blueprint, jsonify, request, abort
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
from models import db, RevokedToken, User
from utils import admin_required, validate_access
from utils import admin_required, validate_access, get_user_or_404
from werkzeug.security import check_password_hash, generate_password_hash
user_bp = Blueprint('user_bp', __name__)
@ -23,9 +23,7 @@ def get_all_users():
@jwt_required()
def get_user(user_id):
validate_access(user_id) # check if user tries to read other user account details
user = db.session.get(User, user_id)
if user is None:
abort(404, "User not found.")
user = get_user_or_404(user_id)
return jsonify(user.to_dict())
@ -59,7 +57,7 @@ def edit_user(user_id):
if request_fields != editable_fields:
abort(400, "Invalid request data structure.")
user_to_update = User.query.get_or_404(user_id)
user_to_update = get_user_or_404(user_id)
for field_name in editable_fields:
requested_value = request_data.get(field_name)
if requested_value is None:
@ -75,9 +73,7 @@ def edit_user(user_id):
@jwt_required()
def remove_user(user_id):
validate_access(user_id) # Only admin can remove other users accounts
user_to_delete = db.session.get(User, user_id)
if user_to_delete is None:
abort(404, "User not found.")
user_to_delete = get_user_or_404(user_id)
db.session.delete(user_to_delete)
db.session.commit()
return jsonify({"msg": "User removed successfully."})

View File

@ -7,9 +7,24 @@ services:
build: .
env_file:
- api/.env
ports:
- 80:80
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost/health"]
interval: 10s
timeout: 5s
retries: 5
start_period: 15s
db:
container_name: db
hostname: db
image: mysql:latest
env_file:
- db/.env
ports:
- 3306:3306
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 5