Saltar al contenido

Stream JSON Lines

Podrías tener una secuencia de datos que te gustaría enviar en un "stream", podrías hacerlo con JSON Lines.

Nota

Añadido en FastAPI 0.134.0.

¿Qué es un Stream?

Hacer "stream" de datos significa que tu app comenzará a enviar items de datos al cliente sin esperar a que la secuencia completa de items esté lista.

Así, enviará el primer item, el cliente lo recibirá y comenzará a procesarlo, y tú podrías seguir produciendo el siguiente item.

Incluso podría ser un stream infinito, donde sigues enviando datos.

JSON Lines

En estos casos, es común enviar "JSON Lines", que es un formato donde envías un objeto JSON por línea.

Una respuesta tendría un content type de application/jsonl (en lugar de application/json) y el body sería algo como:

{"name": "Plumbus", "description": "A multi-purpose household device."}
{"name": "Portal Gun", "description": "A portal opening device."}
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."}

Es muy similar a un array de JSON (equivalente a una lista de Python), pero en lugar de estar envuelto en [] y tener , entre los items, tiene un objeto JSON por línea, están separados por un carácter de nueva línea.

Nota

El punto importante es que tu app podrá producir cada línea por turno, mientras el cliente consume las líneas anteriores.

Detalles Técnicos

Como cada objeto JSON estará separado por una nueva línea, no pueden contener caracteres literales de nueva línea en su contenido, pero pueden contener nuevas líneas escapadas (\n), lo cual es parte del estándar de JSON.

Pero normalmente no tendrás que preocuparte por esto, se hace automáticamente, continúa leyendo. 🤓

Casos de Uso

Podrías usar esto para hacer stream de datos desde un servicio de AI LLM, desde logs o telemetría, o desde otros tipos de datos que pueden ser estructurados en items de JSON.

Consejo

Si quieres transmitir datos binarios, por ejemplo video o audio, revisa la guía avanzada: Stream Data.

Stream JSON Lines con FastAPI

Para hacer stream de JSON Lines con FastAPI puedes, en lugar de usar return en tu path operation function, usar yield para producir cada item por turno.

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Vista previa del archivo completo
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Si cada item JSON que quieres devolver es de tipo Item (un modelo de Pydantic) y es una función async, puedes declarar el return type como AsyncIterable[Item]:

from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Vista previa del archivo completo
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Si declaras el return type, FastAPI lo usará para validar los datos, documentarlos en OpenAPI, filtrarlos y serializarlos usando Pydantic.

Consejo

Como Pydantic lo serializará del lado de Rust, obtendrás un rendimiento mucho mayor que si no declaras un tipo de retorno.

Funciones path operation no-async

También puedes usar funciones regulares con def (sin async), y usar yield de la misma manera.

FastAPI se asegurará de que se ejecute correctamente para que no bloquee el event loop.

Como en este caso la función no es async, el tipo de retorno correcto sería Iterable[Item]:

# Code above omitted 👆

@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item

# Code below omitted 👇
👀 Vista previa del archivo completo
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Sin tipo de retorno

También puedes omitir el return type. FastAPI entonces usará el jsonable_encoder para convertir los datos a algo que pueda ser serializado a JSON y luego enviarlo como JSON Lines.

# Code above omitted 👆

@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item

# Code below omitted 👇
👀 Vista previa del archivo completo
from collections.abc import AsyncIterable, Iterable

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None


items = [
    Item(name="Plumbus", description="A multi-purpose household device."),
    Item(name="Portal Gun", description="A portal opening device."),
    Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
]


@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-async")
def stream_items_no_async() -> Iterable[Item]:
    for item in items:
        yield item


@app.get("/items/stream-no-annotation")
async def stream_items_no_annotation():
    for item in items:
        yield item


@app.get("/items/stream-no-async-no-annotation")
def stream_items_no_async_no_annotation():
    for item in items:
        yield item

Server-Sent Events (SSE)

FastAPI también tiene soporte de primera clase para Server-Sent Events (SSE), que son bastante similares pero con un par de detalles extra. Puedes aprender sobre ellos en el siguiente capítulo: Server-Sent Events (SSE). 🤓