class TCPServerAudioSource(BaseAudioSource):
    """
    An audio source that listens for incoming TCP connections, receives
    raw int16 audio data, optionally amplifies it, and pushes each chunk
    to a provided sink.

    NOTE(review): this source was described as handling "a single incoming
    TCP connection", but nothing in this class rejects additional clients;
    every accepted connection is handled by ``_handle_client`` and feeds the
    same sink. Confirm whether concurrent clients are intended.
    """

    def __init__(self, sink, disconnect_callback=None, host='0.0.0.0',
                 port=8123, gain_factor=1.0, blocksize=960):
        """
        Initializes the TCPServerAudioSource.

        Args:
            sink: A callable that accepts one argument (the audio chunk as a
                NumPy array).
            disconnect_callback (callable, optional): A callable that is
                invoked when a client disconnects.
            host (str): The host address to listen on.
            port (int): The port to listen on.
            gain_factor (float): Factor to amplify the incoming audio.
            blocksize (int): The number of frames per audio chunk.
        """
        # The base class is presumably what stores sink, disconnect_callback
        # and blocksize on the instance (they are read below as attributes)
        # -- verify against BaseAudioSource.
        super().__init__(sink, disconnect_callback, blocksize=blocksize)
        self.host = host
        self.port = port
        self.gain_factor = gain_factor
        # Holds the asyncio server while it is running; None otherwise.
        self.server = None

    async def _handle_client(self, reader, writer):
        """
        Handles a single client connection.

        Reads fixed-size chunks of ``blocksize`` int16 frames, applies the
        gain (with clipping to the int16 range) and passes each chunk to the
        sink. On exit, for any reason, the disconnect callback is invoked
        (if set) and the connection is closed.

        Args:
            reader (asyncio.StreamReader): Stream to read audio bytes from.
            writer (asyncio.StreamWriter): Stream used to query the peer
                address and to close the connection.
        """
        addr = writer.get_extra_info('peername')
        # 'peername' may be unavailable (None), e.g. if the transport is
        # already closed; do not index into it blindly, otherwise we would
        # raise before reaching the cleanup code and leak the writer.
        peer = f"{addr[0]}:{addr[1]}" if addr else "unknown peer"
        print(f"[*] Accepted connection from {peer}")

        # Each int16 frame is 2 bytes
        chunk_bytes = self.blocksize * 2

        try:
            while True:
                try:
                    data = await reader.readexactly(chunk_bytes)
                except (asyncio.IncompleteReadError,
                        ConnectionAbortedError,
                        ConnectionResetError) as e:
                    print(f"Client {addr} disconnected: {e}")
                    break  # Exit the loop cleanly on disconnection

                if not data:
                    # readexactly() only returns empty bytes when asked for
                    # zero bytes (blocksize == 0); bail out instead of
                    # spinning forever.
                    break

                audio_chunk = np.frombuffer(data, dtype=np.int16)

                if self.gain_factor != 1.0:
                    # Amplify in float to avoid int16 wrap-around, then clip
                    # back into the valid int16 range.
                    amplified_chunk = (
                        audio_chunk.astype(np.float32) * self.gain_factor
                    )
                    processed_chunk = np.clip(
                        amplified_chunk, -32768, 32767
                    ).astype(np.int16)
                else:
                    processed_chunk = audio_chunk

                self.sink(processed_chunk)
        except Exception as e:
            print(f"An unexpected error occurred with client {addr}: {e}")
        finally:
            print(f"Closing connection from {addr}")
            if self.disconnect_callback:
                try:
                    self.disconnect_callback()
                except Exception as e:
                    print(f"Error in disconnect_callback: {e}")
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # wait_closed() re-raises the error the connection was lost
                # with (e.g. ConnectionResetError after an abrupt
                # disconnect). The disconnect has already been reported
                # above, so there is nothing more to do here.
                pass

    async def start(self):
        """
        Starts the server to listen for an audio source.

        Does nothing (besides printing a notice) if a server is already
        running. Otherwise this coroutine serves until the server is closed
        or the task is cancelled, and always clears ``self.server`` on the
        way out so that the source can be started again.

        NOTE(review): in CPython, closing the server (as ``stop()`` does)
        cancels the pending ``serve_forever()``, so callers should expect
        ``asyncio.CancelledError`` to propagate from here after ``stop()``
        -- confirm against the Python version in use.
        """
        if self.server is not None:
            print("Server is already running.")
            return

        server = await asyncio.start_server(
            self._handle_client, self.host, self.port
        )
        self.server = server

        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'[*] Audio source server listening on {addrs}')

        try:
            async with server:
                await server.serve_forever()
        finally:
            # Leaving the block above means the server is closed. Drop the
            # stale handle, otherwise every later start() would report
            # "already running" and refuse to restart.
            if self.server is server:
                self.server = None

    async def stop(self):
        """
        Stops the audio server gracefully.

        Closes the listening server and waits for the close to complete.
        Does nothing if no server is running.

        NOTE(review): on newer Python versions ``Server.wait_closed()`` is
        documented to also wait for active connections to finish, so this
        may block while a client is still connected -- verify.
        """
        # Work on a local reference: self.server may be cleared by start()
        # while we are awaiting below.
        server = self.server
        if server is None:
            return

        server.close()
        await server.wait_closed()
        if self.server is server:
            self.server = None
        print("Server stopped.")