Revoking Tokens

This allows you to revoke a specific token so that it can no longer access your endpoints. You have to choose which token types you want to check against the denylist. The denylist works by providing a callback function to this extension, using the token_in_denylist_loader() decorator. The callback is called whenever one of the specified token types (access and/or refresh) is used to access a protected endpoint. If the callback function says that the token is revoked, the requester is not allowed to continue; otherwise the requester can access the endpoint as normal.

Here is a basic example of token revoking:

from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from async_fastapi_jwt_auth import AuthJWT
from async_fastapi_jwt_auth.exceptions import AuthJWTException
from async_fastapi_jwt_auth.auth_jwt import AuthJWTBearer

# Application instance that the routes below are registered on.
app = FastAPI()
# Dependency passed to Depends() in every endpoint to obtain an AuthJWT object.
auth_dep = AuthJWTBearer()


# Request body of the /login endpoint: the credentials sent by the client.
class User(BaseModel):
    username: str
    password: str


# Configuration handed to AuthJWT through the load_config callback below.
# Denylist checking is switched on, and both token types ("access" and
# "refresh") are checked; list only one of them to check just that type.
class Settings(BaseModel):
    # Example value only; use a strong, private secret in a real deployment.
    authjwt_secret_key: str = "secret"
    authjwt_denylist_enabled: bool = True
    authjwt_denylist_token_checks: set = {"access", "refresh"}


# Registered with AuthJWT as its configuration source; builds a fresh
# Settings object every time it is invoked.
@AuthJWT.load_config
def get_config():
    config = Settings()
    return config


# Turn any AuthJWTException raised while handling a request into a JSON
# response that carries the exception's own status code and message.
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    body = {"detail": exc.message}
    return JSONResponse(content=body, status_code=exc.status_code)


# Storage for the identifiers (jti) of revoked tokens. A plain in-process set
# is enough for this example; in production, use an external store such as
# Redis instead.
denylist = set()


# Denylist callback: receives the decoded token and reports whether its jti
# (unique identifier) is present in the denylist set. A real application
# could do something more elaborate here, e.g. look the jti up in Redis and
# treat a stored "true" as revoked and "false" as not revoked.
@AuthJWT.token_in_denylist_loader
async def check_if_token_in_denylist(decrypted_token):
    return decrypted_token["jti"] in denylist


# Issue an access/refresh token pair for the hard-coded demo credentials
# ("test" / "test"); any other credentials are rejected with HTTP 401.
@app.post("/login")
async def login(user: User, authorize: AuthJWT = Depends(auth_dep)):
    credentials_ok = user.username == "test" and user.password == "test"
    if not credentials_ok:
        raise HTTPException(status_code=401, detail="Bad username or password")

    subject = user.username
    tokens = {
        "access_token": await authorize.create_access_token(subject=subject),
        "refresh_token": await authorize.create_refresh_token(subject=subject),
    }
    return tokens


