import os, string
import random as _random
from numpy import fft, conjugate, array, mean, arange, searchsorted, argsort, pi
from pyfusion.utils.utils import warn
from pyfusion.debug_ import debug_
import pyfusion
import numpy as np
try:
import uuid
except: # python 2.4
pass
## for python <=2.5 compat, bin() is only python >= 2.6
## code taken from http://stackoverflow.com/questions/1993834/how-change-int-to-binary-on-python-2-5
def __bin(value):
binmap = {'0':'0000', '1':'0001', '2':'0010', '3':'0011',
'4':'0100', '5':'0101', '6':'0110', '7':'0111',
'8':'1000', '9':'1001', 'a':'1010', 'b':'1011',
'c':'1100', 'd':'1101', 'e':'1110', 'f':'1111'}
if value == 0:
return '0b0'
return '0b'+''.join(binmap[x] for x in ('%x' % (value,))).lstrip('0') or '0'
try:
_bin = bin
except NameError: # python <= 2.5
_bin = __bin
[docs]def unique_id():
try:
return str(uuid.uuid4())
except:
return ''.join(_random.choice(string.letters) for i in range(50))
[docs]def get_axes_pixcells(ax):
""" return the pixcell coorindates of the axes ax
This is useful in determining how many characters will fit
"""
return(ax.get_window_extent().bounds)
[docs]def cps(a,b):
return fft.fft(a)*conjugate(fft.fft(b)) # bdb fft 10%
[docs]def subdivide_interval(pts, overlap= None, debug=0):
""" return several intervals which straddle pts
with overlap of ov
The lowest x value is special - the point of division is much closer
to that x then zero.
overlap is a tuple (minoverlap, max), and describes the total overlap
"""
if overlap is None: overlap = [np.max(pts)/50, np.max(pts)/20]
if len(overlap) == 1:
warn('overlap should have a min and a max')
overlap = [overlap/3.0, overlap]
if (np.diff(pts)<0).any():
warn('points out of order - reordering')
pts = np.sort(pts)
begins = []
ends = []
for (i, x) in enumerate(pts):
if i == 0:
divider = x * 0.8 - overlap[1]/2.
else:
divider = (x + pts[i-1])/2.
if i == 0:
begins.append(0)
ends.append(divider + overlap[1]/2.)
else:
this_overlap = min(max((divider - last_divider)/20,overlap[0]),
overlap[1])/2.
begins.append(last_divider - this_overlap)
ends.append(divider + this_overlap)
last_divider = divider
if debug>1: print(begins, pts, ends)
if debug>2:
import pylab as pl
pl.figure()
for i in range(len(pts)):
pl.plot([begins[i],ends[i]],[i,i])
if i>0: pl.plot([pts[i-1],pts[i-1]],[i,i],'o')
pl.ylim(-1,20)
pl.show()
return(begins, ends)
[docs]def find_peaks(arr, minratio=.001, debug=0):
""" find the peaks in the data in arr, by selecting points
See also Shauns using find_peaks running average
where the slope changes sign, and the value is > minratio*max(arr) """
darr = np.diff(arr)
wnz = np.where(darr != 0)[0]
w_ch_sign = np.where(darr[wnz][0:-1]*darr[wnz][1:] < 0)[0]
# now check these to find the max
maxarr = np.max(arr[1:]) # to avoid zero freq
maxi = []
for i in w_ch_sign:
darr_left = darr[wnz[i]]
darr_right = darr[wnz[i]+1]
if darr_left > 0: # have a maximum
imax = np.argmax(arr[wnz[i]:wnz[i]+2])
iarrmax = wnz[i]+imax # imax was relative to subarray
if arr[iarrmax] > minratio*maxarr:
maxi.append(iarrmax)
if debug>1: print('arr elt {ii} = {v:.2f}'
.format(ii=iarrmax,
v=arr[iarrmax]))
debug_(pyfusion.DEBUG,level=2, key='find_peaks')
return(np.array(maxi))
[docs]def find_signal_spectral_peaks(timebase, signal, minratio = .001, debug=0):
ns = len(signal)
FT = np.fft.fft(signal-np.average(signal))/ns # bdb 0% fft?
ipks = find_peaks(np.abs(FT)[0:ns/2], minratio = minratio, debug=1)
fpks = ipks/np.average(np.diff(timebase))/float(ns)
if debug>1:
import pylab as pl
pl.semilogy(np.abs(FT))
pl.semilogy(ipks,np.abs(FT)[ipks],'o')
return(ipks, fpks, np.abs(FT)[ipks])
[docs]def peak_freq(signal,timebase,minfreq=0,maxfreq=1.e18):
"""
TODO: old code: needs review - since then bdb helped a bit...
now also returns peaking factor
this function only has a basic unittest to make sure it returns
the correct freq in a simple case.
>>> tb = np.linspace(0,1,10000)
>>> int(peak_freq(np.sin(2*np.pi*567*tb), tb)[1])
567
"""
timebase = array(timebase)
# Note - the call to this uses the first Svector
sig_fft = fft.rfft(signal) # bdb 5% fft (before rfft - was just fft)
#sample_time = float(mean(timebase[1:]-timebase[:-1]))
sample_time = np.average(np.diff(timebase))
#fft_freqs = (1./sample_time)*arange(len(sig_fft)).astype(float)/(len(sig_fft)-1)
# I think the -1 in (len() - 1) was an error - bdb
fft_freqs = (1./sample_time)*arange(len(sig_fft)).astype(float)/len(signal)
""" not needed for rfft # only show up to nyquist freq
new_len = len(sig_fft)/2
sig_fft = sig_fft[:new_len]
fft_freqs = fft_freqs[:new_len]
"""
[minfreq_elmt,maxfreq_elmt] = searchsorted(fft_freqs,[minfreq,maxfreq])
sig_fft = sig_fft[minfreq_elmt:maxfreq_elmt]
fft_freqs = fft_freqs[minfreq_elmt:maxfreq_elmt]
peak_elmt = (argsort(abs(sig_fft)))[-1]
pkfactor = np.max(np.abs(sig_fft))/np.sqrt(np.average(np.abs(sig_fft)**2))
return [fft_freqs[peak_elmt], peak_elmt, pkfactor]
[docs]def remap_periodic(input_array, min_val, period = 2*pi):
while len(input_array[input_array<min_val]) > 0:
input_array[input_array<min_val] += period
while len(input_array[input_array>=min_val+period]) > 0:
input_array[input_array>=min_val+period] -= period
return input_array
[docs]def list2bin(input_list):
# we explicitly cast to int(), as numpy's integer type clashes with sqlalchemy
return int(sum(2**array(input_list)))
[docs]def bin2list(input_value):
output_list = []
bin_index_str = _bin(input_value)[2:][::-1]
for ind,i in enumerate(bin_index_str):
if i == '1':
output_list.append(ind)
return output_list
[docs]def split_names(names, pad=' ',min_length=3):
""" Given an array of strings, return an array of the part of the string
(e.g. channel name) that varies, and optionally the prefix and suffix.
The array of varying parts is first in the tuple in case others are not
wanted. This is used to make the x labels of phase plots simpler and smaller.
e.g.
>>> split_names(['MP01','MP10'],min_length=2)
(['01', '10'], 'MP', '')
The pad char is put on the end of shorter names - a better way would be
to keep the end char the same, and pad in between the beginning and end
the per channel part is at least min_length long.
This is not really needed, as the routine chooses the lenght so
that the results are not ambiguous (MP01,MP02 -> 1,2 but MP01,MP12 -> 01,12
"""
# make a new array with elements padded to the same length with <pad>
nms = []
maxlen = max([len(nm) for nm in names]) # length of the longest name
for nm in names:
nmarr = [c for c in nm]
while len(nmarr)< maxlen: nmarr.append(pad)
nms.append(nmarr)
# the following numpy array comparisons look simple, but require the name string
# to be exploded into chars. Although a single string can be interchangeably
# referred to as a string or array of chars, these arrays they have to be
# re-constituted before return.
#
# for nm in nms: # for each nm
#find the first mismatch - first will be the first char of the extracted arr
nms_arr=array(nms)
first=0
while (first < maxlen and
(nms_arr[:,first] == nms_arr[0,first]).all()):
first += 1
# and the last
last = maxlen-1
while ((last >= 0) and
(nms_arr[:,last] == nms_arr[0,last]).all()):
last -= 1
# check for no mismatch
if first==maxlen: return(['' for nm in names], ''.join(nms[0]),'')
# otherwise return, (no need for special code for the case of no match at all)
if (1+last-first) < min_length:
add_chars = min_length - (1+last-first)
first = max(0, first-add_chars)
print(first, last, add_chars, maxlen)
return(([''.join(s) for s in nms_arr[:,first:last+1]],
''.join(nms_arr[0,0:first]),
''.join(nms_arr[0,last+1:maxlen+1])))
[docs]def make_title(formatstr, input_data, channum=None, at_dict = {}, min_length=3, raw_names=False):
""" Return a string describing the shot number, channel name etc using
a formatstr which refers to items in a dictionary (at_dict), assembled in
this routine, based on input_data and an optional dictionary which
contains anything not otherwise available in input_data
"""
## at_dict.update({'shot': input_data.meta['shot']})
exception = () if pyfusion.DBG() > 3 else Exception
try:
at_dict.update(input_data.meta) # this gets all of it!
if channum is None:
name = ''
else:
if isinstance(input_data.channels, list):
chan = input_data.channels[channum]
else:
chan = input_data.channels
if raw_names:
name = chan.name
else:
name = chan.config_name
at_dict.update({'units':chan.units})
at_dict.update({'name': name})
# replace internal strings of non-numbers with a single . a14_input03 -> 14.03
short_name=''
last_was_number = False
discarded=''
for c in name: # start from the first char
if c>='0' and c<='9':
short_name += c
last_was_number=True
else:
if last_was_number: short_name += '.'
else: discarded += c
last_was_number=False
if len(short_name) <= min_length:
# if it fits, have the lot
if len(name)<8: short_name=name
# else allow 4 more chars - makes about 6-8 chars
else: short_name = discarded[-4:] + short_name
at_dict.update({'short_name': short_name})
return(formatstr.format(**at_dict))
except exception as ex:
warn('in make_title for format="%s", at_dict=%s' % (formatstr, at_dict),
exception=ex)
return('')
if __name__ == '__main__':
# test program
import doctest
doctest.testmod()
import pyfusion
x=find_peaks([1,2,3,4,2,1,1,10,5,4,3,4,5])
tb = np.linspace(0,1e-3,1000)
signal = np.cos(2*np.pi*20e3*tb) + np.sin(2*np.pi*40e3*tb)
(ip,fp,ap) = find_signal_spectral_peaks(tb, signal, debug=2)
subdivide_interval([.5,1,1.2,2,3], debug=2)