Source code for api.resources.security

from flask_restful import Resource
from flask_jwt_extended import jwt_required, get_jwt_identity, create_access_token, create_refresh_token
from http import HTTPStatus
from webargs import validate
from webargs import fields
from webargs.flaskparser import use_args, use_kwargs
from api.authentication.database.models import User


# --------------------------------- #
# Security API Resources definition #
# --------------------------------- #

[docs]class BaseUserResource(Resource): """Base class for the user authentication and authorization resources""" # User model class definition model = User # Password validator definition password_validator = validate.Regexp( regex="^(?=.*[0-9]+.*)(?=.*[a-zA-Z]+.*).{7,50}$", error="Password must meet: a) contain at least 1 letter and at least 1 number, b) between 6-50 characters" ) # Sign-up form validator definition sign_up_form_validation = { "username": fields.Str(required=True, location="json"), "password": fields.Str(required=True, location="json", validate=password_validator) } # Sign-in form validator definition sign_in_form_validation = { "username": fields.Str(required=True, location="json"), "password": fields.Str(required=True, location="json") }
[docs]class SignupResource(BaseUserResource): """Class implementing sign-up API resource"""
[docs] @use_kwargs(BaseUserResource.sign_up_form_validation) def post(self, username, password): """ Signs-up a new user. :param username: name of the new user :type username: str :param password: password of the user (see: validation requirements) :type password: str :return: sign-up response data (username) :rtype: dict **Example** .. code-block:: python import requests # Prepare the sign-up data (example: new user) body = { "username": "user123", "password": "pAsSw0rd987!" } # Call the sign-up endpoint (example: locally deployed API) response = requests.post( "http://localhost:5000/signup", json=body) # Check if the user was created if response.ok: print(f"User {response.json().get('username')} created") """ # Check if the user already exists if self.model.get_by_username(username): return {"message": "Username already exist"}, HTTPStatus.BAD_REQUEST # Create the new user user = self.model(username=username, password=password) user.hash_password() user.save() # Return the response return {"username": str(user.username)}, HTTPStatus.CREATED
[docs]class LoginResource(BaseUserResource): """Class implementing log-in API resource"""
[docs] @use_kwargs(BaseUserResource.sign_in_form_validation) def post(self, username, password): """ Logs-in an existing user. :param username: name of the existing user :type username: str :param password: password of the user :type password: str :return: log-in response data (username, access token, refresh token) :rtype: dict **Example** .. code-block:: python import requests # Prepare the log-in data (example: existing user) body = { "username": "user123", "password": "pAsSw0rd987!" } # Call the log-in endpoint (example: locally deployed API) response = requests.post( "http://localhost:5000/login", json=body) # Get the access and refresh tokens from the response if response.ok: access_token = response.json().get("access_token") refresh_token = response.json().get("refresh_token") """ # Authenticate the user from the database user = User.authenticate(username=username, password=password) if not user: return {"message": "Invalid credentials"}, HTTPStatus.UNAUTHORIZED # Create the token: a) access token, b) refresh token data = { "username": user.username, "access_token": create_access_token(identity=str(user.id), fresh=True), "refresh_token": create_refresh_token(identity=str(user.id)) } # Return the response with the token return data, HTTPStatus.OK
[docs]class RefreshAccessTokenResource(Resource): """Class implementing refreshing of the access token API resource"""
[docs] @jwt_required(refresh=True) def post(self): """ Refreshes the access token. :return: refresh response data (refreshed access token) :rtype: dict **Example** .. code-block:: python import requests # Prepare the refresh headers headers = { "Authorization": f"Bearer <refresh_token>" } # Call the refresh endpoint (example: locally deployed API) response = requests.post( "http://localhost:5000/refresh", headers=headers) # Get the refreshed access token if response.ok: access_token = response.json().get("access_token") """ # Get the user from the token current_user = get_jwt_identity() if not current_user: return {"message": "Invalid user"}, HTTPStatus.UNAUTHORIZED # Refresh the token token = create_access_token(identity=current_user, fresh=False) # Return the response with the fresh token return {"access_token": token}, HTTPStatus.OK