Compare commits
32 Commits
37e89b60af
...
woodpecker
Author | SHA1 | Date | |
---|---|---|---|
cefb8eba4d | |||
1e54ba614a | |||
7469609600 | |||
b7a1fcfe49 | |||
a9743ecfbe | |||
76f33f50f5 | |||
203a81573d | |||
383e906102 | |||
e7330f07ee | |||
c996be5953 | |||
bf60011948 | |||
82d5962020 | |||
1254b036f5 | |||
6d84bf694e | |||
c2df9d136e | |||
76a351710f | |||
c1f0da4a9c | |||
eefc952ff0 | |||
8c35b3bd8c | |||
60011b1c72 | |||
859a962c12 | |||
0e9df4f859 | |||
1554404657 | |||
925af7d314 | |||
fb260a0f6d | |||
dcd9a39b46 | |||
8194e3e9fe | |||
0006044ae4 | |||
74a58879ce | |||
d325a52222 | |||
546d26ada0 | |||
acf4b1c26c |
49
.woodpecker/build.yaml
Normal file
49
.woodpecker/build.yaml
Normal file
@ -0,0 +1,49 @@
|
||||
when:
|
||||
- event: push
|
||||
branch: woodpecker
|
||||
|
||||
steps:
|
||||
- name: code-tests
|
||||
image: python:3.11.7-alpine
|
||||
commands:
|
||||
- cd api
|
||||
- python3 -m venv env
|
||||
- source env/bin/activate
|
||||
- pip install -r requirements.txt pytest
|
||||
- python3 -m pytest --junit-xml=pytest_junit.xml
|
||||
|
||||
- name: build
|
||||
image: docker:dind
|
||||
privileged: true
|
||||
environment:
|
||||
ACR_USERNAME: marcin00
|
||||
ACR_PASSWORD:
|
||||
from_secret: acr_password
|
||||
commands:
|
||||
- dockerd-entrypoint.sh &
|
||||
- sleep 10 # czas na uruchomienie usługi Docker
|
||||
|
||||
- DOCKER_IMAGE=marcin00.azurecr.io/user-microservice:$CI_COMMIT_SHA
|
||||
|
||||
- echo "===> Building Docker image"
|
||||
- docker build -t $DOCKER_IMAGE .
|
||||
|
||||
- echo "===> Installing bash"
|
||||
- apk add --no-cache bash
|
||||
|
||||
- echo "===> Installing goss"
|
||||
- wget https://github.com/aelsabbahy/goss/releases/latest/download/goss-linux-amd64 -O goss
|
||||
- wget https://github.com/aelsabbahy/goss/releases/latest/download/dgoss -O dgoss
|
||||
- chmod +x *goss
|
||||
|
||||
- echo "===> Starting container for test"
|
||||
- export GOSS_OPTS="-f junit"
|
||||
- export GOSS_PATH=./goss
|
||||
- export GOSS_SLEEP=3
|
||||
- "./dgoss run -e SQLALCHEMY_DATABASE_URI=sqlite:///:memory: $DOCKER_IMAGE > goss_junit.xml"
|
||||
|
||||
- echo "===> Logging in to ACR"
|
||||
- echo "$ACR_PASSWORD" | docker login marcin00.azurecr.io -u $ACR_USERNAME --password-stdin
|
||||
|
||||
- echo "===> Pushing image to ACR"
|
||||
- docker push $DOCKER_IMAGE
|
72
Jenkinsfile
vendored
Normal file
72
Jenkinsfile
vendored
Normal file
@ -0,0 +1,72 @@
|
||||
pipeline {
|
||||
agent any
|
||||
environment {
|
||||
DOCKER_REGISTRY_URL = 'marcin00.azurecr.io'
|
||||
DOCKER_IMAGE = "${DOCKER_REGISTRY_URL}/user-microservice:${GIT_COMMIT}"
|
||||
ACR_NAME = 'marcin00'
|
||||
}
|
||||
stages {
|
||||
stage('Checkout') {
|
||||
steps {
|
||||
checkout scm
|
||||
}
|
||||
}
|
||||
stage('Test python app') {
|
||||
steps {
|
||||
script {
|
||||
dir('api') {
|
||||
sh '''
|
||||
python3 -m venv env
|
||||
source env/bin/activate
|
||||
pip install -r requirements.txt pytest
|
||||
python3 -m pytest --junit-xml=pytest_junit.xml
|
||||
'''
|
||||
}
|
||||
}
|
||||
}
|
||||
post {
|
||||
always {
|
||||
junit testResults: '**/*pytest_junit.xml'
|
||||
}
|
||||
}
|
||||
}
|
||||
stage('Build & test docker image') {
|
||||
steps {
|
||||
script {
|
||||
appImage = docker.build("${DOCKER_IMAGE}")
|
||||
|
||||
sh label: 'Install dgoss', script: '''
|
||||
curl -s -L https://github.com/aelsabbahy/goss/releases/latest/download/goss-linux-amd64 -o goss
|
||||
curl -s -L https://github.com/aelsabbahy/goss/releases/latest/download/dgoss -o dgoss
|
||||
chmod +rx *goss
|
||||
'''
|
||||
|
||||
withEnv(['GOSS_OPTS=-f junit', 'GOSS_PATH=./goss', 'GOSS_SLEEP=3', 'SQLALCHEMY_DATABASE_URI=sqlite:///:memory:']) {
|
||||
sh label: 'run image tests', script: './dgoss run -e SQLALCHEMY_DATABASE_URI=sqlite:///:memory: ${DOCKER_IMAGE} > goss_junit.xml'
|
||||
}
|
||||
}
|
||||
}
|
||||
post {
|
||||
always {
|
||||
junit testResults: '**/*goss_junit.xml'
|
||||
}
|
||||
}
|
||||
}
|
||||
stage('Deploy') {
|
||||
steps {
|
||||
script {
|
||||
sh '''
|
||||
az login --identity
|
||||
az acr login --name ${ACR_NAME}
|
||||
docker push ${DOCKER_IMAGE}
|
||||
'''
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
post {
|
||||
cleanup {
|
||||
script { cleanWs() }
|
||||
}
|
||||
}
|
||||
}
|
@ -61,5 +61,7 @@ def create_app(config_name="default"):
|
||||
|
||||
# Server start only if we run app directly
|
||||
if __name__ == "__main__":
|
||||
from waitress import serve
|
||||
app = create_app()
|
||||
app.run(host="0.0.0.0")
|
||||
port = os.getenv("APP_PORT", "80")
|
||||
serve(app, host="0.0.0.0", port=port)
|
||||
|
@ -11,4 +11,5 @@ mysql-connector-python==9.2.0
|
||||
python-dotenv==1.0.0
|
||||
SQLAlchemy==2.0.23
|
||||
typing_extensions==4.8.0
|
||||
waitress==3.0.2
|
||||
Werkzeug==3.0.1
|
||||
|
@ -1,6 +1,8 @@
|
||||
import pytest
|
||||
from app import create_app
|
||||
from models import db
|
||||
from flask_jwt_extended import create_access_token
|
||||
from models import db, User
|
||||
from werkzeug.security import generate_password_hash
|
||||
|
||||
@pytest.fixture
|
||||
def test_client():
|
||||
@ -13,3 +15,33 @@ def test_client():
|
||||
yield client
|
||||
db.session.remove()
|
||||
db.drop_all()
|
||||
|
||||
@pytest.fixture
|
||||
def test_user():
|
||||
"""Create a new user for testing."""
|
||||
user = User(username="testuser", email="test@example.com", password=generate_password_hash("testpass"), role="User")
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
return user
|
||||
|
||||
@pytest.fixture
|
||||
def test_user2():
|
||||
"""Create a user nr 2 for testing."""
|
||||
user2 = User(username="testuser2", email="test2@example.com", password=generate_password_hash("testpass2"), role="User")
|
||||
db.session.add(user2)
|
||||
db.session.commit()
|
||||
return user2
|
||||
|
||||
@pytest.fixture
|
||||
def test_admin():
|
||||
"""Create a new admin user for testing."""
|
||||
admin = User(username="adminuser", email="admin@example.com", password=generate_password_hash("adminpass"), role="Administrator")
|
||||
db.session.add(admin)
|
||||
db.session.commit()
|
||||
return admin
|
||||
|
||||
def login_test_user(identity):
|
||||
"""Return Bearer auth header for user identified by provided id"""
|
||||
access_token = create_access_token(identity=str(identity))
|
||||
auth_header = {"Authorization": f"Bearer {access_token}"}
|
||||
return auth_header
|
||||
|
@ -1,130 +1,132 @@
|
||||
from conftest import login_test_user
|
||||
import json
|
||||
from models import User, db
|
||||
from flask_jwt_extended import create_access_token
|
||||
from werkzeug.security import generate_password_hash
|
||||
|
||||
def test_create_user(test_client):
|
||||
def test_create_user(test_client, test_user, test_admin):
|
||||
"""New user registration test"""
|
||||
|
||||
# Anonymous try to create common user
|
||||
test_user_data = {"username": "testuser", "email": "test@example.com", "password": "testpass", "role": "User"}
|
||||
test_user_data = {"username": "test", "email": "testemail@example.com", "password": "testpassword", "role": "User"}
|
||||
response = test_client.post("/users", data=json.dumps(test_user_data), content_type="application/json")
|
||||
assert response.status_code == 201 # User should be created successfully
|
||||
assert response.status_code == 201, "Each should can register in the service"
|
||||
data = response.get_json()
|
||||
assert data["username"] == "testuser"
|
||||
assert data["username"] == "test"
|
||||
|
||||
# Anonymous try to create admin user
|
||||
admin_user_data = {"username": "testadmin", "email": "testadmin@example.com", "password": "adminpass", "role": "Administrator"}
|
||||
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json")
|
||||
assert response.status_code == 401 # Anonymous cannot create admin users
|
||||
assert response.status_code == 401, "Anonymous should cannot create admin users"
|
||||
|
||||
# Login common user and try to create admin user
|
||||
access_token = create_access_token(identity='1')
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
headers = login_test_user(test_user.id)
|
||||
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
|
||||
assert response.status_code == 403 # Common user cannot create admin users
|
||||
assert response.status_code == 403, "Common user should cannot create admin users"
|
||||
|
||||
# Try to create admin user using admin account
|
||||
hashed_pass = generate_password_hash("adminpass")
|
||||
user = User(username="admin", email="admin@example.com", password=hashed_pass, role="Administrator")
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
access_token = create_access_token(identity=str(user.id))
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
headers = login_test_user(test_admin.id)
|
||||
response = test_client.post("/users", data=json.dumps(admin_user_data), content_type="application/json", headers=headers)
|
||||
assert response.status_code == 201 # Logged administrators can create new admin users
|
||||
assert response.status_code == 201, "Logged administrators should can create new admin users"
|
||||
|
||||
def test_edit_user(test_client, test_user, test_admin):
|
||||
"User edit test"
|
||||
# Anonymous cannot edit any user
|
||||
admin_data = test_admin.to_dict()
|
||||
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}))
|
||||
assert response.status_code == 401
|
||||
|
||||
def test_login(test_client):
|
||||
# Login users (get dict with auth header and merge it with dict with rest of headers)
|
||||
admin_headers = login_test_user(test_admin.id) | {"Content-Type": "application/json"}
|
||||
user_headers = login_test_user(test_user.id) | {"Content-Type": "application/json"}
|
||||
|
||||
# Check if PUT request contains all editable fields
|
||||
response = test_client.put(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
|
||||
assert response.status_code == 400, "PUT request data must have all editable fields"
|
||||
|
||||
# Check if user can edit their own data
|
||||
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=user_headers)
|
||||
assert response.status_code == 200, "Common user should can edit own account data"
|
||||
|
||||
# Check if user cannot edit other user data
|
||||
response = test_client.patch(f"/users/{test_admin.id}", data=json.dumps({"username": admin_data["username"], "password": "adminpass"}), headers=user_headers)
|
||||
assert response.status_code == 403, "Common user should cannot edit other user data"
|
||||
|
||||
# Check if admin can edit other user data
|
||||
response = test_client.patch(f"/users/{test_user.id}", data=json.dumps({"username": test_user.username, "password": "testpass"}), headers=admin_headers)
|
||||
assert response.status_code == 200, "Admin user should can edit other user data"
|
||||
|
||||
def test_remove_user(test_client, test_user, test_user2, test_admin):
|
||||
"User remove test"
|
||||
# Anonymous try to remove user
|
||||
response = test_client.delete(f"/users/{test_user.id}")
|
||||
assert response.status_code == 401, "Anonymous should cannot remove user account"
|
||||
|
||||
# Logged user try to remove other user account
|
||||
headers = login_test_user(test_user.id)
|
||||
response = test_client.delete(f"/users/{test_admin.id}", headers=headers)
|
||||
assert response.status_code == 403, "Common user should cannot remove other user account"
|
||||
|
||||
# Logged user try to remove own account
|
||||
response = test_client.delete(f"/users/{test_user.id}", headers=headers)
|
||||
assert response.status_code == 200, "Common user should can remove their own account"
|
||||
|
||||
# Logged admin can remove other user account
|
||||
admin_headers = login_test_user(test_admin.id)
|
||||
response = test_client.delete(f"/users/{test_user2.id}", headers=admin_headers)
|
||||
assert response.status_code == 200, "Admin user should can remove other user account"
|
||||
|
||||
def test_login(test_client, test_user):
|
||||
"""User login test"""
|
||||
hashed_pass = generate_password_hash("testpass")
|
||||
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
|
||||
response = test_client.post(
|
||||
"/login",
|
||||
data=json.dumps({"username": "testuser", "password": "wrongpass"}),
|
||||
content_type="application/json",
|
||||
)
|
||||
assert response.status_code == 401 # User should not be logged - wrong password
|
||||
assert response.status_code == 401, "User should not become logged if provided wrong password"
|
||||
response = test_client.post(
|
||||
"/login",
|
||||
data=json.dumps({"username": "testuser", "password": "testpass"}),
|
||||
content_type="application/json",
|
||||
)
|
||||
assert response.status_code == 200 # User should be logged - right password
|
||||
assert response.status_code == 200, "User should become logged if provided right password"
|
||||
|
||||
|
||||
def test_get_users(test_client):
|
||||
def test_get_users(test_client, test_user, test_admin):
|
||||
"""Get all users test"""
|
||||
response = test_client.get("/users")
|
||||
assert response.status_code == 401 # Anonymous cannot get all users data
|
||||
assert response.status_code == 401, "Anonymous should cannot get all users data"
|
||||
|
||||
# Common user try to get all users data
|
||||
hashed_pass = generate_password_hash("testpass")
|
||||
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
access_token = create_access_token(identity=str(user.id))
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
headers = login_test_user(test_user.id)
|
||||
response = test_client.get("/users", headers=headers)
|
||||
assert response.status_code == 403 # Common user cannot get all users data
|
||||
assert response.status_code == 403, "Common user should cannot get all users data"
|
||||
|
||||
# Admin user try to get all users data
|
||||
hashed_pass = generate_password_hash("adminpass")
|
||||
user = User(username="testadmin", email="testadmin@example.com", password=hashed_pass, role="Administrator")
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
access_token = create_access_token(identity=str(user.id))
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
headers = login_test_user(test_admin.id)
|
||||
response = test_client.get("/users", headers=headers)
|
||||
assert response.status_code == 200 # Admin user should can get all users data
|
||||
assert response.status_code == 200, "Admin user should be able to get all users data"
|
||||
|
||||
|
||||
def test_get_user_with_token(test_client):
|
||||
def test_get_user_with_token(test_client, test_user, test_admin):
|
||||
"""Test to get user data before and after auth using JWT token"""
|
||||
admin_pass = generate_password_hash("admin_pass")
|
||||
admin = User(username="admin", email="admin@example.com", password=admin_pass, role="Administrator")
|
||||
db.session.add(admin)
|
||||
db.session.commit()
|
||||
response = test_client.get(f"/users/{test_admin.id}")
|
||||
assert response.status_code == 401, "Anonymous should cannot get user data without login"
|
||||
|
||||
response = test_client.get(f"/users/{admin.id}")
|
||||
assert response.status_code == 401 # Try to get user data without login
|
||||
|
||||
access_token = create_access_token(identity=str(admin.id))
|
||||
admin_headers = {"Authorization": f"Bearer {access_token}"}
|
||||
|
||||
response = test_client.get(f"/users/{admin.id}", headers=admin_headers)
|
||||
admin_headers = login_test_user(test_admin.id)
|
||||
response = test_client.get(f"/users/{test_admin.id}", headers=admin_headers)
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
assert data["username"] == "admin"
|
||||
assert data["username"] == "adminuser"
|
||||
|
||||
user_pass = generate_password_hash("test_pass")
|
||||
user = User(username="testuser", email="test@example.com", password=user_pass, role="User")
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
access_token = create_access_token(identity=str(user.id))
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
response = test_client.get(f"/users/{user.id}", headers=headers)
|
||||
assert response.status_code == 200 # Common user can get own user data
|
||||
response = test_client.get(f"/users/{admin.id}", headers=headers)
|
||||
assert response.status_code == 403 # Common user cannot get other user data
|
||||
response = test_client.get(f"/users/{user.id}", headers=admin_headers)
|
||||
assert response.status_code == 200 # Admin can access all user data
|
||||
|
||||
|
||||
def test_user_logout(test_client):
|
||||
"""Test if logout work and JWT token is revoked"""
|
||||
hashed_pass = generate_password_hash("testpass")
|
||||
user = User(username="testuser", email="test@example.com", password=hashed_pass, role="User")
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
|
||||
access_token = create_access_token(identity=str(user.id))
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
headers = login_test_user(test_user.id)
|
||||
response = test_client.get(f"/users/{test_user.id}", headers=headers)
|
||||
assert response.status_code == 200, "Common user should can get own user data"
|
||||
response = test_client.get(f"/users/{test_admin.id}", headers=headers)
|
||||
assert response.status_code == 403, "Common user should cannot get other user data"
|
||||
response = test_client.get(f"/users/{test_user.id}", headers=admin_headers)
|
||||
assert response.status_code == 200, "Admin should can access all user data"
|
||||
|
||||
def test_user_logout(test_client, test_user):
|
||||
"""Test if logout works and JWT token is revoked"""
|
||||
headers = login_test_user(test_user.id)
|
||||
response = test_client.get(f"/logout", headers=headers)
|
||||
assert response.status_code == 200 # Logged user can logout
|
||||
assert response.status_code == 200, "Logged user should can logout"
|
||||
response = test_client.get(f"/logout", headers=headers)
|
||||
assert response.status_code == 401 # Token should be revoked after logout
|
||||
assert response.status_code == 401, "Token should be revoked after logout"
|
||||
|
@ -19,6 +19,14 @@ def validate_access(owner_id, message='Access denied.'):
|
||||
abort(403, message)
|
||||
|
||||
|
||||
def get_user_or_404(user_id):
|
||||
"Get user from database or abort 404"
|
||||
user = db.session.get(User, user_id)
|
||||
if user is None:
|
||||
abort(404, "User not found")
|
||||
return user
|
||||
|
||||
|
||||
def init_db():
|
||||
"""Create default admin account if database is empty"""
|
||||
with db.session.begin():
|
||||
|
12
api/views.py
12
api/views.py
@ -2,7 +2,7 @@ from flask import Blueprint, jsonify, request, abort
|
||||
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
|
||||
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
|
||||
from models import db, RevokedToken, User
|
||||
from utils import admin_required, validate_access
|
||||
from utils import admin_required, validate_access, get_user_or_404
|
||||
from werkzeug.security import check_password_hash, generate_password_hash
|
||||
|
||||
user_bp = Blueprint('user_bp', __name__)
|
||||
@ -23,9 +23,7 @@ def get_all_users():
|
||||
@jwt_required()
|
||||
def get_user(user_id):
|
||||
validate_access(user_id) # check if user tries to read other user account details
|
||||
user = db.session.get(User, user_id)
|
||||
if user is None:
|
||||
abort(404, "User not found.")
|
||||
user = get_user_or_404(user_id)
|
||||
return jsonify(user.to_dict())
|
||||
|
||||
|
||||
@ -59,7 +57,7 @@ def edit_user(user_id):
|
||||
if request_fields != editable_fields:
|
||||
abort(400, "Invalid request data structure.")
|
||||
|
||||
user_to_update = User.query.get_or_404(user_id)
|
||||
user_to_update = get_user_or_404(user_id)
|
||||
for field_name in editable_fields:
|
||||
requested_value = request_data.get(field_name)
|
||||
if requested_value is None:
|
||||
@ -75,9 +73,7 @@ def edit_user(user_id):
|
||||
@jwt_required()
|
||||
def remove_user(user_id):
|
||||
validate_access(user_id) # Only admin can remove other users accounts
|
||||
user_to_delete = db.session.get(User, user_id)
|
||||
if user_to_delete is None:
|
||||
abort(404, "User not found.")
|
||||
user_to_delete = get_user_or_404(user_id)
|
||||
db.session.delete(user_to_delete)
|
||||
db.session.commit()
|
||||
return jsonify({"msg": "User removed successfully."})
|
||||
|
Reference in New Issue
Block a user