# Copyright (C) 2010 Leo Singer # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. """ Sliding maximum of past n samples, heap implementation. """ __author__ = "Leo Singer " from gstlal.pipeutil import * from gstlal import pipeio import numpy class SlidingMaxHeap(object): def __init__(self, n): self.offset = 0 self.n = n self.heap = [] self.history = [] def update(self, x): nsamples = len(self.history) if nsamples < self.n: self.heap.append( (x, self.offset) ) self.history.append(nsamples) self.sift_up(nsamples) else: old_index = self.history[self.offset % self.n] new_index = nsamples - 1 if new_index != old_index: self.swap(old_index, new_index) self.sift(old_index) self.history[self.offset % self.n] = new_index self.heap[new_index] = (x, self.offset) self.sift_up(new_index) self.offset += 1 return self.heap[0][0] def is_heap(self): for i in range(len(self.history)): j = 2*i+1 k = 2*i+2 if j < len(self.history) and self.cmp(i, j) != 1: return False if k < len(self.history) and self.cmp(i, j) != 1: return False return True def swap(self, i, j): offset_i = self.heap[i][1] % self.n offset_j = self.heap[j][1] % self.n self.history[offset_i], self.history[offset_j] = j, i self.heap[i], self.heap[j] = self.heap[j], self.heap[i] def cmp(self, i, j): return cmp(self.heap[i][0], self.heap[j][0]) def sift_up(self, i): while i > 0: j = (i - 1) / 2 if self.cmp(i, j) == -1: break self.swap(i, j) i = j def sift_down(self, i): while True: j = 2 * i + 1 k = j + 1 if k < self.n - 1: if self.cmp(i, j) == -1: if self.cmp(j, k) == -1: self.swap(i, k) i = k else: self.swap(i, j) i = j elif self.cmp(i, k) == -1: self.swap(i, k) i = k else: break elif j < self.n - 1 and self.cmp(i, j) == -1: self.swap(i, j) i = j else: break def sift(self, i): if i == 0: self.sift_down(i) else: parent = (i - 1) / 2 if self.heap[i] > self.heap[parent]: self.sift_up(i) else: self.sift_down(i) class sliding_max_fast(gst.BaseTransform): __gstdetails__ = ( "Sliding maximum", "Filter", __doc__.strip(), __author__ ) __gproperties__ = { 'n': ( gobject.TYPE_UINT, 'Window length', 'Number of samples in sliding window', 1, gobject.G_MAXUINT, 16, # min, max, default gobject.PARAM_READWRITE | gobject.PARAM_CONSTRUCT ), } __gsttemplates__ = ( gst.PadTemplate("sink", gst.PAD_SINK, gst.PAD_ALWAYS, gst.caps_from_string(""" audio/x-raw-float, endianness = (int) BYTE_ORDER, width = (int) {32, 64}, channels = (int) 1 """) ), gst.PadTemplate("src", gst.PAD_SRC, gst.PAD_ALWAYS, gst.caps_from_string(""" audio/x-raw-float, endianness = (int) BYTE_ORDER, width = (int) {32, 64}, channels = (int) 1 """) ) ) def do_set_property(self, prop, val): """gobject->set_property virtual method.""" if prop.name == 'n': self.n = val self.heap = SlidingMaxHeap(self.n) def do_get_property(self, prop): """gobject->get_property virtual method.""" if prop.name == 'n': return self.n def do_transform(self, inbuf, outbuf): """GstBaseTransform->transform virtual method.""" # Convert received buffer to Numpy array. x = pipeio.array_from_audio_buffer(inbuf) # Do the dirty work. y = numpy.array([self.heap.update(xi) for xi in x.flatten()], dtype=x.dtype) # Copy output to buffer. outbuf[:len(y.data)] = y.data # Done! return gst.FLOW_OK # Register element class gstlal_element_register(sliding_max_fast)