Animated Realtime Spectrograph with Scrolling Waterfall Display in Python
⚠️ WARNING: This page is obsolete
Articles typically receive this designation when the technology they describe is no longer relevant, code provided is later deemed to be of poor quality, or the topics discussed are better presented in future articles. Articles like this are retained for the sake of preservation, but their content should be critically assessed.
My project is coming along nicely. This isn’t an incredibly robust spectrograph program, but it sure gets the job done quickly and easily. The code below will produce a real time scrolling spectrograph entirely with Python! It polls the microphone (or default recording device), should work on any OS, and can be adjusted for vertical resolution / FFT frequency discretion resolution. It has some simple functions for filtering (check out the de-trend filter!) and might serve as a good start to a spectrograph / frequency analysis project. It took my a long time to reach this point! I’ve worked with Python before, and dabbled with the Python Imaging Library (PIL), but this is my first experience with real time linear data analysis and high-demand multi-threading. I hope it helps you. Below are screenshots of the program (two running at the same time) listening to the same radio signals (mostly Morse code) with standard output and with the “de-trending filter” activated.
import pyaudio
import scipy
import struct
import scipy.fftpack
from Tkinter import *
import threading
import time
import datetime
import wckgraph
import math
import Image
import ImageTk
from PIL import ImageOps
from PIL import ImageChops
import time
import random
import threading
import scipy
# ADJUST RESOLUTION OF VERTICAL FFT
bufferSize = 2**11
# bufferSize=2**8
# ADJUSTS AVERAGING SPEED NOT VERTICAL RESOLUTION
# REDUCE HERE IF YOUR PC CANT KEEP UP
sampleRate = 24000
# sampleRate=64000
p = pyaudio.PyAudio()
chunks = []
ffts = []
def stream():
global chunks, inStream, bufferSize
while True:
chunks.append(inStream.read(bufferSize))
def record():
global w, inStream, p, bufferSize
inStream = p.open(format=pyaudio.paInt16, channels=1,
rate=sampleRate, input=True, frames_per_buffer=bufferSize)
threading.Thread(target=stream).start()
# stream()
def downSample(fftx, ffty, degree=10):
x, y = [], []
for i in range(len(ffty)/degree-1):
x.append(fftx[i*degree+degree/2])
y.append(sum(ffty[i*degree:(i+1)*degree])/degree)
return [x, y]
def smoothWindow(fftx, ffty, degree=10):
lx, ly = fftx[degree:-degree], []
for i in range(degree, len(ffty)-degree):
ly.append(sum(ffty[i-degree:i+degree]))
return [lx, ly]
def smoothMemory(ffty, degree=3):
global ffts
ffts = ffts+[ffty]
if len(ffts) < =degree:
# ly.append(fft[i]-(ffty[i-degree]+ffty[i+degree])/2) return [lx,ly] def graph(): global chunks, bufferSize, fftx,ffty, w if len(chunks)>0:
return ffty ffts = ffts[1:] return scipy.average(scipy.array(ffts), 0) def detrend(fftx, ffty, degree=10): lx, ly = fftx[degree:-degree], [] for i in range(degree, len(ffty)-degree): ly.append((ffty[i]-sum(ffty[i-degree:i+degree])/(degree*2)) * 2+128)
data = chunks.pop(0)
data = scipy.array(struct.unpack("%dB" % (bufferSize*2), data))
ffty = scipy.fftpack.fft(data)
fftx = scipy.fftpack.rfftfreq(bufferSize*2, 1.0/sampleRate)
fftx = fftx[0:len(fftx)/4]
ffty = abs(ffty[0:len(ffty)/2])/1000
ffty1 = ffty[:len(ffty)/2]
ffty2 = ffty[len(ffty)/2::]+2
ffty2 = ffty2[::-1]
ffty = ffty1+ffty2
ffty = (scipy.log(ffty)-1)*120
fftx, ffty = downSample(fftx, ffty, 2)
updatePic(fftx, ffty)
reloadPic()
if len(chunks) > 20:
print "falling behind...", len(chunks)
def go(x=None):
global w, fftx, ffty
print "STARTING!"
threading.Thread(target=record).start()
while True:
# record()
graph()
def updatePic(datax, data):
global im, iwidth, iheight
strip = Image.new("L", (1, iheight))
if len(data) > iheight:
data = data[:iheight-1]
# print "MAX FREQ:",datax[-1]
strip.putdata(data)
# print "%03d, %03d" % (max(data[-100:]), min(data[-100:]))
im.paste(strip, (iwidth-1, 0))
im = im.offset(-1, 0)
root.update()
def reloadPic():
global im, lab
lab.image = ImageTk.PhotoImage(im)
lab.config(image=lab.image)
root = Tk()
im = Image.open('./ramp.tif')
im = im.convert("L")
iwidth, iheight = im.size
im = im.crop((0, 0, 500, 480))
# im=Image.new("L",(100,1024))
iwidth, iheight = im.size
root.geometry('%dx%d' % (iwidth, iheight))
lab = Label(root)
lab.place(x=0, y=0, width=iwidth, height=iheight)
go()
UPDATE: I’m not going to post the code for this yet (it’s very messy) but I got this thing to display a spectrograph on a canvas. What’s the advantage of that? Huge, massive spectrographs (thousands of pixels in all directions) can now be browsed in real time using scrollbars, and when you scroll it doesn’t stop recording, and you don’t lose any data! Super cool.