# Copyright (C) 2010 Leo Singer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Sliding maximum of past n samples.
"""
__author__ = "Leo Singer "


import collections

from gstlal.pipeutil import *
from gstlal import pipeio
import numpy


# GStreamer transform element whose output sample i is the maximum of the
# most recent `n` input samples (fewer while the window is still filling
# after start).
#
# NOTE: this class deliberately has no docstring.  `__doc__` in the class
# body below must keep resolving to the *module* docstring, which is used
# as the element description in __gstdetails__; a class docstring would
# shadow it.
class sliding_max(gst.BaseTransform):

    __gstdetails__ = (
        "Sliding maximum",
        "Filter",
        __doc__.strip(),
        __author__
    )

    __gproperties__ = {
        'n': (
            gobject.TYPE_UINT,
            'Window length',
            'Number of samples in sliding window',
            1, gobject.G_MAXUINT, 16,  # min, max, default
            gobject.PARAM_READWRITE | gobject.PARAM_CONSTRUCT
        ),
    }

    # Sink and source pads accept the same caps: single-channel
    # native-endian floating point audio, 32 or 64 bits wide.
    __gsttemplates__ = (
        gst.PadTemplate(
            "sink",
            gst.PAD_SINK, gst.PAD_ALWAYS,
            gst.caps_from_string(
                " audio/x-raw-float, endianness = (int) BYTE_ORDER,"
                " width = (int) {32, 64}, channels = (int) 1 "
            )
        ),
        gst.PadTemplate(
            "src",
            gst.PAD_SRC, gst.PAD_ALWAYS,
            gst.caps_from_string(
                " audio/x-raw-float, endianness = (int) BYTE_ORDER,"
                " width = (int) {32, 64}, channels = (int) 1 "
            )
        )
    )

    def do_set_property(self, prop, val):
        """gobject->set_property virtual method.

        Stores the new window length when the property being set is 'n';
        any other property name is ignored.
        """
        if prop.name == 'n':
            self.n = val

    def do_get_property(self, prop):
        """gobject->get_property virtual method.

        Returns the current window length for property 'n'.  Falls
        through (returning None) for any other property name.
        """
        if prop.name == 'n':
            return self.n

    def do_start(self):
        """GstBaseTransform->start virtual method.

        Resets the window of remembered samples and reports success.
        """
        # A deque gives O(1) removal of the oldest sample; the previous
        # list-based window paid O(window length) for every pop(0).
        self.history = collections.deque()
        return True

    def do_transform(self, inbuf, outbuf):
        """GstBaseTransform->transform virtual method.

        Writes into outbuf, for every sample of inbuf, the maximum over
        the last `n` samples seen (including samples remembered from
        earlier buffers since the last start).  Returns gst.FLOW_OK.
        """
        # Convert received buffer to Numpy array.
        # NOTE(review): the exact shape returned by
        # pipeio.array_from_audio_buffer is not visible here; the loop
        # below only relies on iterating over its first axis.
        x = pipeio.array_from_audio_buffer(inbuf)

        # Create output array.
        y = numpy.zeros(x.shape, dtype=x.dtype)

        # Do the dirty work.
        history = self.history
        for i, xi in enumerate(x):
            history.append(xi)
            # Trim in a loop (rather than fixing a maxlen up front) and
            # re-read self.n each time, so that a change of the 'n'
            # property takes effect on the very next sample.
            while len(history) > self.n:
                history.popleft()
            y[i] = max(history)

        # Copy output to buffer.
        outbuf[:len(y.data)] = y.data

        # Done!
        return gst.FLOW_OK


# Register element class
gstlal_element_register(sliding_max)