class RawWebSocketClientAudioSink(BaseAudioSink):
    """
    An audio sink that connects to a WebSocket server and sends raw audio
    chunks as binary messages. This component is optimized for performance.

    Chunks are taken from ``self._buffer`` by a background daemon thread and
    handed to the asyncio event loop that ran :meth:`start`, where they are
    sent over the WebSocket as raw bytes.

    NOTE(review): ``self._buffer`` and ``self._is_closed`` are not defined in
    this class; presumably they are provided by ``BaseAudioSink`` -- confirm.
    """

    def __init__(self, uri, sample_rate=16000, channels=1, dtype='int16', blocksize=None):
        """
        Initializes the RawWebSocketClientAudioSink.

        Args:
            uri (str): The WebSocket URI to connect to
                (e.g., "ws://localhost:8765").
            sample_rate, channels, dtype, blocksize: Passed to BaseAudioSink.
        """
        super().__init__(sample_rate, channels, dtype, blocksize)
        self.uri = uri
        self._websocket = None
        self._loop = None
        # Strong reference to a close task scheduled from the loop thread, so
        # the task cannot be garbage-collected before it has run.
        self._close_task = None
        # The sender thread is created here but only started by start().
        self._send_thread = threading.Thread(target=self._send_loop, daemon=True)

    async def start(self):
        """
        Establishes the connection to the WebSocket server.

        On success the running event loop is remembered and the background
        sender thread is started.

        Raises:
            ConnectionRefusedError: If the server refuses the connection.
            Exception: Any other error raised while connecting or starting the
                sender thread. In both cases the sink is closed before the
                exception is re-raised.
        """
        print(f"WebSocketClientAudioSink: Attempting to connect to {self.uri}...")
        try:
            self._websocket = await websockets.connect(self.uri)
            self._loop = asyncio.get_running_loop()
            self._send_thread.start()
            print("WebSocketClientAudioSink: Connection successful.")
        except ConnectionRefusedError:
            print(f"WebSocketClientAudioSink: Connection refused by {self.uri}.")
            self.close()
            raise
        except Exception as e:
            print(f"WebSocketClientAudioSink: An unexpected error occurred: {e}")
            self.close()
            raise

    def _send_loop(self):
        """
        The sending logic that runs in a separate thread.

        Pops chunks from ``self._buffer`` and submits each one to the event
        loop, waiting for the send to finish before taking the next chunk.
        The loop ends when ``self._is_closed`` is set or a send fails.
        """
        # NOTE(review): on a send failure this only sets ``_is_closed``; a
        # later close() then returns early without closing the socket.
        # Confirm whether that is intended.
        while not self._is_closed.is_set():
            try:
                chunk = self._buffer.popleft()
                # Attempt to send the chunk. If the connection is closed,
                # this will raise an exception.
                future = asyncio.run_coroutine_threadsafe(
                    self._send_chunk(chunk), self._loop
                )
                future.result()  # Wait for the send to complete
            except IndexError:
                time.sleep(0.005)  # Wait for more data to arrive
            except websockets.exceptions.ConnectionClosed:
                print("WebSocketClientAudioSink: Connection closed. Stopping thread.")
                self._is_closed.set()
                break
            except Exception as e:
                # Catch any other exception, log it, and stop the thread
                # gracefully.
                print(f"WebSocketClientAudioSink: Error in send loop: {e}. Stopping thread.")
                self._is_closed.set()
                break

    async def _send_chunk(self, chunk: np.ndarray):
        """Coroutine to send a chunk over the WebSocket as raw bytes."""
        await self._websocket.send(chunk.tobytes())

    def _called_from_loop_thread(self):
        """Return True when the caller is running inside ``self._loop``."""
        try:
            return asyncio.get_running_loop() is self._loop
        except RuntimeError:
            # No event loop is running in the calling thread.
            return False

    def close(self):
        """
        Closes the WebSocket connection.

        Does nothing if ``self._is_closed`` is already set. Safe to call both
        from a foreign thread and from a coroutine running on the sink's own
        event loop:

        * From another thread, the close coroutine is submitted to the loop
          and awaited for up to 2 seconds, then the sender thread is joined
          for up to 1 second.
        * From the loop's own thread, the close is scheduled as a task and the
          call returns without blocking. Blocking there would stall the loop,
          so the close coroutine could never run and the wait would only end
          by timing out.
        """
        if self._is_closed.is_set():
            return

        super().close()
        print("WebSocketClientAudioSink: Closing connection...")

        on_loop_thread = self._called_from_loop_thread()

        if self._websocket:
            if self._loop and self._loop.is_running():
                if on_loop_thread:
                    # Fire-and-forget: the loop runs it once control returns.
                    self._close_task = self._loop.create_task(self._websocket.close())
                else:
                    future = asyncio.run_coroutine_threadsafe(
                        self._websocket.close(), self._loop
                    )
                    try:
                        future.result(timeout=2.0)
                    except Exception as e:
                        print(f"WebSocketClientAudioSink: Error during close: {e}")
            else:
                # Fallback for when the loop isn't running.
                try:
                    # This is not ideal in an async context but is a last
                    # resort.
                    asyncio.run(self._websocket.close())
                except Exception as e:
                    # Best effort: report the failure rather than hiding it.
                    print(f"WebSocketClientAudioSink: Error during close: {e}")

        # Joining from the loop thread would block the loop while the sender
        # may be waiting on it; the daemon thread is left to exit on its own.
        if not on_loop_thread and self._send_thread.is_alive():
            self._send_thread.join(timeout=1)

        print("WebSocketClientAudioSink: Connection closed.")