asyncio real-time signal processing

I’m taken with using Python’s relatively recent asyncio module for real-time DSP. This isn’t because it looks particularly efficient but because it looks very similar to how we do DSP on an FPGA. Of course we won’t actually get the massive parallelism of a real FPGA, but by setting up all our coroutines at the start and letting things go, the data will naturally flow through the processing chain and handle back-pressure without any particular intervention or scheduling by us. This also allows us to tap into the data stream at any point for additional processing, plotting or saving.

In FPGA land we have all our DSP (FIR filtering, demodulating, decimating etc.) as modules that take streams of data (AXI streams), and every clock cycle they’ll process data with something like:

if rising_edge(clk) and data_vld = '1' then
	process_data....
end if;

All the modules are connected by FIFOs that allow the system to cross clock domains and soak up back-pressure if the system is producing data faster than it can be processed. Of course if the system is continually taking data faster than it can process it we’ll run into an issue, but for short bursts of data this works well as the chains of FIFOs just absorb the pressure without any additional thinking.

With Python’s asyncio coroutines and Queues we should be able to set up something very similar. Let’s set up a simple class structure with DataProducers, DataCrunchers, DataPlotters, and DataWriters.

A DataProducer is anything that produces data that can be consumed by anything else. At the start this may be an ADC, but in a chain of filters each element is a DataCruncher that also acts as a DataProducer for the next element in the chain. Let’s make a fake ADC that takes data up to a timeout and then finishes.

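import asyncio
import datetime
import logging

import numpy as np

class DataProducer(object):
    """Base class for anything that pushes data onto a queue."""

    def __init__(self):
        # FIFO that downstream consumers read from
        self.queue = asyncio.Queue()

class ADC(DataProducer):

    async def take_data(self, loop, timeout):
        """Fake ADC: emit a block of random samples every second until timeout."""
        end_time = loop.time() + timeout
        while True:
            data = np.random.rand(100)
            timeStamp = str(datetime.datetime.now())
            logging.info("Acquired data at %s and putting in queue", timeStamp)
            await self.queue.put(data)
            # Stop if the next one-second sleep would run past the deadline
            if (loop.time() + 1.0) >= end_time:
                shutdown(loop)  # shutdown() is not defined in this snippet
                break
            await asyncio.sleep(1)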

A DataProducer has a queue that it pushes data to. The data will be correctly routed by a DataInterconnect later. We define the take_data method as an async coroutine using the new Python 3.5 syntax of async def. Since the Queue.put method is a coroutine we await it (again using the new syntax instead of the old yield from).

class DataCruncher(DataProducer):
    pass
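
As a rough illustration of a cruncher sitting in the middle of a chain, here is a minimal sketch, assuming each stage reads from an upstream queue and writes to a queue of its own. The names Decimator, in_queue and run, and the use of None as an end-of-stream marker, are hypothetical and not part of the design above. It uses asyncio.run, which needs Python 3.7 or later.

```python
import asyncio


class DataProducer:
    def __init__(self):
        # Output FIFO (assumption: each producer owns its own queue)
        self.queue = asyncio.Queue()


class Decimator(DataProducer):
    """Hypothetical cruncher: keeps every `factor`-th sample of each block."""

    def __init__(self, in_queue, factor):
        super().__init__()
        self.in_queue = in_queue
        self.factor = factor

    async def run(self):
        while True:
            block = await self.in_queue.get()
            if block is None:  # hypothetical end-of-stream marker
                await self.queue.put(None)
                break
            await self.queue.put(block[::self.factor])


async def main():
    source = DataProducer()
    decimator = Decimator(source.queue, factor=4)
    task = asyncio.ensure_future(decimator.run())

    # Stand-in for an ADC: two blocks of 8 samples, then the marker
    await source.queue.put(list(range(8)))
    await source.queue.put(list(range(8, 16)))
    await source.queue.put(None)

    # Drain the cruncher's output queue until the marker arrives
    results = []
    while True:
        block = await decimator.queue.get()
        if block is None:
            break
        results.append(block)
    await task
    return results


results = asyncio.run(main())
print(results)  # [[0, 4], [8, 12]]
```

Because the cruncher is itself a DataProducer, another stage could be chained on by handing it decimator.queue as its input.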

In Python the FIFOs will be handled by asyncio.Queue.
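
To make the FIFO analogy a bit more concrete, here is a minimal sketch of back-pressure using a bounded queue. The depth of 2, the producer and consumer names, the None end marker and the sleep time are all illustrative assumptions. When a Queue is given a maxsize, put suspends the producer whenever the queue is full, much like a full FIFO stalling the module upstream of it.

```python
import asyncio


async def producer(queue, n_blocks, max_depth):
    for i in range(n_blocks):
        await queue.put(i)  # suspends here while the queue is full
        max_depth[0] = max(max_depth[0], queue.qsize())
    await queue.put(None)  # hypothetical end-of-stream marker


async def consumer(queue, received):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.01)  # pretend processing is slow
        received.append(item)


async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded FIFO
    received = []
    max_depth = [0]  # deepest the queue got, as seen by the producer
    await asyncio.gather(
        producer(queue, 10, max_depth),
        consumer(queue, received),
    )
    return received, max_depth[0]


received, max_depth = asyncio.run(main())
print(received, max_depth)
```

The producer never gets more than two items ahead of the consumer, and nothing is dropped or reordered; no explicit scheduling code is needed to get that behaviour.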