Refactored code to limit access to other user accounts
This commit is contained in:
parent
b0ad83b9a0
commit
0171c62650
@ -10,20 +10,24 @@ def admin_required(user_id):
|
|||||||
if user is None or user.role != "Administrator":
|
if user is None or user.role != "Administrator":
|
||||||
return jsonify({'error': f'Access denied.'}), 403
|
return jsonify({'error': f'Access denied.'}), 403
|
||||||
|
|
||||||
|
def validate_access(owner_id):
|
||||||
|
# Check if user try to access or edit resource that does not belong to them
|
||||||
|
logged_user_id = int(get_jwt_identity())
|
||||||
|
logged_user_role = User.query.get(logged_user_id).role
|
||||||
|
if logged_user_role != "Administrator" and logged_user_id != owner_id:
|
||||||
|
return jsonify({'error': f'Access denied.'}), 403
|
||||||
|
|
||||||
@user_bp.route('/users', methods=['GET'])
|
@user_bp.route('/users', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_all_users():
|
def get_all_users():
|
||||||
admin_required(get_jwt_identity())
|
admin_required(get_jwt_identity()) # only admin can get all users details
|
||||||
users = User.query.all()
|
users = User.query.all()
|
||||||
return jsonify([user.to_dict() for user in users])
|
return jsonify([user.to_dict() for user in users])
|
||||||
|
|
||||||
@user_bp.route('/users/<int:user_id>', methods=['GET'])
|
@user_bp.route('/users/<int:user_id>', methods=['GET'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def get_user(user_id):
|
def get_user(user_id):
|
||||||
logged_user_id = int(get_jwt_identity())
|
validate_access(user_id) # check if user tries to read other user account details
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
|
||||||
if logged_user_role != "Administrator" and logged_user_id != user_id:
|
|
||||||
return jsonify({'error': f'Access denied.'}), 403
|
|
||||||
user = User.query.get_or_404(user_id)
|
user = User.query.get_or_404(user_id)
|
||||||
return jsonify(user.to_dict())
|
return jsonify(user.to_dict())
|
||||||
|
|
||||||
@ -33,10 +37,7 @@ def create_user():
|
|||||||
new_user_role = data['role']
|
new_user_role = data['role']
|
||||||
# Only administrator can create admin accounts
|
# Only administrator can create admin accounts
|
||||||
if new_user_role == "Administrator":
|
if new_user_role == "Administrator":
|
||||||
logged_user_id = int(get_jwt_identity())
|
admin_required(get_jwt_identity())
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
|
||||||
if logged_user_role != "Administrator":
|
|
||||||
return jsonify({'error': f'You can not create admin users.'}), 403
|
|
||||||
user = User(username=data['username'], email=data['email'], password=data['password'], role=new_user_role)
|
user = User(username=data['username'], email=data['email'], password=data['password'], role=new_user_role)
|
||||||
db.session.add(user)
|
db.session.add(user)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
@ -49,10 +50,7 @@ def edit_user(user_id):
|
|||||||
user_to_update = User.query.get_or_404(user_id)
|
user_to_update = User.query.get_or_404(user_id)
|
||||||
request_username = request_data.get('username')
|
request_username = request_data.get('username')
|
||||||
request_email = request_data.get('email')
|
request_email = request_data.get('email')
|
||||||
logged_user_id = int(get_jwt_identity())
|
validate_access(user_id) # check if user tries to edit other user account
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
|
||||||
if logged_user_role != "Administrator" and logged_user_id != user_id:
|
|
||||||
return jsonify({'error': f'You can not edit other user accounts.'}), 403
|
|
||||||
if request_username and request_email:
|
if request_username and request_email:
|
||||||
user_to_update.username = request_username
|
user_to_update.username = request_username
|
||||||
user_to_update.email = request_email
|
user_to_update.email = request_email
|
||||||
@ -64,11 +62,7 @@ def edit_user(user_id):
|
|||||||
@user_bp.route('/users/<int:user_id>', methods=['DELETE'])
|
@user_bp.route('/users/<int:user_id>', methods=['DELETE'])
|
||||||
@jwt_required()
|
@jwt_required()
|
||||||
def remove_user(user_id):
|
def remove_user(user_id):
|
||||||
logged_user_id = int(get_jwt_identity())
|
validate_access(user_id) # Only admin can remove other users accounts
|
||||||
logged_user_role = User.query.get(logged_user_id).role
|
|
||||||
# Only admin can remove other users accounts
|
|
||||||
if logged_user_role != "Administrator" and logged_user_id != user_id:
|
|
||||||
return jsonify({'error': f'You can not remove other user accounts.'}), 403
|
|
||||||
user_to_delete = User.query.get_or_404(user_id)
|
user_to_delete = User.query.get_or_404(user_id)
|
||||||
db.session.delete(user_to_delete)
|
db.session.delete(user_to_delete)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user