120 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from flask import Blueprint, jsonify, request, abort
 | 
						|
from flask_jwt_extended import create_access_token, set_access_cookies, jwt_required, \
 | 
						|
verify_jwt_in_request, get_jwt_identity, unset_jwt_cookies, get_jwt
 | 
						|
from models import db, RevokedToken, User
 | 
						|
import os
 | 
						|
from utils import admin_required, validate_access, get_user_or_404
 | 
						|
from werkzeug.security import check_password_hash, generate_password_hash
 | 
						|
 | 
						|
user_bp = Blueprint('user_bp', __name__)
 | 
						|
 | 
						|
# ============================================================
 | 
						|
# API ENDPOINTS (ROUTES)
 | 
						|
# ============================================================
 | 
						|
 | 
						|
@user_bp.route('/users', methods=['GET'])
 | 
						|
@jwt_required()
 | 
						|
def get_all_users():
 | 
						|
    admin_required(get_jwt_identity()) # only admin can get all users details
 | 
						|
    users = User.query.all()
 | 
						|
    return jsonify([user.to_dict() for user in users])
 | 
						|
 | 
						|
 | 
						|
@user_bp.route('/users/<int:user_id>', methods=['GET'])
 | 
						|
@jwt_required()
 | 
						|
def get_user(user_id):
 | 
						|
    validate_access(user_id) # check if user tries to read other user account details
 | 
						|
    user = get_user_or_404(user_id)
 | 
						|
    return jsonify(user.to_dict())
 | 
						|
 | 
						|
 | 
						|
@user_bp.route('/users', methods=['POST'])
 | 
						|
def create_user():
 | 
						|
    data = request.get_json()
 | 
						|
    new_user_role = data['role']
 | 
						|
    if new_user_role == "Administrator":
 | 
						|
        verify_jwt_in_request()
 | 
						|
        admin_required(get_jwt_identity(), message="Access denied. Only administrators can create admin accounts.")
 | 
						|
    hashed_password = generate_password_hash(data['password'])
 | 
						|
    user = User(username=data['username'], email=data['email'], password=hashed_password, role=new_user_role)
 | 
						|
    db.session.add(user)
 | 
						|
    db.session.commit()
 | 
						|
    return jsonify(user.to_dict()), 201
 | 
						|
 | 
						|
 | 
						|
@user_bp.route('/users/<int:user_id>', methods=['PUT', 'PATCH'])
 | 
						|
@jwt_required()
 | 
						|
def edit_user(user_id):
 | 
						|
    validate_access(user_id) # check if user tries to edit other user account
 | 
						|
    request_data = request.get_json()
 | 
						|
    if request_data.get('role') == 'Administrator':
 | 
						|
        admin_required(get_jwt_identity())
 | 
						|
 | 
						|
    request_fields = set(request_data.keys())
 | 
						|
    editable_fields = User.get_editable_fields()
 | 
						|
    
 | 
						|
    # PUT requires all values
 | 
						|
    if request.method == 'PUT':
 | 
						|
        if request_fields != editable_fields:
 | 
						|
            abort(400, "Invalid request data structure.")
 | 
						|
 | 
						|
    user_to_update = get_user_or_404(user_id)
 | 
						|
    for field_name in editable_fields:
 | 
						|
        requested_value = request_data.get(field_name)
 | 
						|
        if requested_value is None:
 | 
						|
            continue
 | 
						|
        new_value = generate_password_hash(requested_value) \
 | 
						|
            if field_name == 'password' else requested_value
 | 
						|
        setattr(user_to_update, field_name, new_value)
 | 
						|
    db.session.commit()
 | 
						|
    return jsonify(user_to_update.to_dict())
 | 
						|
 | 
						|
 | 
						|
@user_bp.route('/users/<int:user_id>', methods=['DELETE'])
 | 
						|
@jwt_required()
 | 
						|
def remove_user(user_id):
 | 
						|
    validate_access(user_id) # Only admin can remove other users accounts
 | 
						|
    user_to_delete = get_user_or_404(user_id)
 | 
						|
    db.session.delete(user_to_delete)
 | 
						|
    db.session.commit()
 | 
						|
    return jsonify({"msg": "User removed successfully."})
 | 
						|
 | 
						|
 | 
						|
@user_bp.route('/login', methods=['POST'])
 | 
						|
def user_login():
 | 
						|
    request_data = request.get_json()
 | 
						|
    username = request_data['username']
 | 
						|
    password = request_data['password']
 | 
						|
 | 
						|
    user_from_db=User.query.filter(User.username == username).first()
 | 
						|
    if user_from_db is not None:
 | 
						|
        password_hash = user_from_db.password
 | 
						|
    else:
 | 
						|
        abort(401, "User failed login")
 | 
						|
    
 | 
						|
    if password_hash and check_password_hash(password_hash, password):
 | 
						|
        access_token = create_access_token(identity=str(user_from_db.id))
 | 
						|
        response = jsonify({"msg": "User logged in successfully.", "user_id": user_from_db.id})
 | 
						|
        set_access_cookies(response, access_token)
 | 
						|
        return response
 | 
						|
    else:
 | 
						|
        abort(401, "User failed login")
 | 
						|
 | 
						|
 | 
						|
@user_bp.route('/logout', methods=['GET'])
 | 
						|
@jwt_required()
 | 
						|
def user_logout():
 | 
						|
    jti = get_jwt()["jti"]
 | 
						|
    revoked_token = RevokedToken(jti=jti)
 | 
						|
    db.session.add(revoked_token)
 | 
						|
    db.session.commit()
 | 
						|
    response = jsonify({"msg": "User logged out successfully."})
 | 
						|
    unset_jwt_cookies(response)
 | 
						|
    return response
 | 
						|
 | 
						|
@user_bp.route('/version', methods=['GET'])
 | 
						|
def version():
 | 
						|
    return jsonify({
 | 
						|
        "version": os.getenv("APP_VERSION", "unknown"),
 | 
						|
        "build_time": os.getenv("BUILD_DATE", "unknown")
 | 
						|
    }) |