Saltar al contenido

Eventos de Lifespan

Puedes definir lógica (código) que debería ser ejecutada antes de que la aplicación arranque. Esto significa que este código será ejecutado una vez, antes de que la aplicación empiece a recibir requests.

De la misma manera, puedes definir lógica (código) que debería ser ejecutada cuando la aplicación se está apagando. En este caso, este código será ejecutado una vez, después de haber manejado posiblemente muchas requests.

Como este código se ejecuta antes de que la aplicación empiece a tomar requests, y justo después de que termine de manejar requests, cubre todo el lifespan de la aplicación (la palabra "lifespan" será importante en un segundo 😉).

Esto puede ser muy útil para configurar recursos que necesitas usar para toda la app, y que son compartidos entre las requests, y/o que necesitas limpiar después. Por ejemplo, un pool de conexiones a base de datos, o cargar un modelo de machine learning compartido.

Caso de Uso

Empecemos con un ejemplo caso de uso y luego veamos cómo resolverlo con esto.

Imaginemos que tienes algunos modelos de machine learning que quieres usar para manejar requests. 🤖

Los mismos modelos se comparten entre las requests, así que, no es un modelo por request, o uno por usuario o algo similar.

Imaginemos que cargar el modelo puede tomar bastante tiempo, porque tiene que leer muchos datos del disco. Así que no quieres hacerlo para cada request.

Podrías cargarlo en el nivel superior del módulo/archivo, pero eso también significaría que cargaría el modelo incluso si solo estás ejecutando un test automatizado simple, entonces ese test sería lento porque tendría que esperar a que el modelo se cargue antes de poder ejecutar una parte independiente del código.

Eso es lo que vamos a resolver, vamos a cargar el modelo antes de que las requests sean manejadas, pero solo justo antes de que la aplicación empiece a recibir requests, no mientras se está cargando el código.

Lifespan

Puedes definir esta lógica de startup y shutdown usando el parámetro lifespan de la app de FastAPI, y un "context manager" (te muestro qué es en un segundo).

Empecemos con un ejemplo y luego veámoslo en detalle.

Creamos una función async lifespan() con yield así:

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

Aquí estamos simulando la operación costosa de startup de cargar el modelo poniendo la función del modelo (falsa) en el diccionario con los modelos de machine learning antes del yield. Este código será ejecutado antes de que la aplicación empiece a tomar requests, durante el startup.

Y luego, justo después del yield, descargamos el modelo. Este código será ejecutado después de que la aplicación termine de manejar requests, justo antes del shutdown. Esto podría, por ejemplo, liberar recursos como memoria o una GPU.

Consejo

El shutdown ocurriría cuando estás deteniendo la aplicación.

Quizás necesitas iniciar una nueva versión, o simplemente te cansaste de ejecutarla. 🤷

Función lifespan

Lo primero a notar, es que estamos definiendo una función async con yield. Esto es muy similar a las Dependencies con yield.

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

La primera parte de la función, antes del yield, será ejecutada antes de que la aplicación arranque.

Y la parte después del yield será ejecutada después de que la aplicación haya terminado.

Async Context Manager

Si revisas, la función está decorada con un @asynccontextmanager.

Eso convierte la función en algo llamado un "async context manager".

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

Un context manager en Python es algo que puedes usar en una sentencia with, por ejemplo, open() puede ser usado como un context manager:

with open("file.txt") as file:
    file.read()

En versiones recientes de Python, también hay un async context manager. Lo usarías con async with:

async with lifespan(app):
    await do_stuff()

Cuando creas un context manager o un async context manager como el de arriba, lo que hace es que, antes de entrar al bloque with, ejecutará el código antes del yield, y después de salir del bloque with, ejecutará el código después del yield.

En nuestro ejemplo de código de arriba, no lo usamos directamente, sino que lo pasamos a FastAPI para que lo use.

El parámetro lifespan de la app de FastAPI toma un async context manager, así que podemos pasarle nuestro nuevo lifespan async context manager.

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

Eventos Alternativos (deprecado)

Aviso

La manera recomendada de manejar el startup y el shutdown es usando el parámetro lifespan de la app de FastAPI como se describió arriba. Si proporcionas un parámetro lifespan, los event handlers de startup y shutdown ya no serán llamados. Es todo lifespan o todos eventos, no ambos.

Probablemente puedas saltarte esta parte.

Hay una manera alternativa de definir esta lógica para ser ejecutada durante el startup y durante el shutdown.

Puedes definir event handlers (funciones) que necesitan ser ejecutados antes de que la aplicación arranque, o cuando la aplicación se esté apagando.

Estas funciones pueden ser declaradas con async def o def normal.

Evento startup

Para añadir una función que debería ejecutarse antes de que la aplicación arranque, declárala con el evento "startup":

from fastapi import FastAPI

app = FastAPI()

items = {}


@app.on_event("startup")
async def startup_event():
    items["foo"] = {"name": "Fighters"}
    items["bar"] = {"name": "Tenders"}


@app.get("/items/{item_id}")
async def read_items(item_id: str):
    return items[item_id]

En este caso, la función event handler de startup inicializará la "base de datos" de items (solo un dict) con algunos valores.

Puedes añadir más de una función event handler.

Y tu aplicación no empezará a recibir requests hasta que todos los event handlers de startup hayan completado.

Evento shutdown

Para añadir una función que debería ejecutarse cuando la aplicación se está apagando, declárala con el evento "shutdown":

from fastapi import FastAPI

app = FastAPI()


@app.on_event("shutdown")
def shutdown_event():
    with open("log.txt", mode="a") as log:
        log.write("Application shutdown")


@app.get("/items/")
async def read_items():
    return [{"name": "Foo"}]

Aquí, la función event handler de shutdown escribirá una línea de texto "Application shutdown" a un archivo log.txt.

Nota

En la función open(), el mode="a" significa "append" (añadir), así que, la línea será añadida después de lo que haya en ese archivo, sin sobrescribir el contenido anterior.

Consejo

Notá que en este caso estamos usando una función open() estándar de Python que interactúa con un archivo.

Así que, involucra I/O (entrada/salida), que requiere "esperar" a que las cosas sean escritas al disco.

Pero open() no usa async y await.

Así que, declaramos la función event handler con def estándar en lugar de async def.

startup y shutdown juntos

Hay una alta probabilidad de que la lógica para tu startup y shutdown esté conectada, quizás quieres iniciar algo y luego terminarlo, adquirir un recurso y luego liberarlo, etc.

Hacer eso en funciones separadas que no comparten lógica o variables juntas es más difícil ya que necesitarías almacenar valores en variables globales o trucos similares.

Por eso, ahora se recomienda usar el lifespan como se explicó arriba.

Detalles Técnicos

Solo un detalle técnico para los nerds curiosos. 🤓

Por debajo, en la especificación técnica de ASGI, esto es parte del Lifespan Protocol, y define eventos llamados startup y shutdown.

Nota

Puedes leer más sobre los handlers de lifespan de Starlette en la documentación de Lifespan de Starlette.

Incluyendo cómo manejar el estado del lifespan que puede ser usado en otras áreas de tu código.

Sub Aplicaciones

🚨 Ten en cuenta que estos eventos de lifespan (startup y shutdown) solo serán ejecutados para la aplicación principal, no para las Sub Applications - Mounts.