Source code for pyfusion.acquisition.base

"""Base classes for  pyfusion data acquisition. It is  expected that the
base classes  in this module will  not be called  explicitly, but rather
inherited by subclasses in the acquisition sub-packages.

"""
from __future__ import print_function
import numpy as np
from numpy.testing import assert_array_almost_equal
from numpy import ndarray, shape
from pyfusion.conf.utils import import_setting, kwarg_config_handler, \
     get_config_as_dict, import_from_str
from pyfusion.data.timeseries import Signal, Timebase, TimeseriesData
from pyfusion.data.base import Coords, Channel, ChannelList, get_coords_for_channel
from pyfusion.debug_ import debug_
import traceback
import sys, os
import pyfusion  # only needed for .VERBOSE and .DEBUG

[docs]def newloadv3(filename, verbose=1): """ This the the version that works in python 3, but can't handle Nans Intended to replace load() in numpy counterpart is data/savez_compress.py """ from numpy import load as loadz from numpy import cumsum, array dic=loadz(filename) # if dic['version'] != None: # if len((dic.files=='version').nonzero())>0: if len(dic.files)>3: if verbose>2: print ("local v%d " % (dic['version']),end='') else: if verbose>2: print("local v0: simple ", end='') return(dic) # quick, minimal return if verbose>2: print(' contains %s' % dic.files) signalexpr=dic['signalexpr'] timebaseexpr=dic['timebaseexpr'] if 'time_unit_in_seconds' in dic: timeunitexpr = dic['time_unit_in_seconds'] else: timeunitexpr = array(1) # savez saves ARRAYS always, so have to turn array back into scalar # exec(signalexpr.tolist()) # Changed exec code to eval for python3, otherwise the name was not defined # for the target variables - they could only be accessed with # e.g. locals().signal # retdic = {"signal":locals()['signal'], "timebase":locals()['timebase'], # "parent_element": dic['parent_element']} # Sucess using eval instead of exec signal = eval(signalexpr.tolist().split(b'=')[1]) time_unit_in_seconds = timeunitexpr.tolist() timebase = time_unit_in_seconds * eval(timebaseexpr.tolist().split(b'=')[1]) retdic = {"signal":signal, "timebase":timebase, "parent_element": dic['parent_element'], "params": dic['params'].tolist()} return(retdic)
if sys.version<'3,': from pyfusion.data.save_compress import newload else: newload = newloadv3
[docs]def try_fetch_local(input_data, bare_chan): """ return data if in the local cache, otherwise None doesn't work for single channel HJ data. sgn (not gain) be only be used at the single channel base/fetch level """ for each_path in pyfusion.config.get('global', 'localdatapath').split('+'): # check for multi value shot number, e.g. utc bounds for W7-X data shot = input_data.shot # MDSplus style path to access sorted files into folders by shot path, patt = os.path.split(each_path) # print(patt) if len(patt) == 2*len(patt.replace('~','')): # a subdir code based on date/shot subdir = '' # reverse the order of both the pattern and the shot so a posn is 0th char strshot = str(shot[0]) if len(np.shape(shot))>0 else str(shot) revshot = strshot[::-1] for i,ch in enumerate(patt): if (i%2) == 0: if ch != '~': raise LookupError("Can't parse {d} as a MDS style subdir" .format(d=patt)) continue subdir += revshot[ord(ch) - ord('a')] else: subdir = patt debug_(pyfusion.DEBUG, 3, key='MDS style subdir', msg=each_path) each_path = os.path.join(path, subdir) if isinstance(shot, (tuple, list, ndarray)): shot_str = '{s0}_{s1}'.format(s0=shot[0], s1=shot[1]) else: shot_str = str(shot) input_data.localname = os.path.join(each_path, '{shot}_{bc}.npz' .format(shot=shot_str, bc=bare_chan)) # original - data_filename %filename_dict) if pyfusion.VERBOSE>2: print(each_path,input_data.localname) files_exist = os.path.exists(input_data.localname) debug_(pyfusion.DEBUG, 3, key='try_local_fetch') if files_exist: intmp = np.any([st in input_data.localname.lower() for st in ['tmp', 'temp']]) # add anything you wish to warn about if pyfusion.VERBOSE>0 or intmp: if intmp: pyfusion.logging.warning('Using {f} in temporary directory!' .format(f=input_data.localname)) print('found local data in {f}'. format(f=input_data.localname)) break if not files_exist: return None signal_dict = newload(input_data.localname) if 'params' in signal_dict and 'name' in signal_dict['params'] and 'W7X_L5' in signal_dict['params']['name']: if signal_dict['params']['pyfusion_version'] < '0.6.8b': raise ValueError('probe assignments in error LP11-22 in {fn}' .format(fn=input_data.localname)) if np.nanmax(signal_dict['timebase']) == 0: pyfusion.logging.warning('making a fake timebase for {fn}' .format(fn=input_data.localname)) signal_dict['timebase'] = 2e-6*np.cumsum(1.0 + 0*signal_dict['signal']) coords = get_coords_for_channel(**input_data.__dict__) #ch = Channel(bare_chan, Coords('dummy', (0,0,0))) ch = Channel(bare_chan, coords) output_data = TimeseriesData(timebase=Timebase(signal_dict['timebase']), signal=Signal(signal_dict['signal']), channels=ch) # bdb - used "fetcher" instead of "self" in the "direct from LHD data" version # when using saved files, should use the name - not input_data.config_name # it WAS the config_name coming from the raw format. output_data.config_name = bare_chan # would be nice to get to the gain here - but how - maybe setup will get it output_data.meta.update({'shot':input_data.shot}) if 'params' in signal_dict: output_data.params = signal_dict['params'] if 'utc' in signal_dict['params']: output_data.utc = signal_dict['params'].get('utc',None) else: # yes, it seems like duplication, but no output_data.utc = None output_data.params = dict(comment = 'old npz file has no params') oldsrc = ', originally from ' + output_data.params['source'] if 'source' in output_data.params else '' output_data.params.update(dict(source='from npz cache' + oldsrc)) return(output_data)
[docs]class BaseAcquisition(object): """Base class for datasystem specific acquisition classes. :param config_name: name of acquisition as specified in\ configuration file. On instantiation, the pyfusion configuration is searched for a ``[Acquisition:config_name]`` section. The contents of the configuration section are loaded into the object namespace. For example, a configuration section:: [Acquisition:my_custom_acq] acq_class = pyfusion.acquisition.base.BaseAcquisition server = my.dataserver.com will result in the following behaviour:: >>> from pyfusion.acquisition.base import BaseAcquisition >>> my_acq = BaseAcquisition('my_custom_acq') >>> print(my_acq.server) my.dataserver.com The configuration entries can be overridden with keyword arguments:: >>> my_other_acq = BaseAcquisition('my_custom_acq', server='your.data.net') >>> print(my_other_acq.server) your.data.net """ def __init__(self, config_name=None, **kwargs): if config_name is not None: self.__dict__.update(get_config_as_dict('Acquisition', config_name)) self.__dict__.update(kwargs)
[docs] def getdata(self, shot, config_name=None, **kwargs): """Get the data and return prescribed subclass of BaseData. :param shot: shot number :param config_name: ?? bdb name of a fetcher class in the configuration file :returns: an instance of a subclass of \ :py:class:`~pyfusion.data.base.BaseData` or \ :py:class:`~pyfusion.data.base.BaseDataSet` This method needs to know which data fetcher class to use, if a config_name argument is supplied then the ``[Diagnostic:config_name]`` section must exist in the configuration file and contain a ``data_fetcher`` class specification, for example:: [Diagnostic:H1_mirnov_array_1_coil_1] data_fetcher = pyfusion.acquisition.H1.fetch.H1DataFetcher mds_path = \h1data::top.operations.mirnov:a14_14:input_1 coords_cylindrical = 1.114, 0.7732, 0.355 coord_transform = H1_mirnov If a ``data_fetcher`` keyword argument is supplied, it overrides the configuration file specification. The fetcher class is instantiated, including any supplied keyword arguments, and the result of the ``fetch`` method of the fetcher class is returned. """ from pyfusion import config # if there is a data_fetcher arg, use that, otherwise get from config if 'data_fetcher' in kwargs: fetcher_class_name = kwargs['data_fetcher'] else: fetcher_class_name = config.pf_get('Diagnostic', config_name, 'data_fetcher') fetcher_class = import_from_str(fetcher_class_name) ## Problem: no check to see if it is a diag of the right device!?? d = fetcher_class(self, shot, config_name=config_name, **kwargs).fetch() d.history += "\n:: shot: {s} :: config: {c}".format(s=shot, c=config_name) return d
[docs]class BaseDataFetcher(object): """Base class providing interface for fetching data from an experimental database. :param acq: in instance of a subclass of :py:class:`BaseAcquisition` :param shot: shot number :param config_name: name of a Diagnostic configuration section. It is expected that subclasses of BaseDataFetcher will be called via the :py:meth:`~BaseAcquisition.getdata` method, which calls the data fetcher's :py:meth:`fetch` method. """ def __init__(self, acq, shot, config_name=None, **kwargs): self.shot = shot self.acq = acq self.no_cache = False # this allows getData to request that cached data is NOT used - eg for saving local #bdb?? add device name here, so can prepend to Diagnostic # e.g. LHD_Diagnostic - avoids ambiguity debug_(pyfusion.DEBUG,5,key='device_name') if config_name is not None: self.__dict__.update(get_config_as_dict('Diagnostic', config_name)) # look for the first valid shot range for this diag - # see config/'Valid Shots' in the reference docs self.config_name = config_name devshort = self.config_name.split('_')[0] # assume the device short name is first, before _ for Mod in ['M1', 'M2', 'M3', 'M4']: if isinstance(shot, (tuple, list, ndarray)) and shot[0] > 1e9: pyfusion.logger.warning('Ignoring valid_shots') break # it is a utc pair, don't check for valid_shots (dangerous though!) if self.find_valid_for_shot(): break else: # replace W7X_ with W7XM1_ etc self.__dict__.update(get_config_as_dict('Diagnostic', config_name.replace(devshort, devshort+Mod))) else: raise LookupError(config_name) if pyfusion.VERBOSE>3: print(get_config_as_dict('Diagnostic', config_name)) self.__dict__.update(kwargs) # self.config_name=config_name # print('BaseDFinit',config_name,self.__dict__.keys())
[docs] def find_valid_for_shot(self): """ Determine if this diag definition or modified diag is valid for this shot """ def check_to_clause(shot, k, dic): """ check for leftover instances of to=[date, shot] where shot is 0 - this is very likely to be a mistake as there is no shot before 0 """ if ('_to' in k and isinstance(shot,(list, tuple, ndarray)) and dic[k][1] == 0): print('******** Warning - valid shot of 0 in to clause?') if hasattr(self,'valid_shots'): valid_shots = self.valid_shots elif hasattr(self.acq, 'valid_shots'): valid_shots = self.acq.valid_shots else: valid_shots = None # another example of inheritance - need to formalise this, extract the code to a function? is_valid = True if valid_shots is not None: valid_dict = eval('dict({ps})'.format(ps=valid_shots)) for k in valid_dict: root = k.replace('_from','').replace('_to','') if '_' + root in self.config_name: if pyfusion.VERBOSE>1: print(k, root, self.shot, valid_dict[k]) check_to_clause(self.shot, k, valid_dict) if ('_from' in k and self.shot < valid_dict[k] or ('_to' in k and self.shot > valid_dict[k])): is_valid = False debug_(pyfusion.DEBUG, 2, 'valid_shots') return(is_valid)
[docs] def setup(self): """Called by :py:meth:`fetch` before retrieving the data. setup() ideally does sufficient of the preliminaries for fetch so that their is enough information in self. for a useful error report if fetch fails, while avoiding exceptions wherever possible (during setup()). """ pass # base:setup should always be overridden
[docs] def do_fetch(self): """Actually fetches the data, using the environment set up by :py:meth:`setup` :returns: an instance of a subclass of \ :py:class:`~pyfusion.data.base.BaseData` or \ :py:class:`~pyfusion.data.base.BaseDataSet` Although :py:meth:`BaseDataFetcher.do_fetch` does not return any data object itself, it is expected that a `do_fetch()` method on a subclass of :py:class:`BaseDataFetcher` will. """ pass
[docs] def pulldown(self): """Called by :py:meth:`fetch` after retrieving the data.""" pass
[docs] def error_info(self, step=None): """ return specific information about error to aid interpretation - e.g for mds, path The dummy return should be replaced in the specific routines """ return('(further info not provided by %s)' % (self.acq.acq_class))
[docs] def fetch(self): """Always use this to fetch the data, so that :py:meth:`setup` and :py:meth:`pulldown` are used to setup and pull down the environmet used by :py:meth:`do_fetch`. :returns: the instance of a subclass of \ :py:class:`~pyfusion.data.base.BaseData` or \ :py:class:`~pyfusion.data.base.BaseDataSet` returned by \ :py:meth:`do_fetch` """ if pyfusion.DBG() > 2: exception = () # defeat the try/except else: exception = Exception sgn = 1 chan = self.config_name if chan[0]=='-': #sgn = -sgn raise ValueError('Channel {chan} should not have sign at this point' .format(chan=chan)) #bare_chan = (chan.split('-'))[-1] # use its gain, or if it doesn't have one, its acq gain. if pyfusion.RAW == 0: gain_units = "1 arb" # default to gain 1, no units if hasattr(self,'gain'): gain_units = self.gain elif hasattr(self.acq, 'gain'): gain_units = self.acq.gain else: gain_units = "1 raw" sgn *= float(gain_units.split()[0]) if pyfusion.VERBOSE > 0: print('Gain factor', sgn, self.config_name) tmp_data = None if self.no_cache else try_fetch_local(self, chan) # is there a local copy? if tmp_data is None: method = 'specific' try: self.setup() except exception as details: traceback.print_exc() raise LookupError("{inf}\n{details}".format(inf=self.error_info(step='setup'),details=details)) try: data = self.do_fetch() if not hasattr(data, 'params'): data.params = dict(comment='should really inherit all params from acq.fetch') data.params.update(dict(source=self.acq.acq_class)) except exception as details: # put None here to show exceptions. # then replace with Exception once # "error_info" is working well # this is to provide traceback from deep in a call stack # the normal traceback doesn't see past the base.py into whichever do_fetch # Avoid printing exception stuff unless we want a little verbosity if pyfusion.VERBOSE>0: # this simple method doesn't work, as it only has info after # getting to the prompt if hasattr(sys, "last_type"): traceback.print_last() else: print('sys has not recorded any exception - needs to be at prompt?') # this one DOES work. print(sys.exc_info()) (extype, ex, tb) = sys.exc_info() for tbk in traceback.extract_tb(tb): print("Line {0}: {1}, {2}".format(tbk[1],tbk[0],tbk[2:])) # get what info possible extra = str('shot={s}, diag={d}' .format(s=self.__dict__.get('shot','?'), d=self.__dict__.get('config_name','?'))) raise LookupError("{ex}: {inf}\n{details}{CLASS}" .format(ex=extra, inf=self.error_info(step='setup'), details=details,CLASS=details.__class__)) else: data = tmp_data method = 'local_npz' if hasattr(data, 'params') and 'DMD' in data.params: self_params = eval('dict('+self.params+')') if pyfusion.VERBOSE>0: print('Comparing params in base.py',self_params,'\n', data.params) if self_params['DMD'] != data.params['DMD']: # below was raise Exception but really should be replace with nans? print(('conflicting DMD from npz on {s}, {d}: {dmd1}, {dmd2}'. format(s=self.shot, d=self.config_name, dmd1=self_params['DMD'], dmd2=data.params['DMD']))) else: if pyfusion.VERBOSE>-1: print("Can't check DMD on {s}, {d}".format(s=self.shot, d=self.config_name)) data.meta.update({'shot':self.shot}) if hasattr(data,'comment'): # extract comment if there print('!data item {cn} already has comment {c}' .format(cn=data.config_name, c=data.comment)) data.comment = self.comment if hasattr(self, 'comment') else '' data.signal = sgn * data.signal if not hasattr(data,'utc'): data.utc = None # Coords shouldn't be fetched for BaseData (they are required # for TimeSeries) - who said this? # data.coords.load_from_config(**self.__dict__) if pyfusion.VERBOSE>0: print("base.py: data.config_name", data.config_name) data.channels.config_name=data.config_name if len(gain_units.split()) > 1: data.channels.units = gain_units.split()[-1] else: data.channels.units = data.units if hasattr(data,'units') else ' ' if method == 'specific': # don't pull down if we didn't setup self.pulldown() return data
[docs]class MultiChannelFetcher(BaseDataFetcher): """Fetch data from a diagnostic with multiple timeseries channels. This fetcher requres a multichannel configuration section such as:: [Diagnostic:H1_mirnov_array_1] data_fetcher = pyfusion.acquisition.base.MultiChannelFetcher channel_1 = H1_mirnov_array_1_coil_1 channel_2 = H1_mirnov_array_1_coil_2 channel_3 = H1_mirnov_array_1_coil_3 channel_4 = H1_mirnov_array_1_coil_4 The channel names must be `channel\_` followed by an integer, and the channel values must correspond to other configuration sections (for example ``[Diagnostic:H1_mirnov_array_1_coil_1]``, ``[Diagnostic:H1_mirnov_array_1_coil_1]``, etc) which each return a single channel instance of :py:class:`~pyfusion.data.timeseries.TimeseriesData`. """
[docs] def ordered_channel_names(self): """Get an ordered list of the channel names in the diagnostic :rtype: list """ channel_list = [] for k in self.__dict__.keys(): if k.startswith('channel_'): channel_list.append( [int(k.split('channel_')[1]), self.__dict__[k]] ) channel_list.sort() if len(channel_list) == 0: print('********* warning!! empty channel list - are there ay channel_N attributes? ') return [i[1] for i in channel_list]
[docs] def fetch(self): """Fetch each channel and combine into a multichannel instance of :py:class:`~pyfusion.data.timeseries.TimeseriesData`. :rtype: :py:class:`~pyfusion.data.timeseries.TimeseriesData` """ ## initially, assume only single channel signals # this base debug breakpoint will apply to all flavours of acquisition debug_(pyfusion.DEBUG, level=3, key='entry_base_multi_fetch') ordered_channel_names = self.ordered_channel_names() data_list = [] channels = ChannelList() # empty I guess common_tb = None # common_timebase meta_dict={} group_utc = None # assume no utc, will be replaced by channels # t_min, t_max seem obsolete, and should be done in single chan fetch if hasattr(self, 't_min') and hasattr(self, 't_max'): t_range = [float(self.t_min), float(self.t_max)] else: t_range = [] for chan in ordered_channel_names: sgn = 1 if chan[0]=='-': sgn = -sgn bare_chan = (chan.split('-'))[-1] ch_data = self.acq.getdata(self.shot, bare_chan) if len(t_range) == 2: ch_data = ch_data.reduce_time(t_range) channels.append(ch_data.channels) # two tricky things here - tmp.data.channels only gets one channel hhere # Config_name for a channel is attached to the multi part - # We need to move it to the particular channel # Was channels[-1].config_name = chan # 2013 - default to something if config_name not defined if pyfusion.VERBOSE>0: print("base:multi ch_data.config_name", ch_data.config_name) if hasattr(ch_data,'config_name'): channels[-1].config_name = ch_data.config_name else: channels[-1].config_name = 'fix_me' meta_dict.update(ch_data.meta) # tack on the data utc ch_data.signal.utc = ch_data.utc # Make a common timebase and do some basic checks. if common_tb is None: common_tb = ch_data.timebase if hasattr(ch_data,'utc'): group_utc = ch_data.utc tb_chan = ch_data.channels # for the first, append the whole signal data_list.append(sgn * ch_data.signal) else: if hasattr(self, 'skip_timebase_check') and self.skip_timebase_check == 'True': # append regardless, but will have to go back over # later to check length cf common_tb if pyfusion.VERBOSE > 0: print('utcs: ******',ch_data.utc[0],group_utc[0]) if ch_data.utc[0] != group_utc[0]: dts = (ch_data.utc[0] - group_utc[0])/1e9 print('*** different start times *****\n********trace {chcnf} starts after {tbch} by {dts:.2g} s' .format(tbch = tb_chan.config_name, chcnf = ch_data.config_name, dts=dts)) # should pad this channel out with nans - for now report an error. # this won't work if combining signals on a common_tb # ch_data.timebase += -dts # not sure if + or -? # print(ch_data.timebase[0]) """ kind of works for L53_LP0701 309 13, but time error dtclock = 2e-6 nsampsdiff = int(round(dts/dtclock,0)) newlen = len(ch_data.timebase) + nsampsdiff newsig = np.array(newlen * [np.nan]) newtb = np.array(newlen * [np.nan]) newsig[nsampsdiff:] = ch_data.signal newtb[nsampsdiff:] = ch_data.timebase ch_data.signal = newsig ch_data.timebase = newtb ch_data.timebase += -dts # not sure if + or -? """ if len(ch_data.signal)<len(common_tb): common_tb = ch_data.timebase tb_chan = ch_data.channels data_list.append(ch_data.signal[0:len(common_tb)]) else: try: assert_array_almost_equal(common_tb, ch_data.timebase) data_list.append(ch_data.signal) except: print('#### matching error in {c} - perhaps timebase not the same as the previous channel'.format(c=ch_data.config_name)) raise if hasattr(self, 'skip_timebase_check') and self.skip_timebase_check == 'True': # Messy - if we ignored timebase checks, then we have to check # length and cut to size, otherwise it will be stored as a signal (and # we will end up with a signal of signals) # This code may look unpythonic, but it avoids a copy() # and may be faster than for sig in data_list.... ltb = len(common_tb) for i in range(len(data_list)): if len(data_list[i]) > ltb: # this is a replacement. data_list.insert(i,data_list.pop(i)[0:ltb]) signal = Signal(data_list) print(shape(signal)) output_data = TimeseriesData(signal=signal, timebase=common_tb, channels=channels) #output_data.meta.update({'shot':self.shot}) output_data.meta.update(meta_dict) output_data.comment = self.comment if hasattr(self, 'comment') else '' print(output_data.comment) #if not hasattr(output_data,'utc'): # output_data.utc = None # probably should try to get it # # from a channel - but how? output_data.utc = group_utc # should check that all channels have same utc debug_(pyfusion.DEBUG, level=2, key='return_base_multi_fetch') return output_data