<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;"># Copyright (C) 2010  Leo Singer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""
Sliding maximum of past n samples.
"""
__author__ = "Leo Singer &lt;leo.singer@ligo.org&gt;"


from gstlal.pipeutil import *
from gstlal import pipeio
import numpy


class sliding_max(gst.BaseTransform):

	__gstdetails__ = (
		"Sliding maximum",
		"Filter",
		__doc__.strip(),
		__author__
	)
	__gproperties__ = {
		'n': (
			gobject.TYPE_UINT,
			'Window length',
			'Number of samples in sliding window',
			1, gobject.G_MAXUINT, 16, # min, max, default
			gobject.PARAM_READWRITE | gobject.PARAM_CONSTRUCT
		),
	}
	__gsttemplates__ = (
		gst.PadTemplate("sink",
			gst.PAD_SINK, gst.PAD_ALWAYS,
			gst.caps_from_string("""
				audio/x-raw-float,
				endianness = (int) BYTE_ORDER,
				width = (int) {32, 64},
				channels = (int) 1
			""")
		),
		gst.PadTemplate("src",
			gst.PAD_SRC, gst.PAD_ALWAYS,
			gst.caps_from_string("""
				audio/x-raw-float,
				endianness = (int) BYTE_ORDER,
				width = (int) {32, 64},
				channels = (int) 1
			""")
		)
	)

	def do_set_property(self, prop, val):
		"""gobject-&gt;set_property virtual method."""
		if prop.name == 'n':
			self.n = val

	def do_get_property(self, prop):
		"""gobject-&gt;get_property virtual method."""
		if prop.name == 'n':
			return self.n

	def do_start(self):
		"""GstBaseTransform-&gt;start virtual method."""
		self.history = []
		return True

	def do_transform(self, inbuf, outbuf):
		"""GstBaseTransform-&gt;transform virtual method."""

		# Convert received buffer to Numpy array.
		x = pipeio.array_from_audio_buffer(inbuf)

		# Create output array.
		y = numpy.zeros(x.shape, dtype=x.dtype)

		# Do the dirty work.
		for i, xi in enumerate(x):
			self.history.append(xi)
			while len(self.history) &gt; self.n:
				self.history.pop(0)
			y[i] = max(self.history)

		# Copy output to buffer.
		outbuf[:len(y.data)] = y.data

		# Done!
		return gst.FLOW_OK


# Register element class
gstlal_element_register(sliding_max)
</pre></body></html>