You can use OAuth2 scopes directly with FastAPI; they are integrated to work seamlessly.
This lets you build a more fine-grained permission system that follows the OAuth2 standard and is integrated into your OpenAPI application (and the API docs).
OAuth2 with scopes is the mechanism used by many big authentication providers, such as Facebook, Google, GitHub, Microsoft, X (Twitter), etc. They use it to grant specific permissions to users and applications.
Every time you "log in with" Facebook, Google, GitHub, Microsoft, or X (Twitter), that application is using OAuth2 with scopes.
In this section you will see how to manage authentication and authorization with the same OAuth2 with scopes in your FastAPI application.
Warning
This is a more or less advanced section. If you are just starting out, you can skip it.
You don't necessarily need OAuth2 scopes, and you can handle authentication and authorization however you want.
But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs.
Nevertheless, you still enforce those scopes, or any other security/authorization requirement, however you need, in your own code.
In many cases, OAuth2 with scopes can be overkill.
But if you know you need it, or you are curious, keep reading.
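Since you enforce scopes yourself, the core of that enforcement is a plain membership check. A minimal sketch using only the standard library; the helper name `missing_scopes` and the sample scope lists are hypothetical and not part of FastAPI:

```python
def missing_scopes(required: list[str], granted: list[str]) -> list[str]:
    """Return the required scopes that the token does not carry, in order."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


# A token that carries only "me" cannot satisfy a requirement for "items".
print(missing_scopes(["me", "items"], ["me"]))
# A token with more scopes than required is fine.
print(missing_scopes(["me"], ["me", "items"]))
```

An empty result means the request may proceed; anything else is a reason to reject it.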
```python
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()

DUMMY_HASH = password_hash.hash("dummypassword")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
```
🤓 Other versions and variants
Tip
Prefer to use the Annotated version if possible.
```python
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()

DUMMY_HASH = password_hash.hash("dummypassword")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User:
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
```
The first change is that we now declare the OAuth2 security scheme with two available scopes, me and items.
The scopes parameter receives a dict with each scope as a key and its description as the value:
```python
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)
```
Because we are now declaring those scopes, they will show up in the API docs when you log in/authorize.
And you will be able to select which scopes you want to give access to: me and items.
This is the same mechanism used when you grant permissions while logging in with Facebook, Google, GitHub, etc.
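The declared scopes reach the docs UI through the OpenAPI schema. As a rough illustration, the sketch below builds a dictionary in the shape the OpenAPI 3 specification defines for an `oauth2` security scheme with a password flow. It is written by hand with the standard library only; the exact output FastAPI generates may contain additional fields, and the variable names are illustrative assumptions:

```python
import json

# Same scope names and descriptions as in the security scheme declaration.
scopes = {
    "me": "Read information about the current user.",
    "items": "Read items.",
}

# Hand-written approximation of an OpenAPI 3 "oauth2" security scheme object.
security_scheme = {
    "type": "oauth2",
    "flows": {"password": {"tokenUrl": "token", "scopes": scopes}},
}

print(json.dumps(security_scheme, indent=2))
```

Each key of the scopes dict becomes a selectable permission, and each value is the description shown next to it.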
Now, modify the token path operation to return the scopes requested.
We are still using the same OAuth2PasswordRequestForm. It includes a scopes property with a list of str, containing each scope it received in the request.
And we return the scopes as part of the JWT token.
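The scopes travel inside the token as a single space-separated string. The sketch below shows that round trip with the standard library only; the helper names `scopes_to_claim` and `claim_to_scopes` are hypothetical and do not come from FastAPI or PyJWT:

```python
def scopes_to_claim(scopes: list[str]) -> str:
    """Join scopes into the space-separated form stored in the token."""
    return " ".join(scopes)


def claim_to_scopes(claim: str) -> list[str]:
    """Split the stored string back into a list of scopes."""
    # str.split() with no argument ignores repeated whitespace and returns []
    # for an empty string, whereas "".split(" ") returns [""].
    return claim.split()


claim = scopes_to_claim(["me", "items"])
print(claim)
print(claim_to_scopes(claim))
```

Splitting without an argument is a choice made for this sketch so that a token with no scopes yields an empty list.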
Danger
For simplicity, here we are just adding the scopes received directly to the token.
But in your application, for security, you should make sure you only add the scopes that the user is actually able to have, or the ones you have predefined.
```python
@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")
```
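One way to restrict the token to scopes the user may actually hold is to filter the requested scopes before encoding them. This is only a sketch: the `USER_ALLOWED_SCOPES` table and the `grantable_scopes` helper are hypothetical, and a real application would load permissions from its own user store:

```python
# Hypothetical per-user permissions, hard-coded here for illustration only.
USER_ALLOWED_SCOPES: dict[str, set[str]] = {
    "johndoe": {"me", "items"},
    "alice": {"me"},
}


def grantable_scopes(username: str, requested: list[str]) -> list[str]:
    """Keep only the requested scopes this user is allowed to have."""
    allowed = USER_ALLOWED_SCOPES.get(username, set())
    return [scope for scope in requested if scope in allowed]


# "admin" is not allowed for johndoe, so it is dropped.
print(grantable_scopes("johndoe", ["me", "items", "admin"]))
# alice may only hold "me".
print(grantable_scopes("alice", ["me", "items"]))
```

In the token path operation, the filtered list would take the place of form_data.scopes when building the scope value. Whether to silently drop disallowed scopes, as here, or to reject the request is a design decision for your application.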
Declare scopes in path operations and dependencies
Now we declare that the path operation for /users/me/items/ requires the scope items.
For this, we import and use Security from fastapi.
You can use Security to declare dependencies (just like Depends), but Security also receives a parameter scopes with a list of scopes (strings).
In this case, we pass a dependency function get_current_active_user to Security (the same way we would do with Depends).
But we also pass a list of scopes, in this case with just one scope: items (it could have more).
And the dependency function get_current_active_user can also declare sub-dependencies, not only with Depends but also with Security. It declares its own sub-dependency function (get_current_user) and more scope requirements.
In this case, it requires the scope me (it could require more than one scope).
Note
You don't necessarily need to add different scopes in different places.
We are doing it here to demonstrate how FastAPI handles scopes declared at different levels.
```python
from fastapi import Depends, FastAPI, HTTPException, Security, status

# ...


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


# ...


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]
```
Other versions and variants
Tip
Prefer to use the Annotated version if possible.
```python
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()

DUMMY_HASH = password_hash.hash("dummypassword")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User:
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
```
Technical Details
Security is actually a subclass of Depends, and it has just one extra parameter that we will see later.
By using Security instead of Depends, FastAPI knows that it can declare security scopes, use them internally, and document the API with OpenAPI.
Note that when you import Query, Path, Depends, Security and others from fastapi, those are actually functions that return instances of special classes.
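As a rough mental model of that relationship, here is a plain-Python sketch. It is not FastAPI's actual source: the names DependsMarker, SecurityMarker, depends and security are hypothetical stand-ins, chosen only to show a subclass that adds a scopes field and lowercase functions that return instances of those classes.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Sequence


@dataclass(frozen=True)
class DependsMarker:
    """Hypothetical stand-in for the class behind Depends."""

    dependency: Optional[Callable] = None


@dataclass(frozen=True)
class SecurityMarker(DependsMarker):
    """Hypothetical stand-in for the class behind Security: one extra field."""

    scopes: Sequence[str] = ()


def depends(dependency: Optional[Callable] = None) -> DependsMarker:
    # A function that returns an instance of a special class.
    return DependsMarker(dependency)


def security(
    dependency: Optional[Callable] = None, scopes: Sequence[str] = ()
) -> SecurityMarker:
    # Same idea, but the returned object also carries the declared scopes.
    return SecurityMarker(dependency, tuple(scopes))


def get_current_user() -> str:
    # Placeholder dependency used only for this illustration.
    return "user"


plain = depends(get_current_user)
scoped = security(get_current_user, scopes=["me"])
```

Because SecurityMarker inherits from DependsMarker in this sketch, any code that handles a plain dependency marker also accepts the scoped one, and can read its scopes when they are present.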
This is where we use the same OAuth2 scheme we created before, declaring it as a dependency: oauth2_scheme.
Because this dependency function has no scope requirements of its own, we can use Depends with oauth2_scheme; there is no need to use Security when we don't need to specify security scopes.
We also declare a special parameter of type SecurityScopes, imported from fastapi.security.
The SecurityScopes class is similar to Request (Request was used to get the request object directly).
```python
# Excerpt from the full example above: the import, the scheme,
# and the signature of get_current_user.
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    ...  # the body is unchanged from the full example above
```
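To illustrate what "similar to Request" means here, the following is a simplified plain-Python model of a caller that fills in parameters based on their annotated type rather than their name. FakeRequest, FakeSecurityScopes and call_with_special_params are hypothetical names invented for this sketch; FastAPI's real resolution logic is more involved.

```python
import inspect
from typing import get_type_hints


class FakeRequest:
    """Hypothetical stand-in for a request object."""

    def __init__(self, path: str) -> None:
        self.path = path


class FakeSecurityScopes:
    """Hypothetical stand-in for SecurityScopes."""

    def __init__(self, scopes: list[str]) -> None:
        self.scopes = scopes
        self.scope_str = " ".join(scopes)


def call_with_special_params(func, request: FakeRequest, scopes: list[str]):
    """Call func, supplying arguments chosen by annotated type, not by name."""
    specials = {
        FakeRequest: request,
        FakeSecurityScopes: FakeSecurityScopes(scopes),
    }
    hints = get_type_hints(func)
    kwargs = {
        name: specials[hints[name]]
        for name in inspect.signature(func).parameters
        if hints.get(name) in specials
    }
    return func(**kwargs)


def dependency(security_scopes: FakeSecurityScopes, req: FakeRequest) -> str:
    # The parameter names are arbitrary; only the annotations matter here.
    return f"{req.path} needs [{security_scopes.scope_str}]"


result = call_with_special_params(dependency, FakeRequest("/users/me/"), ["me"])
```

The point of the sketch is only that the function never constructs these objects itself: it declares the type it wants and the caller provides the instance.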
The security_scopes parameter will be of type SecurityScopes.
It will have a scopes property with a list containing all the scopes required by itself and by all the dependencies that use it as a sub-dependency. That is, all the "dependants"... this may sound confusing; it is explained again further below.
The security_scopes object (of class SecurityScopes) also provides a scope_str attribute with a single string containing those scopes separated by spaces (we are going to use it).
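A small plain-Python sketch may help picture how those scopes add up along a chain of dependants. The helper collect_scopes is hypothetical, and the ordering and de-duplication it applies are assumptions of this sketch rather than documented FastAPI behavior; the chains mirror the path operations in the example.

```python
def collect_scopes(chain: list[list[str]]) -> list[str]:
    """Merge the scopes declared at each step of a chain, keeping first-seen order."""
    collected: list[str] = []
    for declared in chain:
        for scope in declared:
            if scope not in collected:
                collected.append(scope)
    return collected


# Each inner list holds the scopes declared at one step, from the path
# operation inwards, ending at get_current_user.
items_chain = [["items"], ["me"]]  # read_own_items -> get_current_active_user
me_chain = [[], ["me"]]  # read_users_me -> get_current_active_user
status_chain = [[]]  # read_system_status -> get_current_user directly

scopes = collect_scopes(items_chain)
scope_str = " ".join(scopes)  # the space-separated form described above
```

Under these assumptions, get_current_user would see both "items" and "me" when reached through read_own_items, only "me" through read_users_me, and no scopes at all through read_system_status.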
We create an HTTPException that we can reuse (raise) later at several points.
In this exception, we include the required scopes (if any) as a space-separated string (using scope_str). We put that string containing the scopes in the WWW-Authenticate header (this is part of the spec).
```python
# Excerpt from the full example above: building the reusable exception.
async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    ...  # the rest of the function is unchanged from the full example above
```
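The header logic can also be looked at in isolation. The sketch below pulls it into two small helpers, build_www_authenticate and credentials_error_headers; both names are hypothetical and not part of FastAPI, and the code uses only the standard library.

```python
def build_www_authenticate(required_scopes: list[str]) -> str:
    """Return the WWW-Authenticate value, adding a scope attribute when scopes are required."""
    if required_scopes:
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


def credentials_error_headers(required_scopes: list[str]) -> dict[str, str]:
    """Headers that could be attached to a 401 response in this sketch."""
    return {"WWW-Authenticate": build_www_authenticate(required_scopes)}


with_scopes = credentials_error_headers(["me", "items"])
without_scopes = credentials_error_headers([])
```

Computing the value once and reusing it keeps every 401 raised by the dependency consistent about which scopes the client would need.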
We verify that we get a username, and extract the scopes.
We then validate that data with the Pydantic model (catching the ValidationError exception), and if we get an error reading the JWT token or validating the data with Pydantic, we raise the HTTPException we created before.
For that, we update the Pydantic model TokenData with a new property scopes.
By validating the data with Pydantic we can make sure that we have, for example, exactly a list of str with the scopes and a str with the username.
Rather than, for example, a dict or something else, which could break the application at some point later and so become a security risk.
We also verify that we have a user with that username, and if not, we raise the same exception we created before.
```python
# Excerpt from the full example above: the updated model and the validation step.
class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    # authenticate_value and credentials_exception are built as in the full example
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    ...  # the scope check and the return follow, as in the full example above
```
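To make the "data shape" idea concrete without Pydantic, here is a standard-library sketch of the same check. TokenDataSketch, PayloadShapeError and parse_payload are hypothetical names; PayloadShapeError only plays the role that ValidationError has in the example. One deliberate difference: the sketch calls split() with no argument, so an empty scope string yields an empty list instead of a list holding one empty string.

```python
from dataclasses import dataclass, field


@dataclass
class TokenDataSketch:
    """Hypothetical plain-Python counterpart of the TokenData model."""

    username: str
    scopes: list[str] = field(default_factory=list)


class PayloadShapeError(ValueError):
    """Hypothetical error raised when the decoded payload has the wrong shape."""


def parse_payload(payload: dict) -> TokenDataSketch:
    """Check that 'sub' and 'scope' are strings and split the scopes into a list."""
    username = payload.get("sub")
    if not isinstance(username, str):
        raise PayloadShapeError("'sub' must be a string")
    scope = payload.get("scope", "")
    if not isinstance(scope, str):
        raise PayloadShapeError("'scope' must be a space-separated string")
    # split() without arguments drops empty entries.
    return TokenDataSketch(username=username, scopes=scope.split())


token_data = parse_payload({"sub": "johndoe", "scope": "me items"})
```

A payload whose scope claim is, say, a dict is rejected at this point, before any later code tries to treat it as a list of strings.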
Next, we verify that every scope required by this dependency and by all its dependants (including path operations) is present in the scopes carried by the received token. If any is missing, we raise an HTTPException.
For this, we use security_scopes.scopes, which contains a list of all these scopes as str.
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()

DUMMY_HASH = password_hash.hash("dummypassword")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
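Stripped of the framework, the scope check in get_current_user is a membership test between two lists. The following is a minimal sketch of that logic only; the helper names missing_scopes and www_authenticate_value are hypothetical and are not part of FastAPI, and the sample values are assumptions chosen for illustration.

```python
def missing_scopes(required: list[str], token_scopes: list[str]) -> list[str]:
    """Return the required scopes that are absent from the token, in order."""
    return [scope for scope in required if scope not in token_scopes]


def www_authenticate_value(required: list[str]) -> str:
    """Build the WWW-Authenticate header value sent with the error response."""
    if required:
        scope_str = " ".join(required)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


# Hypothetical values: the path operation needs both scopes,
# but the token's space-separated "scope" claim only carries "me".
required = ["me", "items"]
token_scopes = "me".split(" ")

print(missing_scopes(required, token_scopes))
print(www_authenticate_value(required))
```

In the real dependency, a non-empty result from the first helper corresponds to the point where the HTTPException with "Not enough permissions" is raised.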
Let's review this dependency tree and the scopes again.
Because the get_current_active_user dependency has get_current_user as a sub-dependency, the scope "me" declared at get_current_active_user is included in the list of required scopes in security_scopes.scopes that is passed to get_current_user.
The path operation itself also declares a scope, "items", so this scope is also in the security_scopes.scopes list passed to get_current_user.
Here is how the hierarchy of dependencies and scopes looks:
- The path operation read_own_items has:
  - Required scopes ["items"] with the dependency:
  - get_current_active_user:
    - The dependency function get_current_active_user has:
      - Required scopes ["me"] with the dependency:
      - get_current_user:
        - The dependency function get_current_user has:
          - No scopes required by itself.
          - A dependency using oauth2_scheme.
          - A security_scopes parameter of type SecurityScopes:
            - This security_scopes parameter has a property scopes with a list containing all the scopes declared above, so:
              - security_scopes.scopes will contain ["me", "items"] for the path operation read_own_items.
              - security_scopes.scopes will contain ["me"] for the path operation read_users_me, because it is declared in the dependency get_current_active_user.
              - security_scopes.scopes will contain [] (nothing) for the path operation read_system_status, because it didn't declare any Security with scopes, and its dependency, get_current_user, doesn't declare any scopes either.
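To make the hierarchy above concrete, here is a small framework-free model of how the scopes declared along a dependency chain add up. The TREE mapping and the function scopes_seen_by_get_current_user are hypothetical names used for illustration; FastAPI resolves this internally, and the sketch only mirrors the outcome described above. Results are sets because the sketch makes no claim about ordering.

```python
from typing import Optional

# Each entry maps a function name to a pair:
# (scopes it declares via Security, the dependency it relies on, if any).
TREE: dict[str, tuple[list[str], Optional[str]]] = {
    "read_own_items": (["items"], "get_current_active_user"),
    "read_users_me": ([], "get_current_active_user"),
    "read_system_status": ([], "get_current_user"),
    "get_current_active_user": (["me"], "get_current_user"),
    "get_current_user": ([], None),
}


def scopes_seen_by_get_current_user(path_operation: str) -> set[str]:
    """Collect every scope declared from the path operation down the chain."""
    collected: set[str] = set()
    current: Optional[str] = path_operation
    while current is not None:
        declared, dependency = TREE[current]
        collected.update(declared)
        current = dependency
    return collected


for name in ("read_own_items", "read_users_me", "read_system_status"):
    print(name, sorted(scopes_seen_by_get_current_user(name)))
```

Walking the chain from each path operation reproduces the three cases listed above: both scopes for read_own_items, only "me" for read_users_me, and none for read_system_status.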
Tip
The important and "magic" thing here is that get_current_user will have a different list of scopes to check for each path operation.
It all depends on the scopes declared in each path operation and in each dependency in the dependency tree for that specific path operation.
You can use SecurityScopes at any point, and in multiple places; it doesn't have to be at the "root" dependency.
It will always have the security scopes declared in the current Security dependencies and in all the dependants for that specific path operation and that specific dependency tree.
Because SecurityScopes will have all the scopes declared by dependants, you can use it to verify that a token has the required scopes in a central dependency function, and then declare different scope requirements in different path operations.
They will be checked independently for each path operation.
If you open the API docs, you can authenticate and specify which scopes you want to authorize.
If you don't select any scope, you will be "authenticated", but when you try to access /users/me/ or /users/me/items/ you will get an error saying that you don't have enough permissions. You will still be able to access /status/.
And if you select the scope me but not the scope items, you will be able to access /users/me/ but not /users/me/items/.
That is what would happen to a third-party application that tried to access one of these path operations with a token provided by a user, depending on how many permissions the user gave the application.
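The behavior described above can be tabulated with a short framework-free sketch. The REQUIRED mapping restates the scope requirements of this example's three path operations, and accessible_paths is a hypothetical helper, not a FastAPI function. The sketch assumes a valid token for an active user and only models the scope check.

```python
# Scopes each path operation ends up requiring in this example.
REQUIRED: dict[str, set[str]] = {
    "/users/me/": {"me"},
    "/users/me/items/": {"me", "items"},
    "/status/": set(),
}


def accessible_paths(granted: set[str]) -> list[str]:
    """Return the paths whose required scopes are all within the granted set."""
    return sorted(path for path, needed in REQUIRED.items() if needed <= granted)


print(accessible_paths(set()))
print(accessible_paths({"me"}))
print(accessible_paths({"me", "items"}))
```

With no scopes granted only /status/ is reachable, granting me adds /users/me/, and granting both adds /users/me/items/.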
In this example we are using the OAuth2 "password" flow.
This is appropriate when we are logging in to our own application, probably with our own frontend, because we can trust it to receive the username and password, as we control it.
But if you are building an OAuth2 application that others would connect to (that is, if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows.
The most common is the implicit flow.
The most secure is the code flow, but it is more complex to implement because it requires more steps. As it is more complex, many providers end up suggesting the implicit flow.
Note
It is common for each authentication provider to name its flows differently, to make them part of its brand.
But in the end, they are implementing the same OAuth2 standard.
FastAPI includes utilities for all these OAuth2 authentication flows in fastapi.security.oauth2.
In the same way that you can define a list of Depends in the decorator's dependencies parameter (as explained in Dependencies in path operation decorators), you could also use Security with scopes there.