asyncio real-time signal processing
I’m taken with using Python’s relatively recent asyncio module for real-time DSP. This isn’t because it looks particularly efficient but because it looks very similar to how we do DSP on an FPGA. Of course we won’t actually get the massive parallelism of a real FPGA, but by setting up all our coroutines at the start and letting things go, the data will naturally flow through the processing chain and handle back pressure without any particular intervention or scheduling by us. This also allows us to tap into the data stream at any point for additional processing, plotting or saving.
In FPGA land we have all our DSP (FIR filtering, demodulating, decimating etc.) as modules that take streams of data (AXI streams), and every clock cycle they’ll process data with something like:
```vhdl
if rising_edge(clk) and data_vld = '1' then
    process_data....
end if;
```
All the modules are connected by FIFOs that allow the system to cross clock domains and soak up back-pressure if the system is producing data faster than it can be processed. Of course if the system is continually taking data faster than it can process it we’ll run into an issue, but for short bursts of data this works well as the chains of FIFOs just absorb the pressure without any additional thinking.
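That burst-absorbing behaviour maps directly onto a bounded `asyncio.Queue`: a producer’s `put()` simply blocks when the queue is full, which is exactly the back-pressure a FIFO gives you for free. Here is a minimal sketch of the idea (the `produce`/`consume` names and the `None` shutdown sentinel are my own, not part of the post’s class structure):

```python
import asyncio

async def produce(queue, n):
    # A fast producer: put() suspends once the bounded queue is full,
    # so the producer is throttled to the consumer's pace automatically.
    for i in range(n):
        await queue.put(i)
    await queue.put(None)  # sentinel: no more data

async def consume(queue, results):
    # A slower consumer draining the FIFO.
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.001)  # pretend processing takes time
        results.append(item)

async def main():
    queue = asyncio.Queue(maxsize=4)  # small FIFO: absorbs short bursts only
    results = []
    await asyncio.gather(produce(queue, 20), consume(queue, results))
    return results

results = asyncio.run(main())
print(results)  # items arrive in order: [0, 1, ..., 19]
```

No explicit scheduling is needed: the event loop parks the producer whenever the FIFO is full and wakes it as the consumer drains items.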
With Python’s asyncio coroutines and Queues we should be able to set up something very similar. Let’s set up a simple class structure with `DataProducer`s, `DataCruncher`s, `DataPlotter`s, and `DataWriter`s.
A `DataProducer` is anything that produces data that can be consumed by anything else. At the start this may be an ADC, but in a chain of filters each element is a `DataCruncher` that also acts as a `DataProducer` for the next element in the chain. Let’s make a fake ADC that takes data up to a timeout and then finishes.
```python
import asyncio
import datetime
import logging

import numpy as np


class DataProducer(object):
    def __init__(self):
        self.queue = asyncio.Queue()


class ADC(DataProducer):
    async def take_data(self, loop, timeout):
        end_time = loop.time() + timeout
        while True:
            data = np.random.rand(100)
            timeStamp = str(datetime.datetime.now())
            logging.info("Acquired data and putting in queue")
            await self.queue.put(data)
            if (loop.time() + 1.0) >= end_time:
                shutdown(loop)  # shutdown() helper defined elsewhere
                break
            await asyncio.sleep(1)
```
A `DataProducer` has a queue that it pushes data to. The data will be correctly routed by a `DataInterconnect` later. We define the `take_data` method as an async coroutine using the new Python 3.5 syntax of `async def`. Since the `Queue.put` method is a coroutine we `await` on that (again using the new syntax instead of the old `yield from`).
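To make the syntax change concrete, here is the same `Queue.put` call written both ways. The pre-3.5 version is shown as a comment because the generator-based `@asyncio.coroutine` decorator was deprecated and removed in Python 3.11 (`put_new`, `put_old` and `demo` are illustrative names of my own):

```python
import asyncio

# Python 3.5+ native coroutine: async def plus await.
async def put_new(queue, item):
    await queue.put(item)

# Pre-3.5 generator-based equivalent (no longer runnable on Python 3.11+):
#
# @asyncio.coroutine
# def put_old(queue, item):
#     yield from queue.put(item)

async def demo():
    queue = asyncio.Queue()
    await put_new(queue, 42)
    return queue.get_nowait()

print(asyncio.run(demo()))  # 42
```

The two spellings are equivalent to the event loop; `async def` just makes the coroutine explicit and catches mistakes like a bare `yield` at compile time.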
```python
class DataCruncher(DataProducer):
    pass
```
In Python the FIFOs will be handled by `asyncio.Queue`.
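As a sketch of how such queues chain processing stages together, here is a standalone cruncher-style coroutine that consumes from one `asyncio.Queue` and produces into the next, mirroring FIFO-connected FPGA modules (the `doubler` stage and the `None` shutdown sentinel are illustrative assumptions, not the post’s `DataInterconnect` design):

```python
import asyncio

async def doubler(in_q, out_q):
    # A DataCruncher-style stage: consume from one FIFO, process,
    # and produce into the next FIFO in the chain.
    while True:
        item = await in_q.get()
        if item is None:            # propagate the shutdown sentinel
            await out_q.put(None)
            break
        await out_q.put(item * 2)

async def main():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    stage = asyncio.create_task(doubler(q1, q2))
    for x in [1, 2, 3]:
        await q1.put(x)
    await q1.put(None)              # signal end of the burst
    await stage
    out = []
    while not q2.empty():
        item = q2.get_nowait()
        if item is not None:
            out.append(item)
    return out

print(asyncio.run(main()))  # [2, 4, 6]
```

Longer chains just add more stages and queues; each stage only ever talks to its neighbouring FIFOs, exactly like the FPGA modules above.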