[docs]classTCPClientAudioSink(BaseAudioSink):""" An audio sink that connects to a TCP server and sends audio chunks. """
[docs]def__init__(self,host,port,sample_rate=16000,channels=1,dtype='int16',blocksize=None):""" Initializes the TCPClientAudioSink. Args: host (str): The hostname or IP address of the server to connect to. port (int): The port of the server. sample_rate, channels, dtype, blocksize: Passed to BaseAudioSink. """super().__init__(sample_rate,channels,dtype,blocksize)self.host=hostself.port=portself._writer=Noneself._loop=Noneself._send_thread=threading.Thread(target=self._send_loop,daemon=True)
[docs]asyncdefstart(self):""" Establishes the connection to the TCP server and starts the sending loop. """print(f"TCPClientAudioSink: Attempting to connect to {self.host}:{self.port}...")try:reader,writer=awaitasyncio.open_connection(self.host,self.port)self._writer=writerself._loop=asyncio.get_running_loop()self._send_thread.start()print("TCPClientAudioSink: Connection successful.")exceptConnectionRefusedError:print(f"TCPClientAudioSink: Connection refused by {self.host}:{self.port}. Is the server running?")self.close()raiseexceptExceptionase:print(f"TCPClientAudioSink: An unexpected error occurred during connection: {e}")self.close()raise
def_send_loop(self):"""The actual sending logic that runs in a separate thread."""whilenotself._is_closed.is_set():try:chunk=self._buffer.popleft()ifself._writerandnotself._writer.is_closing():# Schedule the write operation on the event loop from this threadfuture=asyncio.run_coroutine_threadsafe(self._write_chunk(chunk),self._loop)# Wait for the result to ensure the chunk is sent before proceedingfuture.result()exceptIndexError:time.sleep(0.005)# Wait for more dataexceptExceptionase:print(f"TCPClientAudioSink: Error in send loop: {e}")self.close()asyncdef_write_chunk(self,chunk:np.ndarray):"""Coroutine to write a chunk to the stream."""self._writer.write(chunk.tobytes())awaitself._writer.drain()
[docs]defclose(self):"""Closes the connection and stops the sender thread."""ifnotself._is_closed.is_set():super().close()print("TCPClientAudioSink: Closing connection...")ifself._writer:# Schedule the closing on the event loopifself._loopandself._loop.is_running():asyncio.run_coroutine_threadsafe(self._writer.close(),self._loop)else:# If loop is not running, a simple close might be all we can dotry:self._writer.close()except:# Ignore errors on final closepassifself._send_thread.is_alive():self._send_thread.join(timeout=1)print("TCPClientAudioSink: Connection closed.")