[docs]classRawWebSocketServerAudioSource(BaseAudioSource):""" An audio source that listens for a single WebSocket client, receives raw binary audio messages, and forwards them to a sink. This component is optimized for performance and expects only audio data. """
[docs]def__init__(self,sink,disconnect_callback=None,host='0.0.0.0',port=8765,blocksize=None):""" Initializes the RawWebSocketServerAudioSource. Args: sink, disconnect_callback: Passed to BaseAudioSource. host (str): The host address to listen on. port (int): The port to listen on. blocksize (int, optional): The preferred blocksize for audio chunks. Note: This source processes whatever it receives. """super().__init__(sink,disconnect_callback,blocksize=blocksize)self.host=hostself.port=portself.server_task=Noneself.server=None
asyncdef_handler(self,websocket):""" Handles the WebSocket connection for a single client. """print(f"WebSocket client connected from {websocket.remote_address}")try:asyncformessageinwebsocket:# We expect the incoming messages to be binary audio dataifisinstance(message,bytes):# Convert the bytes to a NumPy array and push to the sinkaudio_chunk=np.frombuffer(message,dtype=np.int16)self.sink(audio_chunk)else:print(f"Warning: Received non-binary message: {message}")exceptwebsockets.exceptions.ConnectionClosedase:print(f"WebSocket client disconnected: {e}")exceptExceptionase:print(f"An error occurred in the WebSocket handler: {e}")finally:ifself.disconnect_callback:self.disconnect_callback()
[docs]asyncdefstart(self):"""Starts the WebSocket server."""ifself.server_taskisnotNone:print("WebSocket server is already running.")returntry:print(f"Starting WebSocket audio source server on {self.host}:{self.port}")self.server=awaitwebsockets.serve(self._handler,self.host,self.port)# Keep a reference to the task to allow stopping itself.server_task=asyncio.create_task(self.server.wait_closed())exceptExceptionase:print(f"Failed to start WebSocket server: {e}")self.server_task=Noneself.server=None
[docs]asyncdefstop(self):"""Stops the WebSocket server gracefully."""ifself.server:print("Stopping WebSocket server...")self.server.close()awaitself.server.wait_closed()self.server=Noneself.server_task=Noneprint("WebSocket server stopped.")