Limit access for users to get users details

This commit is contained in:
Marcin-Ramotowski 2025-03-15 16:18:51 +00:00
parent d7c089aa83
commit 157dff8768
2 changed files with 13 additions and 10 deletions

View File

@ -11,7 +11,7 @@ class User(db.Model):
password = db.Column(db.String(60), nullable=False) password = db.Column(db.String(60), nullable=False)
def to_dict(self): def to_dict(self):
return {"id": self.id, "username": self.username} return {"id": self.id, "username": self.username, "email": self.email, "role": self.role}
class Task(db.Model): class Task(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True) id = db.Column(db.Integer, primary_key=True, autoincrement=True)

View File

@ -5,10 +5,10 @@ from werkzeug.security import check_password_hash, generate_password_hash
user_bp = Blueprint('user_bp', __name__) user_bp = Blueprint('user_bp', __name__)
def admin_required(username): def admin_required(user_id):
user = User.query.filter_by(username=username).first() user = User.query.get(user_id)
if user is None or user.role != "Administrator": if user is None or user.role != "Administrator":
abort(403, {'error': f'Access denied.'}) return jsonify({'error': f'Access denied.'}), 403
@user_bp.route('/users', methods=['GET']) @user_bp.route('/users', methods=['GET'])
@jwt_required() @jwt_required()
@ -20,6 +20,10 @@ def get_all_users():
@user_bp.route('/users/<int:user_id>', methods=['GET']) @user_bp.route('/users/<int:user_id>', methods=['GET'])
@jwt_required() @jwt_required()
def get_user(user_id): def get_user(user_id):
logged_user_id = int(get_jwt_identity())
logged_user_role = User.query.get(logged_user_id).role
if logged_user_role != "Administrator" and logged_user_id != user_id:
return jsonify({'error': f'Access denied.'}), 403
user = User.query.get_or_404(user_id) user = User.query.get_or_404(user_id)
return jsonify(user.to_dict()) return jsonify(user.to_dict())
@ -65,20 +69,19 @@ def user_login():
if user_from_db is not None: if user_from_db is not None:
password_from_db = user_from_db.password password_from_db = user_from_db.password
else: else:
return jsonify({"msg": f"User {username} failed login"}) return jsonify({"msg": "User failed login"})
if password_from_db and check_password_hash(password_hash, password_from_db): if password_from_db and check_password_hash(password_hash, password_from_db):
access_token = create_access_token(identity=username) access_token = create_access_token(identity=str(user_from_db.id))
response = jsonify({"msg": f"User {username} logged in successfully."}) response = jsonify({"msg": "User logged in successfully."})
set_access_cookies(response, access_token) set_access_cookies(response, access_token)
return response return response
else: else:
return jsonify({"msg": f"User {username} failed login."}) return jsonify({"msg": "User failed login."})
@user_bp.route('/logout', methods=['GET']) @user_bp.route('/logout', methods=['GET'])
@jwt_required() @jwt_required()
def user_logout(): def user_logout():
current_user = get_jwt_identity() response = jsonify({"msg": "User logged out successfully."})
response = jsonify({"msg": f"User {current_user} logged out successfully."})
unset_jwt_cookies(response) unset_jwt_cookies(response)
return response return response