# Standard refresh endpoint: requires a refresh token and answers with a new
# access token for the same subject. A refresh token that is in the denylist
# cannot use this endpoint.
@app.post("/refresh")
async def refresh(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_refresh_token_required()

    subject = await authorize.get_jwt_subject()
    return {"access_token": await authorize.create_access_token(subject=subject)}


# Revoke the access token the request was made with by adding its jti to the
# denylist set.
@app.delete("/access-revoke")
async def access_revoke(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_required()

    claims = await authorize.get_raw_jwt()
    denylist.add(claims["jti"])
    return {"detail": "Access token has been revoke"}


# Revoke the refresh token the request was made with by adding its jti to the
# denylist set.
@app.delete("/refresh-revoke")
async def refresh_revoke(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_refresh_token_required()

    claims = await authorize.get_raw_jwt()
    denylist.add(claims["jti"])
    return {"detail": "Refresh token has been revoke"}


# Example protected endpoint: requires an access token and echoes its subject.
# An access token that is in the denylist can no longer reach it.
@app.get("/protected")
async def protected(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_required()
    return {"user": await authorize.get_jwt_subject()}

In production, you will likely want to use either a database or an in-memory store (such as Redis) to store your tokens. In-memory stores are great if you want to revoke a token when the user logs out: you can define a timeout for your tokens in Redis, and after the timeout has expired, the tokens will automatically be deleted.

Note

Before running the example, make sure Redis is installed on your local machine; alternatively, you can start it with Docker using this command: docker run -d -p 6379:6379 redis

Here is an example that uses Redis for revoking tokens:

#   This example has not been tested with this async fork, if it worked for you - write to me :)
from datetime import timedelta

from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from redis import Redis

from async_fastapi_jwt_auth import AuthJWT
from async_fastapi_jwt_auth.exceptions import AuthJWTException
from async_fastapi_jwt_auth.auth_jwt import AuthJWTBearer

# Application instance that the routes below are registered on.
app = FastAPI()
# Dependency passed to Depends() in every endpoint to obtain an AuthJWT object.
auth_dep = AuthJWTBearer()


# Request body of the /login endpoint: the credentials sent by the client.
class User(BaseModel):
    username: str
    password: str


# Configuration handed to AuthJWT through the load_config callback below, plus
# two extra fields (access_expires / refresh_expires) that the revoke
# endpoints use as the Redis expiry time of denylist entries.
class Settings(BaseModel):
    # Example value only; use a strong, private secret in a real deployment.
    authjwt_secret_key: str = "secret"
    authjwt_denylist_enabled: bool = True
    authjwt_denylist_token_checks: set = {"access", "refresh"}
    # These are durations, so they are annotated as timedelta to agree with
    # their default values.
    access_expires: timedelta = timedelta(minutes=15)
    refresh_expires: timedelta = timedelta(days=30)


# Single shared Settings instance: returned by get_config() and read by the
# revoke endpoints for the Redis expiry times.
settings = Settings()


# Registered with AuthJWT as its configuration source; hands back the shared
# module-level settings object rather than building a new one.
@AuthJWT.load_config
def get_config():
    return settings


# Turn any AuthJWTException raised while handling a request into a JSON
# response that carries the exception's own status code and message.
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    payload = {"detail": exc.message}
    return JSONResponse(content=payload, status_code=exc.status_code)


# Redis connection used to store the identifiers of denylisted tokens.
# decode_responses=True makes replies come back as str, which the denylist
# callback relies on when it compares the stored value with "true".
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=True)


# Denylist callback: reports whether a token has been revoked. In this simple
# case only the token's jti (unique identifier) is stored in Redis; a token
# counts as revoked when its jti exists in Redis with the value "true".
# Always returns a bool: a missing key (None) compares unequal to "true" and
# therefore yields False.
# NOTE(review): redis_conn is the synchronous client, so this lookup blocks
# the event loop while it waits for Redis; consider redis.asyncio if that
# matters for your workload.
@AuthJWT.token_in_denylist_loader
async def check_if_token_in_denylist(decrypted_token):
    jti = decrypted_token["jti"]
    entry = redis_conn.get(jti)
    return entry == "true"


# Issue an access/refresh token pair for the hard-coded demo credentials
# ("test" / "test"); any other credentials are rejected with HTTP 401.
@app.post("/login")
async def login(user: User, authorize: AuthJWT = Depends(auth_dep)):
    credentials_ok = user.username == "test" and user.password == "test"
    if not credentials_ok:
        raise HTTPException(status_code=401, detail="Bad username or password")

    subject = user.username
    tokens = {
        "access_token": await authorize.create_access_token(subject=subject),
        "refresh_token": await authorize.create_refresh_token(subject=subject),
    }
    return tokens


# Standard refresh endpoint: requires a refresh token and answers with a new
# access token for the same subject. A refresh token that is in the denylist
# cannot use this endpoint.
@app.post("/refresh")
async def refresh(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_refresh_token_required()

    subject = await authorize.get_jwt_subject()
    return {"access_token": await authorize.create_access_token(subject=subject)}


# Revoke the access token the request was made with.
@app.delete("/access-revoke")
async def access_revoke(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_required()

    # The jti is stored in Redis with the value "true" (meaning revoked) and
    # an expiry of settings.access_expires, so Redis drops the entry by
    # itself once that time has passed.
    claims = await authorize.get_raw_jwt()
    redis_conn.setex(claims["jti"], settings.access_expires, "true")
    return {"detail": "Access token has been revoke"}


# Revoke the refresh token the request was made with: its jti is stored in
# Redis with the value "true" and an expiry of settings.refresh_expires.
@app.delete("/refresh-revoke")
async def refresh_revoke(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_refresh_token_required()

    claims = await authorize.get_raw_jwt()
    redis_conn.setex(claims["jti"], settings.refresh_expires, "true")
    return {"detail": "Refresh token has been revoke"}


# Example protected endpoint: requires an access token and echoes its subject.
# An access token that is in the denylist can no longer reach it.
@app.get("/protected")
async def protected(authorize: AuthJWT = Depends(auth_dep)):
    await authorize.jwt_required()
    return {"user": await authorize.get_jwt_subject()}