Refresh Tokens
These are long-lived tokens which can be used to create a new access tokens once an old access token has expired. Refresh tokens cannot access an endpoint that is protected with jwt_required(), jwt_optional(), and fresh_jwt_required() and access tokens cannot access an endpoint that is protected with jwt_refresh_token_required().
Utilizing refresh tokens we can help reduce the damage that can be done if an access tokens is stolen. However, if an attacker gets a refresh tokens they can keep generating new access tokens and accessing protected endpoints as though he was that user. We can help combat this by using the fresh tokens pattern, discussed in the next section.
Note
For accessing /refresh endpoint remember to change access_token with refresh_token in the header
Authorization: Bearer <refresh_token>
Here is an example of using access and refresh tokens:
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from async_fastapi_jwt_auth import AuthJWT
from async_fastapi_jwt_auth.exceptions import AuthJWTException
from async_fastapi_jwt_auth.auth_jwt import AuthJWTBearer
app = FastAPI()
auth_dep = AuthJWTBearer()
class User(BaseModel):
username: str
password: str
class Settings(BaseModel):
authjwt_secret_key: str = "secret"
@AuthJWT.load_config
def get_config():
return Settings()
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
return JSONResponse(status_code=exc.status_code, content={"detail": exc.message})
@app.post("/login")
async def login(user: User, authorize: AuthJWT = Depends(auth_dep)):
if user.username != "test" or user.password != "test":
raise HTTPException(status_code=401, detail="Bad username or password")
# Use create_access_token() and create_refresh_token() to create our
# access and refresh tokens
access_token = await authorize.create_access_token(subject=user.username)
refresh_token = await authorize.create_refresh_token(subject=user.username)
return {"access_token": access_token, "refresh_token": refresh_token}
@app.post("/refresh")
async def refresh(authorize: AuthJWT = Depends(auth_dep)):
"""
The jwt_refresh_token_required() function insures a valid refresh
token is present in the request before running any code below that function.
we can use the get_jwt_subject() function to get the subject of the refresh
token, and use the create_access_token() function again to make a new access token
"""
await authorize.jwt_refresh_token_required()
current_user = await authorize.get_jwt_subject()
new_access_token = await authorize.create_access_token(subject=current_user)
return {"access_token": new_access_token}
@app.get("/protected")
async def protected(authorize: AuthJWT = Depends(auth_dep)):
await authorize.jwt_required()
current_user = await authorize.get_jwt_subject()
return {"user": current_user}