WebSockets

When defining WebSockets, you normally declare a parameter of type WebSocket, and with it you can read data from the client and send data to it.

Read more about it in the FastAPI docs for WebSockets.

It is provided directly by Starlette, but you can import it from fastapi:

from fastapi import WebSocket

Tip

When you want to define dependencies that should be compatible with both HTTP and WebSockets, you can define a parameter that takes an HTTPConnection instead of a Request or a WebSocket.

fastapi.WebSocket

WebSocket(scope, receive, send)

Bases: HTTPConnection[StateT]

Source code in starlette/websockets.py
def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
    super().__init__(scope)
    assert scope["type"] == "websocket"
    self._receive = receive
    self._send = send
    self.client_state = WebSocketState.CONNECTING
    self.application_state = WebSocketState.CONNECTING

scope instance-attribute

scope = scope

app property

app

url property

url

base_url property

base_url

headers property

headers

query_params property

query_params

path_params property

path_params

cookies property

cookies

client property

client

state property

state

client_state instance-attribute

client_state = CONNECTING

application_state instance-attribute

application_state = CONNECTING

url_for

url_for(name, /, **path_params)
Source code in starlette/requests.py
def url_for(self, name: str, /, **path_params: Any) -> URL:
    url_path_provider: Router | Starlette | None = self.scope.get("router") or self.scope.get("app")
    if url_path_provider is None:
        raise RuntimeError("The `url_for` method can only be used inside a Starlette application or with a router.")
    url_path = url_path_provider.url_path_for(name, **path_params)
    return url_path.make_absolute_url(base_url=self.base_url)

receive async

receive()

Receive ASGI websocket messages, ensuring valid state transitions.

Source code in starlette/websockets.py
async def receive(self) -> Message:
    """
    Receive ASGI websocket messages, ensuring valid state transitions.
    """
    if self.client_state == WebSocketState.CONNECTING:
        message = await self._receive()
        message_type = message["type"]
        if message_type != "websocket.connect":
            raise RuntimeError(f'Expected ASGI message "websocket.connect", but got {message_type!r}')
        self.client_state = WebSocketState.CONNECTED
        return message
    elif self.client_state == WebSocketState.CONNECTED:
        message = await self._receive()
        message_type = message["type"]
        if message_type not in {"websocket.receive", "websocket.disconnect"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.receive" or "websocket.disconnect", but got {message_type!r}'
            )
        if message_type == "websocket.disconnect":
            self.client_state = WebSocketState.DISCONNECTED
        return message
    else:
        raise RuntimeError('Cannot call "receive" once a disconnect message has been received.')

send async

send(message)

Send ASGI websocket messages, ensuring valid state transitions.

Source code in starlette/websockets.py
async def send(self, message: Message) -> None:
    """
    Send ASGI websocket messages, ensuring valid state transitions.
    """
    if self.application_state == WebSocketState.CONNECTING:
        message_type = message["type"]
        if message_type not in {"websocket.accept", "websocket.close", "websocket.http.response.start"}:
            raise RuntimeError(
                'Expected ASGI message "websocket.accept", "websocket.close" or "websocket.http.response.start", '
                f"but got {message_type!r}"
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        elif message_type == "websocket.http.response.start":
            self.application_state = WebSocketState.RESPONSE
        else:
            self.application_state = WebSocketState.CONNECTED
        await self._send(message)
    elif self.application_state == WebSocketState.CONNECTED:
        message_type = message["type"]
        if message_type not in {"websocket.send", "websocket.close"}:
            raise RuntimeError(
                f'Expected ASGI message "websocket.send" or "websocket.close", but got {message_type!r}'
            )
        if message_type == "websocket.close":
            self.application_state = WebSocketState.DISCONNECTED
        try:
            await self._send(message)
        except OSError:
            self.application_state = WebSocketState.DISCONNECTED
            raise WebSocketDisconnect(code=1006)
    elif self.application_state == WebSocketState.RESPONSE:
        message_type = message["type"]
        if message_type != "websocket.http.response.body":
            raise RuntimeError(f'Expected ASGI message "websocket.http.response.body", but got {message_type!r}')
        if not message.get("more_body", False):
            self.application_state = WebSocketState.DISCONNECTED
        await self._send(message)
    else:
        raise RuntimeError('Cannot call "send" once a close message has been sent.')

accept async

accept(subprotocol=None, headers=None)
Source code in starlette/websockets.py
async def accept(
    self,
    subprotocol: str | None = None,
    headers: Iterable[tuple[bytes, bytes]] | None = None,
) -> None:
    headers = headers or []

    if self.client_state == WebSocketState.CONNECTING:  # pragma: no branch
        # If we haven't yet seen the 'connect' message, then wait for it first.
        await self.receive()
    await self.send({"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers})

receive_text async

receive_text()
Source code in starlette/websockets.py
async def receive_text(self) -> str:
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)
    return cast(str, message["text"])

receive_bytes async

receive_bytes()
Source code in starlette/websockets.py
async def receive_bytes(self) -> bytes:
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)
    return cast(bytes, message["bytes"])

receive_json async

receive_json(mode='text')
Source code in starlette/websockets.py
async def receive_json(self, mode: str = "text") -> Any:
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')
    if self.application_state != WebSocketState.CONNECTED:
        raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
    message = await self.receive()
    self._raise_on_disconnect(message)

    if mode == "text":
        text = message["text"]
    else:
        text = message["bytes"].decode("utf-8")
    return json.loads(text)

iter_text async

iter_text()
Source code in starlette/websockets.py
async def iter_text(self) -> AsyncIterator[str]:
    try:
        while True:
            yield await self.receive_text()
    except WebSocketDisconnect:
        pass

iter_bytes async

iter_bytes()
Source code in starlette/websockets.py
async def iter_bytes(self) -> AsyncIterator[bytes]:
    try:
        while True:
            yield await self.receive_bytes()
    except WebSocketDisconnect:
        pass

iter_json async

iter_json()
Source code in starlette/websockets.py
async def iter_json(self) -> AsyncIterator[Any]:
    try:
        while True:
            yield await self.receive_json()
    except WebSocketDisconnect:
        pass

send_text async

send_text(data)
Source code in starlette/websockets.py
async def send_text(self, data: str) -> None:
    await self.send({"type": "websocket.send", "text": data})

send_bytes async

send_bytes(data)
Source code in starlette/websockets.py
async def send_bytes(self, data: bytes) -> None:
    await self.send({"type": "websocket.send", "bytes": data})

send_json async

send_json(data, mode='text')
Source code in starlette/websockets.py
async def send_json(self, data: Any, mode: str = "text") -> None:
    if mode not in {"text", "binary"}:
        raise RuntimeError('The "mode" argument should be "text" or "binary".')
    text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
    if mode == "text":
        await self.send({"type": "websocket.send", "text": text})
    else:
        await self.send({"type": "websocket.send", "bytes": text.encode("utf-8")})
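
The serialization step in `send_json` can be reproduced with the standard library alone, which shows what actually goes on the wire in each mode. The sample `data` value is an assumption for illustration.

```python
import json

data = {"msg": "héllo", "n": 1}  # sample payload for illustration

# Same arguments as in send_json: compact separators, non-ASCII kept as-is.
text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)

# mode="text" sends `text` in a text frame;
# mode="binary" sends the UTF-8 encoding of the same string in a binary frame.
payload = text.encode("utf-8")

print(text)
print(len(text), len(payload))
```

The character count and byte count differ here because "é" takes two bytes in UTF-8. A peer reading the binary form needs `receive_json(mode="binary")`, which decodes the bytes as UTF-8 before parsing.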

close async

close(code=1000, reason=None)
Source code in starlette/websockets.py
async def close(self, code: int = 1000, reason: str | None = None) -> None:
    await self.send({"type": "websocket.close", "code": code, "reason": reason or ""})

WebSockets - additional classes

Additional classes for handling WebSockets.

Provided directly by Starlette, but you can import them from fastapi:

from fastapi.websockets import WebSocketDisconnect, WebSocketState

fastapi.websockets.WebSocketDisconnect

WebSocketDisconnect(code=1000, reason=None)

Bases: Exception

Source code in starlette/websockets.py
def __init__(self, code: int = 1000, reason: str | None = None) -> None:
    self.code = code
    self.reason = reason or ""

code instance-attribute

code = code

reason instance-attribute

reason = reason or ''

When a client disconnects, a WebSocketDisconnect exception is raised; you can catch it.

You can import it directly from fastapi:

from fastapi import WebSocketDisconnect

Read more about it in the FastAPI docs for WebSockets.

fastapi.websockets.WebSocketState

Bases: Enum

CONNECTING class-attribute instance-attribute

CONNECTING = 0

CONNECTED class-attribute instance-attribute

CONNECTED = 1

DISCONNECTED class-attribute instance-attribute

DISCONNECTED = 2

RESPONSE class-attribute instance-attribute

RESPONSE = 3

WebSocketState is an enumeration of the possible states of a WebSocket connection.