# Copyright (C) 2010  Leo Singer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""
Sliding maximum of past n samples, heap implementation.
"""
# Author contact; the angle brackets were HTML-escaped (&lt; / &gt;) in the
# captured copy of this file and are restored to literal characters here.
__author__ = "Leo Singer <leo.singer@ligo.org>"


from gstlal.pipeutil import *
from gstlal import pipeio
import numpy


class SlidingMaxHeap(object):
	"""
	Sliding maximum of the most recent n samples, kept in a binary max-heap.

	Attributes:
	  offset   -- number of samples consumed so far.
	  n        -- window length in samples.
	  heap     -- array-embedded binary max-heap of (value, offset) tuples,
	              ordered by value.
	  history  -- ring buffer of length <= n; history[offset % n] is the
	              current index in `heap` of the sample with that offset.
	              This lets the oldest sample be located without searching.
	"""

	def __init__(self, n):
		"""Create an empty window that will hold at most n samples."""
		self.offset = 0
		self.n = n
		self.heap = []
		self.history = []

	def update(self, x):
		"""
		Push sample x into the window, dropping the oldest sample once the
		window already holds n samples.  Returns the largest value currently
		in the window.
		"""
		nsamples = len(self.history)
		if nsamples < self.n:
			# Still filling up: append at the bottom and bubble up.
			self.heap.append( (x, self.offset) )
			self.history.append(nsamples)
			self.sift_up(nsamples)
		else:
			# Window is full: the sample that shares this ring-buffer slot
			# is the oldest one and must be evicted.
			slot = self.offset % self.n
			old_index = self.history[slot]
			new_index = nsamples - 1
			if new_index != old_index:
				# Move the expired entry to the last heap position and
				# repair the remaining n - 1 entries around the hole.
				self.swap(old_index, new_index)
				self.sift(old_index)
			# Overwrite the expired entry with the new sample and restore
			# the heap property for the full array.
			self.history[slot] = new_index
			self.heap[new_index] = (x, self.offset)
			self.sift_up(new_index)
		self.offset += 1
		return self.heap[0][0]

	def is_heap(self):
		"""
		Return True if every stored entry is no greater than its parent,
		i.e. the max-heap invariant holds (equal values are allowed).
		"""
		for child in range(1, len(self.history)):
			if self.cmp((child - 1) // 2, child) == -1:
				return False
		return True

	def swap(self, i, j):
		"""Exchange heap entries i and j, keeping `history` consistent."""
		offset_i = self.heap[i][1] % self.n
		offset_j = self.heap[j][1] % self.n
		self.history[offset_i], self.history[offset_j] = j, i
		self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

	def cmp(self, i, j):
		"""
		Three-way comparison of the values at heap indices i and j:
		-1 if heap[i] < heap[j], 1 if greater, 0 otherwise.
		"""
		a = self.heap[i][0]
		b = self.heap[j][0]
		# Explicit branches instead of the Python-2-only builtin cmp();
		# "(a > b) - (a < b)" is avoided because numpy booleans do not
		# support subtraction.
		if a > b:
			return 1
		if a < b:
			return -1
		return 0

	def sift_up(self, i):
		"""Move entry i towards the root until its parent is strictly larger."""
		while i > 0:
			j = (i - 1) // 2
			if self.cmp(i, j) == -1:
				break
			self.swap(i, j)
			i = j

	def sift_down(self, i):
		"""
		Move entry i towards the leaves until neither child is larger.

		Only indices below n - 1 are treated as part of the heap: update()
		calls this (via sift) while the last slot holds the expired sample
		that is about to be overwritten.
		"""
		limit = self.n - 1
		while True:
			j = 2 * i + 1
			k = j + 1
			if j >= limit:
				# No children inside the live region.
				break
			# Pick the larger child; prefer the left one on a tie.
			child = j
			if k < limit and self.cmp(j, k) == -1:
				child = k
			if self.cmp(i, child) != -1:
				break
			self.swap(i, child)
			i = child

	def sift(self, i):
		"""Restore heap order around entry i, moving it up or down as needed."""
		if i == 0:
			self.sift_down(i)
		else:
			parent = (i - 1) // 2
			# Whole (value, offset) tuples are compared here, so a tie on
			# value is decided by the offset.
			if self.heap[i] > self.heap[parent]:
				self.sift_up(i)
			else:
				self.sift_down(i)


class sliding_max_fast(gst.BaseTransform):
	# GStreamer base-transform element that replaces every input sample with
	# the maximum over the window tracked by a SlidingMaxHeap.
	#
	# NOTE: deliberately documented with comments rather than a class
	# docstring.  __gstdetails__ below reads __doc__ inside the class body,
	# and a class docstring would shadow the module docstring it is meant to
	# pick up.

	# Element details: long name, classification, description, author.
	__gstdetails__ = (
		"Sliding maximum",
		"Filter",
		__doc__.strip(),
		__author__
	)

	# Single property 'n': the window length in samples.
	__gproperties__ = {
		'n': (
			gobject.TYPE_UINT,
			'Window length',
			'Number of samples in sliding window',
			# min, max, default
			1, gobject.G_MAXUINT, 16,
			gobject.PARAM_READWRITE | gobject.PARAM_CONSTRUCT
		),
	}

	# Sink and source pads share the same caps: mono float audio, 32 or 64 bit.
	__gsttemplates__ = (
		gst.PadTemplate(
			"sink", gst.PAD_SINK, gst.PAD_ALWAYS,
			gst.caps_from_string("""
				audio/x-raw-float,
				endianness = (int) BYTE_ORDER,
				width = (int) {32, 64},
				channels = (int) 1
			""")
		),
		gst.PadTemplate(
			"src", gst.PAD_SRC, gst.PAD_ALWAYS,
			gst.caps_from_string("""
				audio/x-raw-float,
				endianness = (int) BYTE_ORDER,
				width = (int) {32, 64},
				channels = (int) 1
			""")
		)
	)

	def do_set_property(self, prop, val):
		"""Property setter (gobject set_property virtual method).

		Setting 'n' stores the window length and starts over with a fresh,
		empty SlidingMaxHeap.  Other property names are ignored.
		"""
		if prop.name != 'n':
			return
		self.n = val
		self.heap = SlidingMaxHeap(self.n)

	def do_get_property(self, prop):
		"""Property getter (gobject get_property virtual method).

		Returns the window length for 'n'; returns None for any other name.
		"""
		if prop.name != 'n':
			return None
		return self.n

	def do_transform(self, inbuf, outbuf):
		"""Transform one buffer (GstBaseTransform transform virtual method).

		Every sample of inbuf is pushed through the heap and the resulting
		running maxima are written to the start of outbuf.  Returns
		gst.FLOW_OK.
		"""
		# View the incoming buffer as a Numpy array.
		samples = pipeio.array_from_audio_buffer(inbuf)

		# Feed the samples through the heap one at a time, in order.
		maxima = [self.heap.update(sample) for sample in samples.flatten()]
		result = numpy.array(maxima, dtype=samples.dtype)

		# Write the raw bytes of the result into the output buffer.
		outbuf[:len(result.data)] = result.data

		return gst.FLOW_OK


# Register element class
gstlal_element_register(sliding_max_fast)