Source code for HDF5_BLS_treat.treat

import numpy as np
from scipy import optimize
import json
import inspect

from HDF5_BLS_treat import Treat_backend, Models, TreatmentError

Treat_version = 0.1

[docs] class Treat(Treat_backend): """This class is a class inherited from the Treat_backend class used to define functions to treat the data. Each function is meant to perform the minimum of operation so as to give the user a total control over the treatment. """
[docs] def __init__(self, frequency: np.ndarray, PSD: np.ndarray): super().__init__(frequency = frequency, PSD = PSD) self._algorithm = { "name": "Brillouin treatment", "version": "0.0", "author": "None", "description": "A blank algorithm for treating BLS spectra.", "functions": [] } self._history = [] # Initializing main attributes self.frequency = frequency self.PSD = PSD # Initializing treatment applicator selection attributes self._treat_selection = "sampled" # Delete unused dimensions new_shape = [] for s in self.frequency.shape: if s > 1: new_shape.append(s) self.frequency = self.frequency.reshape(new_shape) new_shape = [] for s in self.PSD.shape: if s > 1: new_shape.append(s) # Initializing array sample attributes WARNING: ONLY 1D frequency arrays are supported for now if len(self.frequency.shape) == 1: self.frequency_sample = self.frequency self.PSD_sample = self.PSD while len(self.PSD_sample.shape) > 1: self.PSD_sample = np.average(self.PSD_sample, axis = 0) else: raise ValueError(f"Only 1D frequency arrays are supported for now. The given frequency array has the following dimension: {self.frequency.shape}.") # Store the base arrays self._history_base = {"frequency_sample": self.frequency_sample, "PSD_sample": self.PSD_sample}
# Point and window definition
[docs] def add_point(self, position_center_window: float = None, window_width: float = None, type_pnt: str = "Other"): """ Adds a single point to the list of points together with a window to the list of windows with its type. Each point is an intensity extremum obtained by fitting a quadratic polynomial to the windowed data. The point is given as a value on the frequency axis (not a position on this axis). The "position_center_window" parameter is the center of the window surrounding the peak. The "window_width" parameter is the width of the window surrounding the peak (full width). The "type_pnt" parameter is the type of the peak. It can be either "Stokes", "Anti-Stokes", "Elastic" or "Other". Parameters ---------- position_center_window : float A value on the self.x axis corresponding to the center of a window surrounding a peak window_width : float or 2-tuple of float A value on the self.x axis corresponding to the width of a window surrounding a peak. If a tuple is given, the first element is the lower bound of the window and the second element is the upper bound given in absolute value from center point. type_pnt : str The nature of the peak. Must be one of the following: "Other","Stokes", "Anti-Stokes" or "Elastic" """ def refine_peak_position(frequency, PSD, window): """Refines the position of a peak based on a window surrounding it. The refining is done by finding the maximum of the windowed data, interpolating the gradient around this maximum, and then finding the position of the maximum based on a zero gradient. Parameters ---------- window : list The window surrounding the peak. The format is [start, end]. """ # Extract the windowed abscissa and ordinate arrays pos = np.where((frequency >= window[0]) & (frequency <= window[1])) wndw_x = frequency[pos] wndw_y = PSD[pos] # select the window of 10 points around the peak pos_peak0 = np.argmax(wndw_y) loc_wndw = np.arange(max(0, pos_peak0-5), min(len(wndw_y), pos_peak0+5)) # Fit a quadratic polynomial to the windowed data and returns the local extremum params = np.polyfit(wndw_x[loc_wndw], wndw_y[loc_wndw], 2) new_x = -params[1]/(2*params[0]) return new_x # Base case: if any of the parameters is None, return if position_center_window is None or window_width is None or type_pnt is None: return # Check that the type of the point is correct if type_pnt not in ["Other", "Stokes", "Anti-Stokes", "Elastic"]: raise ValueError("The type of the point must be one of the following: 'Stokes', 'Anti-Stokes' or 'Elastic'") # Check that the window is in the range of the data if not window_width is None: if type(window_width) in [float, int]: window = [position_center_window-window_width/2, position_center_window+window_width/2] if window[1]<self.frequency_sample[0] or window[0]>self.frequency_sample[-1]: raise ValueError(f"The window {window} is out of the range of the data") else: window = [position_center_window - window_width[0], position_center_window + window_width[1]] if window[1]<self.frequency_sample[0] or window[0]>self.frequency_sample[-1]: raise ValueError(f"The window {window} is out of the range of the data") # Ensure that the window is within the range of the data window[0] = max(self.frequency_sample[0], window[0]) window[1] = min(self.frequency_sample[-1], window[1]) # Refine the position of the peaks new_x = refine_peak_position(frequency = self.frequency_sample, PSD = self.PSD_sample, window = window) else: new_x = position_center_window # Goes through the list of points to get the number of occurences of peaks of same type i = 0 for elt in self.points: nme = elt[0].split("_")[0] if type_pnt == nme: i+=1 # Add the point to the list of points and the list of windows self._save_history = False self.points.append([f"{type_pnt}_{i}", new_x]) # Add the window to the list of windows self.windows.append([window[0], window[1]])
[docs] def add_window(self, position_center_window: float = None, window_width: float = None): """ Adds a single window to the list of windows together with the central point (with type "Window") to the list of windows. The positions are given as values on the frequency axis (not a position). The "position_center_window" parameter is the center of the window surrounding the peak. The "window_width" parameter is the width of the window surrounding the peak (full width). Parameters ---------- position_center_window : float A value on the self.x axis corresponding to the center of a window surrounding a peak window : float A value on the self.x axis corresponding to the width of a window surrounding a peak """ # Base case: if any of the parameters is None, return if position_center_window is None or window_width is None: return # Check that the window is in the range of the data window = [position_center_window-window_width/2, position_center_window+window_width/2] if window[1]<self.frequency_sample[0] or window[0]>self.frequency_sample[-1]: raise ValueError(f"The window {window} is out of the range of the data") # Ensure that the window is within the range of the data window[0] = max(self.frequency_sample[0], window[0]) window[1] = min(self.frequency_sample[-1], window[1]) # Set the central point of the window new_x = (window[0] + window[1])/2 # Goes through the list of points to get the number of occurences of peaks of same type i = 0 for elt in self.points: nme = elt[0].split("_")[0] if nme == "Window": i+=1 # Add the point to the list of points and the list of windows self._save_history = False self.points.append([f"Window_{i}", new_x]) # Add the window to the list of windows self.windows.append([window[0], window[1]])
# Pre-treatment functions
[docs] def normalize_data(self, threshold_noise : float = 0.01): """ Normalizes the data by identifying the regions corresponding to noise, substracting the average of these regions from the data, and dividing by the average of the amplitude of all peaks that are not of type elastic. Parameters ---------- threshold_noise : float, optional The threshold for identifying noise regions. This value is the highest possible value for noise relative to the average intensity of the selected peaks, on the PSD when the minimal value of the PSD is brought to 0. Default is 1% Returns ------- None """ # Subtract the lowest value of intensity to PSD_sample self.PSD_sample = self.PSD_sample - np.min(self.PSD_sample) # Extract the peaks that are not elastic or not windows peaks = [p[1] for p in self.points if p[0][0] not in ["E", "W"]] # Calculate the average intensity of the peaks position_peaks = np.array([np.argmin(np.abs(self.frequency_sample - p)) for p in peaks]) average_intensity_peaks = np.average(self.PSD_sample[position_peaks]) # Get all regions corresponding to noise (i.e. regions with intensity below 10% of the average intensity of the peaks) noise_regions = [] for i in range(len(self.PSD_sample)): if self.PSD_sample[i] < average_intensity_peaks * threshold_noise: noise_regions.append(i) # Get average noise value average_noise = np.average(self.PSD_sample[noise_regions]) # Normalize the data self.PSD_sample = self.PSD_sample - average_noise self.PSD_sample = self.PSD_sample / average_intensity_peaks # Clear the points and windows stored in the class self.silent_clear_points() return self.PSD_sample
# Estimation functions
[docs] def estimate_width_inelastic_peaks(self, max_width_guess : float = 2): """ Estimates the full width at half maximum of the inelastic peaks stored in self.points. Note that the data is supposed to have a zero offset. The estimation is done by finding the samples of the data closes to half of the peak height. Parameters ---------- max_width_guess : float, optional The higher limit to the estimation of the full width. Default is 2 GHz. """ # Reinitiate the width estimator self.width_estimator = [] # Extract the points corresponding either to Stokes or Anti-Stokes peaks for i in range(len(self.points)): if self.points[i][0].split("_")[0] in ["Anti-Stokes", "Stokes"]: p = self.points[i][1] # Extract the peak position pos_peak = np.argmin(np.abs(self.frequency_sample - p)) # Guess the width of the peak by finding the points at half the height pos_half = pos_peak while pos_half>0: if self.PSD_sample[pos_half] > self.PSD_sample[pos_peak]/2: pos_half = pos_half-1 else: break gamma = self.frequency_sample[pos_half] pos_half = pos_peak while pos_half<len(self.frequency_sample)-1: if self.PSD_sample[pos_half] > self.PSD_sample[pos_peak]/2: pos_half = pos_half+1 else: break gamma = min(max_width_guess, self.frequency_sample[pos_half] - gamma) self.width_estimator.append(gamma) else: self.width_estimator.append(0)
# Fitting model definition
[docs] def define_model(self, model: str = "Lorentzian", elastic_correction: bool = False): """Defines the model to be used to fit the data. Parameters ---------- model : str, optional The model to be used. The models should match the names of the attribute "models" of the class Models, by default "Lorentzian" elastic_correction : bool, optional Whether to correct for the presence of an elastic peak by setting adding a linear function to the model, by default False """ if elastic_correction: model = model + " elastic" # Try selecting the model, raise an error if the model is not found Model = Models() if model not in Model.models.keys(): raise ValueError(f"The model {model} is not recognized.") self.fit_model = model
# Algorithm application functions
[docs] def apply_algorithm_on_all(self): """ Takes all the steps of the algorithm up to the moment this function is called and applies the steps to each individual spectrum in the dataset. This function updates the global attributes of the class concerning the shift, the linewidth and the amplitude together with their variance, taking into account error propagation. If a spectrum could not be fitted, its value is set to 0 in the global attributes. All the points where the spectra could not be fitted are marked with the "fit_error_marker" parameter in the global attributes (shift, linewidth, amplitude, shift_var, linewidth_var, amplitude_var) and their coordinates are stored in the "point_error" list. The "point_error_type" attribute is also updated with the type of error returned by the fit function (see scipy.optimize.curve_fit documentation). The function returns the number of spectra that could not be fitted. """ def plus_one(shape, max_shape, dim): if dim == -1: return None if shape[dim] == max_shape[dim]-1: shape[dim] = 0 return plus_one(shape, max_shape, dim-1) else: shape[dim] += 1 return shape def initialize(): # Initialize the list of points that could not be fitted self.point_error = [] self.point_error_type = [] self.point_error_value = [] # Initialize the list of fitted points dim = list(self.PSD.shape[:-1])+[len(self.shift_sample)] self.shift = np.zeros(dim).astype(float) self.offset = np.zeros(dim).astype(float) self.linewidth = np.zeros(dim).astype(float) self.shift_var = np.zeros(dim).astype(float) self.linewidth_var = np.zeros(dim).astype(float) self.amplitude = np.zeros(dim).astype(float) self.amplitude_var = np.zeros(dim).astype(float) # Initialize the index of the selected spectrum return np.zeros(len(self.PSD.shape[:-1])).astype(int) # Initialize the list of fitted points, error point and index of selected spectrum PSD_i = initialize() # Set the treat selection to "all" self._treat_selection = "all" # Remove the last step of the algorithm (corresponding to this function) temp_algorithm = self._algorithm["functions"][-1] self._algorithm["functions"] = self._algorithm["functions"][:-1] # Sets the frequency array to the main frequency array (assuming 1D frequency array) self.frequency_sample = self.frequency # Initialize the attributes for the progress callback count = 0 total = np.prod(self.PSD.shape[:-1]) # Iterate on each spectrum of the PSD array while PSD_i is not None: # Assign the current PSD and frequency arrays to the corresponding variables self.PSD_sample = self.PSD[tuple(PSD_i)] # Run the algorithm on the current PSD and frequency arrays self.silent_run_algorithm() self.offset[tuple(PSD_i)] = self.offset_sample self.shift[tuple(PSD_i)] = self.shift_sample self.linewidth[tuple(PSD_i)] = self.linewidth_sample self.shift_var[tuple(PSD_i)] = self.shift_err_sample self.linewidth_var[tuple(PSD_i)] = self.linewidth_err_sample self.amplitude[tuple(PSD_i)] = self.amplitude_sample self.amplitude_var[tuple(PSD_i)] = self.amplitude_err_sample if np.all(np.isnan(self.shift[tuple(PSD_i)])): self.point_error.append(PSD_i.copy()) self.point_error_type.append("fit_error") self.point_error_value.append(np.nan) PSD_i = plus_one(PSD_i, self.PSD.shape[:-1], len(PSD_i)-1) if self._progress_callback is not None: count += 1 self._progress_callback(count, total) self.BLT = self.linewidth/self.shift self.BLT_var = self.BLT**2 * ((self.shift_var/self.shift)**2 + (self.linewidth_var/self.linewidth)**2) self._algorithm["functions"].append(temp_algorithm)
[docs] def adjust_treatment_on_errors(self, position = None, new_parameters = None): """ Reapplies the treatment on the point error located at the position "position" with the new parameters "new_parameters". Parameters ---------- position : list, optional The position of the point error to be adjusted. Default is None. new_parameters : list of dictionnaries, optional The list of new parameters to be applied to re-run the treatment on the errors. Each element is either None (if we don't change the parameters) or a dictionnary of the parameters to be passed to the function. Default is None, means that all the parameters used earlier are used. """ def extract_initial_algorithm(): """Extracts the functions that are applied before the apply_algorithm_on_all function. Returns ------- 3 dictionaries The algorithm that has been applied to all the data, the algorithm that has been used to combine the data and the algorithm used to mark errors """ algorithm = {"functions": []} combine_algotihm = {"functions": []} mark_errors_algorithm = {"functions": []} passed_apply_on_all = False # We go through the functions that are applied for f in self._algorithm["functions"]: # If we are not applying the algorithm to all the data if not f["function"] == "apply_algorithm_on_all": # If the function is before the apply_algorithm_on_all function, we add it to the algorithm if not passed_apply_on_all: algorithm["functions"].append(f) # If it's after but before the adjust_treatment_on_errors function, we add it to the mark_errors as it will be used to mark errors else: if f["function"] == "combine_results_FSR": combine_algotihm["functions"].append(f) elif f["function"] == "adjust_treatment_on_errors": break else: mark_errors_algorithm["functions"].append(f) else: passed_apply_on_all = True return algorithm, combine_algotihm, mark_errors_algorithm def set_position(algorithm, position): for f in algorithm["functions"]: if f["function"] == "combine_results_FSR": f["parameters"]["position"] = position return algorithm # Extract the algorithm that was used to initially treat the data, the one used to mark the errors and the parameters used to store the extracted values algorithm, combine_algorithm, mark_errors_algorithm = extract_initial_algorithm() # Update the parameters with the new values. If new value is None, keep as is, if False, don't add step, else update with the provided new values new_algorithm = {"functions": []} if not new_parameters is None: for i in range(len(algorithm["functions"])): if new_parameters[i] is None: new_algorithm["functions"].append(algorithm["functions"][i]) elif new_parameters[i] == False: continue else: for k, v in new_parameters[i].items(): if k in algorithm["functions"][i]["parameters"].keys(): algorithm["functions"][i]["parameters"][k] = v new_algorithm["functions"].append(algorithm["functions"][i]) # If no position are specified, we apply the algorithm on all the points that had errors if position is None: position = self.point_error.copy() # Sets the _treat_selection attribute to "errors". This makes sure that the algorithm doesn't store the results in _history (reduces the memory usage and time complexity) self._treat_selection = "errors" # Initialize the list of error points self.point_error = [] self.point_error_type = [] self.point_error_value = [] # Initialize the callback for the progress bar count = 0 total = len(position) # Apply the algorithm on either the provided positions or all the points that had errors for PSD_i in position: self.PSD_sample = self.PSD[tuple(PSD_i)] self.silent_run_algorithm(algorithm = new_algorithm) combine_algorithm = set_position(algorithm = combine_algorithm, position = PSD_i) self.silent_run_algorithm(algorithm = combine_algorithm) if self._progress_callback is not None: count += 1 self._progress_callback(count, total) # And we mark the errors again. self.silent_run_algorithm(algorithm = mark_errors_algorithm)
# Fitting functions
[docs] def single_fit_all_inelastic(self, default_width: float = 1, guess_offset: bool = False, update_point_position: bool = True, bound_shift: list = None, bound_linewidth: list = None): """ Fits each inelastically scattered peak individually. The linewidth can be estimated beforehand using the function estimate_width_inelastic_peaks. If not estimated, a fixed width is used (default_width). The offset can also be guessed or not (guess_offset). In the case the offset is guessed, the minimum of the data on the selected window is used as an initial guess. When applying the fit to data acquired successively, it might be interesting to update the initial position of the peak by selecting the last fitted position. This can be done by setting update_point_position to True. Parameters ---------- default_width : float, optional If the width has not been estimated, the default width to use, by default 1 GHz guess_offset : bool, optional If True, the offset is guessed based on the minimum of the data on the selected window. If false, the data is supposed to be normalized and the offset guess is set to 0, by default False update_point_position : bool, optional If True, the position of the peak is updated based on the fitted shift. If False, the position of the peak is not updated, by default True bound_shift : list, optional The lower and upper bounds of the shift, by default None means no restrictions are applied bound_linewidth : list, optional The lower and upper bounds of the linewidth, by default None means no restrictions are applied """ def reinitialize_fitted_list(): self.shift_sample = [] self.shift_err_sample = [] self.linewidth_sample = [] self.linewidth_err_sample = [] self.amplitude_sample = [] self.amplitude_err_sample = [] self.offset_sample = [] self.slope_sample = [] def extract_point_window_gammaGuess(): """Extracts all the inelastic peaks from the list of points and their corresponding windows. Also extracts the guess for the width of the peaks or if it is not defined, uses the default width. Returns ------- 3-tuple of lists peaks: the list of peaks windows: the list of windows around the peaks guess_gamma: the list of guess for the width of the peaks """ peaks, windows, guess_gamma = [], [], [] for i in range(len(self.points)): # If the selected peak is an inelastic peak, extract the peak position and the window around it if self.points[i][0].split("_")[0] in ["Anti-Stokes", "Stokes"]: peaks.append(self.points[i][1]) windows.append(self.windows[i]) # If the width has been estimated, use it, otherwise use the default width if len(self.width_estimator) > 0: guess_gamma.append(self.width_estimator[i]) else: guess_gamma.append(default_width) return peaks, windows, guess_gamma def update_results(popt = None, pcov = None, all_nan=False): """ Stores the results of the fit in the corresponding sample lists. If all_nan is True, the fitted parameters are set to NaN. Parameters ---------- popt : list, optional The fitted parameters, by default None pcov : list, optional The covariance matrix of the fit, by default None all_nan : bool, optional Wether to set all the fitted parameters to NaN, by default False """ # If the all_nan is set to True, set all the fitted parameters to NaN if all_nan: popt = [np.nan for i in range(5)] pcov = [[np.nan for i in range(5)] for i in range(5)] # Save the fitted parameters in the corresponding sample lists self.offset_sample.append(popt[0]) self.amplitude_sample.append(popt[1]) self.shift_sample.append(popt[2]) self.linewidth_sample.append(np.abs(popt[3])) if "elastic" in self.fit_model: self.slope_sample.append(popt[4]) # Save the standard deviation of the fitted parameters in the corresponding sample lists self.amplitude_err_sample.append(np.sqrt(pcov[1][1])) self.shift_err_sample.append(np.sqrt(pcov[2][2])) self.linewidth_err_sample.append(np.sqrt(pcov[3][3])) # Initializes the list of fitted parameters reinitialize_fitted_list() # Raise an error if the model has not been defined if self.fit_model is None: raise ValueError("The model has not been defined. Please use the function 'define_model' to define the model before calling 'fit_all_inelastic_of_curve'.") # Extract the points to fit, select only the ones of type Stokes or Anti-Stokes. Also extract the guess for the width of the peaks or if it is not defined, use the default width peaks, windows, guess_gamma = extract_point_window_gammaGuess() # Fit each peak that has been selected for peak, window, gamma, bs, bl in zip(peaks, windows, guess_gamma, bound_shift, bound_linewidth): # Extract the peak position and the window around the peak pos_peak = np.argmin(np.abs(self.frequency_sample - peak)) pos_window = np.where((self.frequency_sample >= window[0]) & (self.frequency_sample <= window[1])) # Guess the amplitude of the peak by selecting its intensity amplitude_guess = self.PSD_sample[pos_peak] # Set the offset guess to the minimum of the intensity array on the selected window or to 0 if guess_offset: offset_guess = np.min(self.PSD_sample[pos_window]) else: offset_guess = 0 # Check if we want to correct for the presence of an elastic peak and apply the fitting models = Models() if "elastic" in self.fit_model: # Define the bounds for the fit (ensure the linewidth is positive) bounds = [[-np.inf, -np.inf, -np.inf, 0, -np.inf], [np.inf, np.inf, np.inf, np.inf, np.inf]] # If bounds are provided for the shift or the linewidth , update them accordingly if bound_shift is not None: bounds[0][2] = bs[0] bounds[1][2] = bs[1] peak = min(max(peak, bounds[0][2]), bounds[1][2]) # Ensure the peak is within the bounds if bound_linewidth is not None: bounds[0][3] = bl[0] bounds[1][3] = bl[1] gamma = min(max(gamma, bounds[0][3]), bounds[1][3]) # Ensure the gamma is within the bounds # Estimate the slope from the first and last points of the window slope_guess = (self.PSD_sample[pos_window[0][-1]] - self.PSD_sample[pos_window[0][0]])/(self.frequency_sample[pos_window[0][-1]] - self.frequency_sample[pos_window[0][0]]) # Adjust the offset accordingly if self.PSD_sample[pos_window[0][0]] < self.PSD_sample[pos_window[0][-1]]: offset_guess = offset_guess - slope_guess*self.frequency_sample[pos_window[0][0]] else: offset_guess = offset_guess - slope_guess*self.frequency_sample[pos_window[0][-1]] # Adjust the amplitude accordingly amplitude_guess = amplitude_guess - offset_guess - slope_guess * (self.frequency_sample[pos_peak]) error_fit = False try: popt, pcov = optimize.curve_fit(f = models.models[self.fit_model], xdata = self.frequency_sample[pos_window], ydata = self.PSD_sample[pos_window], p0 = [offset_guess, amplitude_guess, peak, gamma, slope_guess], bounds = bounds) except Exception as e: print(e) error_fit = True else: # Define the bounds for the fit (ensure the linewidth and amplitude are positive) bounds = [[-np.inf, 0, -np.inf, 0], [np.inf, np.inf, np.inf, np.inf]] # If bounds are provided for the shift or the linewidth , update them accordingly if bound_shift is not None: bounds[0][2] = bs[0] bounds[1][2] = bs[1] peak = min(max(peak, bounds[0][2]), bounds[1][2]) # Ensure the peak is within the bounds if bound_linewidth is not None: bounds[0][3] = bl[0] bounds[1][3] = bl[1] gamma = min(max(gamma, bounds[0][3]), bounds[1][3]) # Ensure the gamma is within the bounds error_fit = False try: popt, pcov = optimize.curve_fit(f = models.models[self.fit_model], xdata = self.frequency_sample[pos_window], ydata = self.PSD_sample[pos_window], p0 = [offset_guess, amplitude_guess, peak, gamma], bounds = bounds) except Exception as e: print(e) error_fit = True # If the fit succeeded, update the parameters, if not store np.nan if not error_fit: update_results(popt, pcov) else: update_results(all_nan = True) # If the user want to update the peak position with the fitted shift, update the point positions if update_point_position: for i, elt in enumerate(self.points): if elt[1] == peak: index = i pos_peak = self.shift_sample[-1] self.points[index][1] = pos_peak break
[docs] def multi_fit_all_inelastic(self, default_width: float = 1, guess_offset: bool = False, update_point_position: bool = True, bound_shift: list = None, bound_linewidth: list = None): """ Fits all inelastically scattered peak as a single curve. The linewidth of the individual peaks can be estimated beforehand using the function estimate_width_inelastic_peaks. If not estimated, a fixed width is used (default_width). The offset can also be guessed or not (guess_offset). In the case the offset is guessed, the minimum of the data on the window defined as the combination of all the peaks windoes is used as an initial guess. When applying the fit to data acquired successively, it might be interesting to update the initial position of the peak by selecting the last fitted position. This can be done by setting update_point_position to True. Parameters ---------- default_width : float, optional If the width has not been estimated, the default width to use, by default 1 GHz guess_offset : bool, optional If True, the offset is guessed based on the minimum of the data on the selected window. If false, the data is supposed to be normalized and the offset guess is set to 0, by default False update_point_position : bool, optional If True, the position of the peak is updated based on the fitted shift. If False, the position of the peak is not updated, by default True bound_shift : list, optional The lower and upper bounds of the shift, by default None means no restrictions are applied bound_linewidth : list, optional The lower and upper bounds of the linewidth, by default None means no restrictions are applied """ def reinitialize_fitted_list(): self.shift_sample = [] self.shift_err_sample = [] self.linewidth_sample = [] self.linewidth_err_sample = [] self.amplitude_sample = [] self.amplitude_err_sample = [] self.offset_sample = [] self.slope_sample = [] def extract_point_window_gammaGuess(): """Extracts all the inelastic peaks from the list of points and their corresponding windows. Also extracts the guess for the width of the peaks or if it is not defined, uses the default width. Returns ------- 3-tuple of lists peaks: the list of peaks windows: the list of windows around the peaks guess_gamma: the list of guess for the width of the peaks """ peaks, windows, guess_gamma = [], [], [] for i in range(len(self.points)): # If the selected peak is an inelastic peak, extract the peak position and the window around it if self.points[i][0].split("_")[0] in ["Anti-Stokes", "Stokes"]: peaks.append(self.points[i][1]) windows.append(self.windows[i]) # If the width has been estimated, use it, otherwise use the default width if len(self.width_estimator) > 0: guess_gamma.append(self.width_estimator[i]) else: guess_gamma.append(default_width) return peaks, windows, guess_gamma def update_results(popt = None, pcov = None, all_nan=False): """ Stores the results of the fit in the corresponding sample lists. If all_nan is True, the fitted parameters are set to NaN. Parameters ---------- popt : list, optional The fitted parameters, by default None pcov : list, optional The covariance matrix of the fit, by default None all_nan : bool, optional Wether to set all the fitted parameters to NaN, by default False """ # If the all_nan is set to True, set all the fitted parameters to NaN if all_nan: popt = np.array([np.nan for i in range(len(popt))]) pcov = np.array([[np.nan for i in range(len(popt))] for i in range(len(popt))]) popt = popt.reshape((-1,4)) std = np.sqrt(np.diag(pcov)).reshape((-1,4)) # Save the fitted parameters in the corresponding sample lists self.offset_sample = popt[:, 0].tolist() self.amplitude_sample = popt[:, 1].tolist() self.shift_sample = popt[:, 2].tolist() self.linewidth_sample = np.abs(popt[:, 3]).tolist() # Save the standard deviation of the fitted parameters in the corresponding sample lists self.amplitude_err_sample = std[:, 1].tolist() self.shift_err_sample = std[:, 2].tolist() self.linewidth_err_sample = std[:, 3].tolist() # Initializes the list of fitted parameters reinitialize_fitted_list() # Raise an error if the model has not been defined if self.fit_model is None: raise ValueError("The model has not been defined. Please use the function 'define_model' to define the model before calling 'fit_all_inelastic_of_curve'.") # Extract the points to fit, select only the ones of type Stokes or Anti-Stokes. Also extract the guess for the width of the peaks or if it is not defined, use the default width peaks, windows, guess_gamma = extract_point_window_gammaGuess() # Initializes the initial conditions and boundary conditions for the fit p0 = [] bounds = [[], []] # Initializes the window for the fit wndw_fit = np.array([]) # Fit each peak that has been selected for peak, window, gamma, bs, bl in zip(peaks, windows, guess_gamma, bound_shift, bound_linewidth): # Extract the peak position and the window around the peak pos_peak = np.argmin(np.abs(self.frequency_sample - peak)) pos_window = np.where((self.frequency_sample >= window[0]) & (self.frequency_sample <= window[1])) wndw_fit = np.append(wndw_fit, pos_window) # Guess the amplitude of the peak by selecting its intensity amplitude_guess = self.PSD_sample[pos_peak] # Set the offset guess to the minimum of the intensity array on the selected window or to 0 if guess_offset: offset_guess = np.min(self.PSD_sample[pos_window]) else: offset_guess = 0 # Check if we want to correct for the presence of an elastic peak and apply the fitting models = Models() # Check if the model is elastic and if so, raise an error if the fit_model is not compatible with the elastic correction if "elastic" in self.fit_model: raise ValueError("The multi-fit for inelastic peaks does not support the elastic correction. Please use the single fit for inelastic peaks instead.") # Define the bounds for the fit (ensure the linewidth and amplitude are positive) temp_bounds = [[-np.inf, 0, -np.inf, 0], [np.inf, np.inf, np.inf, np.inf]] # If bounds are provided for the shift or the linewidth , update them accordingly if bound_shift is not None: temp_bounds[0][2] = bs[0] temp_bounds[1][2] = bs[1] peak = min(max(peak, bs[0]), bs[1]) # Ensure the peak is within the bounds if bound_linewidth is not None: temp_bounds[0][3] = bl[0] temp_bounds[1][3] = bl[1] gamma = min(max(gamma, bl[0]), bl[1]) # Ensure the gamma is within the bounds # Append the initial conditions to the list of initial conditions if len(p0) == 0: # If this is the first peak, we initialize the initial conditions p0 += [offset_guess, amplitude_guess, peak, gamma] else: # Ensure that only one constant parameter is used for all the curve temp_bounds[0][0] = -1e-10 temp_bounds[1][0] = 1e-10 p0 += [0, amplitude_guess, peak, gamma] # Append the bounds to the list of bounds bounds[0] += temp_bounds[0] bounds[1] += temp_bounds[1] def func(x, *p0): """ The function to fit the data. It sums the contributions of all the peaks defined in p0. Parameters ---------- x : array-like The frequency array to fit p0 : list The list of initial conditions for each peak Returns ------- array-like The sum of the contributions of all the peaks at the given frequencies """ p0 = np.array(p0).reshape((-1, 4)) # Sum the contributions of all the peaks defined in p0 return sum(models.models[self.fit_model](x, *params) for params in p0) wndw_fit = np.unique(wndw_fit.astype(int)) error_fit = False try: popt, pcov = optimize.curve_fit(f = func, xdata = self.frequency_sample[wndw_fit], ydata = self.PSD_sample[wndw_fit], p0 = p0, bounds = bounds) except Exception as e: print(e) error_fit = True # If the fit succeeded, update the parameters, if not store np.nan if not error_fit: update_results(popt, pcov) else: p0 = np.array(p0).flatten() update_results(p0, np.zeros((len(p0), len(p0))), all_nan = True) # If the user want to update the peak position with the fitted shift, update the point positions if update_point_position: for i, elt in enumerate(self.points): if elt[1] == peak: index = i pos_peak = self.shift_sample[-1] self.points[index][1] = pos_peak break
[docs] def fit_all_inelastic_of_curve(self, default_width: float = 1, guess_offset: bool = False, update_point_position: bool = True, bound_shift: list = None, bound_linewidth: list = None): """ Fits a lineshape to each of the inelastic peaks using the points stored as Stokes or Anti-Stokes peaks in the points attribute. The linewidth can be estimated beforehand using the function estimate_width_inelastic_peaks. If not estimated, a fixed width is used (default_width). The offset can also be guessed or not (guess_offset). In the case the offset is guessed, the minimum of the data on the selected window is used as an initial guess. The position of the peak can also be updated based on the fitted shift if update_point_position is set to True. If set to False, the positions are not updated. Parameters ---------- default_width : float, optional If the width has not been estimated, the default width to use, by default 1 GHz guess_offset : bool, optional If True, the offset is guessed based on the minimum of the data on the selected window. If false, the data is supposed to be normalized and the offset guess is set to 0, by default False update_point_position : bool, optional If True, the position of the peak is updated based on the fitted shift. If False, the position of the peak is not updated, by default True bound_shift : list, optional The lower and upper bounds of the shift, by default None means no restrictions are applied bound_linewidth : list, optional The lower and upper bounds of the linewidth, by default None means no restrictions are applied """ def reinitialize_fitted_list(): self.shift_sample = [] self.shift_err_sample = [] self.linewidth_sample = [] self.linewidth_err_sample = [] self.amplitude_sample = [] self.amplitude_err_sample = [] self.offset_sample = [] self.slope_sample = [] def extract_point_window_gammaGuess(): peaks, windows, guess_gamma = [], [], [] for i in range(len(self.points)): if self.points[i][0].split("_")[0] in ["Anti-Stokes", "Stokes"]: peaks.append(self.points[i][1]) windows.append(self.windows[i]) if len(self.width_estimator) > 0: guess_gamma.append(self.width_estimator[i]) else: guess_gamma.append(default_width) return peaks, windows, guess_gamma def update_results(popt = None, pcov = None, all_nan=False): if all_nan: popt = [np.nan for i in range(5)] pcov = [[np.nan for i in range(5)] for i in range(5)] self.offset_sample.append(popt[0]) self.amplitude_sample.append(popt[1]) self.shift_sample.append(popt[2]) self.linewidth_sample.append(np.abs(popt[3])) if "elastic" in self.fit_model: self.slope_sample.append(popt[4]) self.amplitude_err_sample.append(np.sqrt(pcov[1][1])) self.shift_err_sample.append(np.sqrt(pcov[2][2])) self.linewidth_err_sample.append(np.sqrt(pcov[3][3])) # Initializes the list of fitted parameters reinitialize_fitted_list() # Raise an error if the model has not been defined if self.fit_model is None: raise ValueError("The model has not been defined. Please use the function 'define_model' to define the model before calling 'fit_all_inelastic_of_curve'.") # Extract the points to fit, select only the ones of type Stokes or Anti-Stokes. Also extract the guess for the width of the peaks or if it is not defined, use the default width peaks, windows, guess_gamma = extract_point_window_gammaGuess() # Fit each peak that has been selected for peak, window, gamma in zip(peaks, windows, guess_gamma): # Extract the peak position and the window around the peak pos_peak = np.argmin(np.abs(self.frequency_sample - peak)) pos_window = np.where((self.frequency_sample >= window[0]) & (self.frequency_sample <= window[1])) # Guess the amplitude of the peak by selecting its intensity amplitude_guess = self.PSD_sample[pos_peak] # Set the offset guess to the minimum of the intensity array on the selected window or to 0 if guess_offset: offset_guess = np.min(self.PSD_sample[pos_window]) else: offset_guess = 0 # Check if we want to correct for the presence of an elastic peak and apply the fitting models = Models() if "elastic" in self.fit_model: # Estimate the slope from the first and last points of the window slope_guess = (self.PSD_sample[pos_window[0][-1]] - self.PSD_sample[pos_window[0][0]])/(self.frequency_sample[pos_window[0][-1]] - self.frequency_sample[pos_window[0][0]]) # Adjust the offset accordingly if self.PSD_sample[pos_window[0][0]] < self.PSD_sample[pos_window[0][-1]]: offset_guess = offset_guess - slope_guess*self.frequency_sample[pos_window[0][0]] else: offset_guess = offset_guess - slope_guess*self.frequency_sample[pos_window[0][-1]] # Adjust the amplitude accordingly amplitude_guess = amplitude_guess - offset_guess - slope_guess * (self.frequency_sample[pos_peak]) error_fit = False try: popt, pcov = optimize.curve_fit(models.models[self.fit_model], self.frequency_sample[pos_window], self.PSD_sample[pos_window], p0=[offset_guess, amplitude_guess, peak, gamma, slope_guess]) except: error_fit = True else: error_fit = False try: popt, pcov = optimize.curve_fit(models.models[self.fit_model], self.frequency_sample[pos_window], self.PSD_sample[pos_window], p0=[offset_guess, amplitude_guess, peak, gamma]) except: error_fit = True # If the fit succeeded, check that the bounds are not violated and update the parameters, if not store np.nan if not error_fit: if bound_shift is not None: if abs(popt[2]) < bound_shift[0] or abs(popt[2]) > bound_shift[1]: error_fit = True if bound_linewidth is not None: if popt[3] < bound_linewidth[0] or popt[3] > bound_linewidth[1]: error_fit = True update_results(popt, pcov) else: update_results(all_nan = True) # Update the points with the fitted shift of the peak. if update_point_position: for i, elt in enumerate(self.points): if elt[1] == peak: index = i pos_peak = self.shift_sample[-1] self.points[index][1] = pos_peak break
# Post-treatment functions
[docs] def blind_deconvolution(self, default_width: float = None): """Subtracts a constant width to all the linewidth array and recomputes the BLT array. If default_width is not specified, the width is estimated by fitting a Lorentzian to the most proeminent elastic peak. If no elastic peak are specified, and default_width is not specified, no deconvolution is performed. Parameters ---------- default_width : float, optional The value of the width to subtract to the linewidth array, by default None. If None, the width is estimated by fitting a Lorentzian to the most proeminent elastic peak. """ # If default_width is not specified, we estimate the width by fitting a Lorentzian to the most proeminent elastic peak if default_width is None: # Extract the peaks that are not elastic or not windows peaks = [p[1] for p in self.points if p[0][0] == "E"] windows = [k for p, k in zip(self.points, self.windows) if p[0][0] == "E"] # Return if there are no peaks if len(peaks) == 0: return # Select the most proeminent peak sel = 0 I = 0 for i, p in enumerate(peaks): pos = np.argmin(np.abs(self.frequency_sample - p)) if self.PSD_sample[pos] > I: sel = i I = self.PSD_sample[pos] peak = peaks[sel] window = windows[sel] # Estimate the width of the peaks models = Models() lorentzian = models.models["Lorentzian"] # Fit the peak try: popt, pcov = curve_fit(lorentzian, self.frequency_sample[window[0]:window[1]], self.PSD_sample[window[0]:window[1]], p0=[0, I, peak, 1]) except: return default_width = popt[3] self.linewidth = self.linewidth-default_width self.BLT = self.linewidth/self.shift
[docs] def combine_results_FSR(self, FSR: float = 15, keep_max_amplitude: bool = False, amplitude_weight: bool = False, shift_err_weight: bool = False, position: list = None): """ Combines the results of the algorithm to have a value for frequency shift based on a known Free Spectral Range (FSR) value. The end shift value is obtained by "moving" the peak by a FSR value until the peak is within the [-FSR/2, FSR/2] range. Then the absolute value of the shift is taken as the end shift value. The combination of the result is done by taking the average of all the values by default. Alternatively, the user can choose to keep the maximum of the amplitude of the peak by setting the "keep_max_amplitude" parameter to True. The user can also choose to weight the shift and linewidth by the amplitude of the peak by setting the "amplitude_weight" parameter to True. Note that in the latter case, the precise knowledge of the frequency axis is a must as averaging slightly uncentered peaks will lead to a wrong result. Parameters ---------- FSR : float, optional The Free Spectral Range of the spectrometer, by default 15Ghz keep_max_amplitude : bool, optional If True, the maximum of the peak amplitude is stored in the amplitude array. If False, an average of all the amplitudes is stored. Default is False. amplitude_weight : bool, optional If True, the amplitude of the spectra is used to weight the shift and linewidth. If set to false, a simple average is performed. Default is False. shift_err_weight : bool, optional If True, the inverse of the standard deviation of the shift is used to weight the shift and linewidth. If set to false, a simple average is performed. Default is False. position: list, optional The position of the spectrum to be updated in case we combine the sampled results. This is used to update the values of a spectrum that has been re-fitted. """ def nature_peaks(): """ Returns the nature (Stokes or Anti-Stokes) of peaks that are not elastic or not windows """ tpe = [] for i in range(len(self.points)): # If the selected peak is an inelastic peak, extract the peak position and the window around it if self.points[i][0].split("_")[0] in ["Anti-Stokes", "Stokes"]: tpe.append(self.points[i][0].split("_")[0]) return tpe # Check if the user has set both combining methods to True, if so raise an error if keep_max_amplitude and amplitude_weight: raise ValueError("The parameters 'keep_max_amplitude' and 'amplitude_weight' cannot be both set to True.") if position is None: if len(self.shift.shape) == 1: shift = self.shift else: # Average the values of shift on all the points taken for the PSD. This is to only have the values of the shift of the interesting peaks. shift = np.nanmean(self.shift, axis = 0) while shift.ndim > 1: shift = np.mean(shift, axis = 0) else: shift = self.shift_sample # Get the nature of the peaks (Stokes or Anti-Stokes) nature = nature_peaks() # Compute the correction factor to ensure the shift is contained in [-FSR/2, FSR/2] k = [] for s in shift: temp = -(s+FSR/2)/FSR k.append(np.ceil(temp)) if position is None: # Realign the shift values if len(self.shift.shape) > 1: self.shift = np.moveaxis(self.shift, [0, -1], [-1, 0]) for i in range(len(k)): self.shift[i] += k[i] * FSR if nature[i] == "Anti-Stokes": self.shift[i] = -self.shift[i] self.shift = np.moveaxis(self.shift, [-1, 0], [0, -1]) else: for i in range(len(k)): self.shift[i] += k[i] * FSR if nature[i] == "Anti-Stokes": self.shift[i] = -self.shift[i] else: for i in range(len(k)): self.shift_sample[i] += k[i] * FSR if nature[i] == "Anti-Stokes": self.shift_sample[i] = -self.shift_sample[i] offset_sample = np.array(self.offset_sample) shift_sample = np.array(self.shift_sample) linewidth_sample = np.array(self.linewidth_sample) amplitude_sample = np.array(self.amplitude_sample) shift_err_sample = np.array(self.shift_err_sample) linewidth_err_sample = np.array(self.linewidth_err_sample) amplitude_err_sample = np.array(self.amplitude_err_sample) # If keep_max_amplitude is set to True, we select only the shift corresponding to the maximum amplitude if keep_max_amplitude: if position is None: # For each set of amplitudes along the last axis, find the index of the maximum amplitude max_indices = np.argmax(self.amplitude, axis=-1) # Use advanced indexing to select the corresponding shift values self.offset = np.take_along_axis(self.offset, np.expand_dims(max_indices, axis=-1), axis=-1).squeeze(-1) self.shift = np.take_along_axis(self.shift, np.expand_dims(max_indices, axis=-1), axis=-1).squeeze(-1) self.amplitude = np.take_along_axis(self.amplitude, np.expand_dims(max_indices, axis=-1), axis=-1).squeeze(-1) self.linewidth = np.take_along_axis(self.linewidth, np.expand_dims(max_indices, axis=-1), axis=-1).squeeze(-1) self.shift_var = np.take_along_axis(self.shift_var, np.expand_dims(max_indices, axis=-1), axis=-1).squeeze(-1) self.linewidth_var = np.take_along_axis(self.linewidth_var, np.expand_dims(max_indices, axis=-1), axis=-1).squeeze(-1) self.amplitude_var = np.take_along_axis(self.amplitude_var, np.expand_dims(max_indices, axis=-1), axis=-1).squeeze(-1) else: pos = np.argmax(amplitude_sample) self.offset[tuple(position)] = offset_sample[pos] self.shift[tuple(position)] = shift_sample[pos] self.linewidth[tuple(position)] = linewidth_sample[pos] self.shift_var[tuple(position)] = shift_err_sample[pos] self.linewidth_var[tuple(position)] = linewidth_err_sample[pos] self.amplitude[tuple(position)] = amplitude_sample[pos] self.amplitude_var[tuple(position)] = amplitude_err_sample[pos] elif amplitude_weight: if position is None: # For each set of amplitudes along the last axis, calculate the weighted average of the shift and linewidth self.offset = np.average(self.offset, axis=-1, weights=self.amplitude) self.shift = np.average(self.shift, axis=-1, weights=self.amplitude) self.linewidth = np.average(self.linewidth, axis=-1, weights=self.amplitude) self.amplitude = np.average(self.amplitude, axis=-1, weights=self.amplitude) self.shift_var = np.sum(self.shift_var**2 * self.amplitude**2, axis = -1) / np.sum(self.amplitude**2, axis=-1) self.linewidth_var = np.sum(self.linewidth_var**2 * self.amplitude**2, axis = -1) / np.sum(self.amplitude**2, axis=-1) self.amplitude_var = np.sum(self.amplitude_var**4, axis = -1) / np.sum(self.amplitude**2, axis=-1) else: self.offset[tuple(position)] = np.average(offset_sample) self.shift[tuple(position)] = np.average(shift_sample) self.linewidth[tuple(position)] = np.average(linewidth_sample) self.shift_var[tuple(position)] = np.sum(shift_err_sample**2 * amplitude_sample**2) / np.sum(amplitude_sample**2, axis=-1) self.linewidth_var[tuple(position)] = np.sum(linewidth_err_sample**2 * amplitude_sample**2) / np.sum(amplitude_sample**2, axis=-1) self.amplitude[tuple(position)] = np.average(amplitude_sample) self.amplitude_var[tuple(position)] = np.sum(amplitude_err_sample**4) / np.sum(amplitude_sample**2, axis=-1) elif shift_err_weight: if position is None: # For each set of amplitudes along the last axis, calculate the weighted average of the shift and linewidth self.offset = np.average(self.offset, axis=-1, weights=1/self.shift_var) self.shift = np.average(self.shift, axis=-1, weights=1/self.shift_var) self.linewidth = np.average(self.linewidth, axis=-1, weights=1/self.shift_var) self.amplitude = np.average(self.amplitude, axis=-1, weights=1/self.shift_var) self.linewidth_var = np.sum(self.linewidth_var**2 / self.shift_var**2, axis = -1) / np.sum(1/self.shift_var**2, axis=-1) self.amplitude_var = np.sum(self.amplitude_var**2 / self.shift_var**2, axis = -1) / np.sum(1/self.shift_var**2, axis=-1) self.shift_var = self.shift_var.shape[-1] / np.sum(1/self.shift_var**2, axis=-1) else: self.offset[tuple(position)] = np.average(offset_sample, weights=1/shift_err_sample) self.shift[tuple(position)] = np.average(shift_sample, weights=1/shift_err_sample) self.linewidth[tuple(position)] = np.average(linewidth_sample, weights=1/shift_err_sample) self.amplitude[tuple(position)] = np.average(amplitude_sample, weights=1/shift_err_sample) self.shift_var[tuple(position)] = len(shift_err_sample) / np.sum(1/shift_err_sample**2) self.linewidth_var[tuple(position)] = np.sum(linewidth_err_sample**2 / shift_err_sample**2) / np.sum(1 / shift_err_sample**2) self.amplitude_var[tuple(position)] = np.sum(amplitude_err_sample**2 / shift_err_sample**2) / np.sum(1 / shift_err_sample**2) else: if position is None: self.offset = np.nanmean(self.offset, axis=-1) self.shift = np.nanmean(self.shift, axis=-1) self.linewidth = np.nanmean(self.linewidth, axis=-1) self.amplitude = np.nanmean(self.amplitude, axis=-1) self.linewidth_var = np.nanmean(self.linewidth_var**2, axis = -1) self.amplitude_var = np.nanmean(self.amplitude_var**2, axis = -1) self.shift_var = np.nanmean(self.shift_var**2, axis = -1) else: self.offset[tuple(position)] = np.nanmean(offset_sample) self.shift[tuple(position)] = np.nanmean(shift_sample) self.linewidth[tuple(position)] = np.nanmean(linewidth_sample) self.amplitude[tuple(position)] = np.nanmean(amplitude_sample) self.linewidth_var[tuple(position)] = np.nanmean(linewidth_err_sample**2) self.amplitude_var[tuple(position)] = np.nanmean(amplitude_err_sample**2) self.shift_var[tuple(position)] = np.nanmean(shift_err_sample**2) if position is None: self.BLT = self.linewidth / self.shift self.BLT_var = self.BLT**2 * ((self.linewidth_var / self.linewidth)**2 + (self.shift_var / self.shift)**2) else: self.BLT[tuple(position)] = self.linewidth[tuple(position)] / self.shift[tuple(position)] self.BLT_var[tuple(position)] = self.BLT[tuple(position)]**2 * ((self.linewidth_var[tuple(position)] / self.linewidth[tuple(position)])**2 + (self.shift_var[tuple(position)] / self.shift[tuple(position)])**2)
# Outliers
[docs] def mark_errors_shift(self, min_shift: float = 0, max_shift: float = 10): """Marks the points that present a value of shift above or below given thresholds. Parameters ---------- min_shift: float, optional The threshold below which the shift is marked as an error , by default 0GHz max_shift: float, optional The threshold above which the shift is marked as an error , by default 10GHz """ positions = np.where(self.shift > max_shift) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("shift_max") self.point_error_value.append(self.shift[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan positions = np.where(self.shift < min_shift) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("shift_min") self.point_error_value.append(self.shift[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan
[docs] def mark_errors_std_shift(self, max_error_shift_variance: float = 0.005): """Marks the points that present a variance of the shift greater than a certain threshold. Parameters ---------- max_error_shift_err : float, optional The threshold above which the shift is marked as an error , by default 5MHz """ positions = np.where(self.shift_var > max_error_shift_variance) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("shift_var") self.point_error_value.append(self.shift_var[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan
[docs] def mark_errors_linewidth(self, min_linewidth: float = 0, max_linewidth: float = 10): """Marks the points that present a value of linewidth above or below given thresholds. Parameters ---------- min_linewidth: float, optional The threshold below which the linewidth is marked as an error , by default 0GHz max_linewidth: float, optional The threshold above which the linewidth is marked as an error , by default 10GHz """ positions = np.where(self.linewidth > max_linewidth) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("linewidth_max") self.point_error_value.append(self.linewidth[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan positions = np.where(self.linewidth < min_linewidth) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("linewidth_min") self.point_error_value.append(self.linewidth[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan
[docs] def mark_errors_std_linewidth(self, max_error_linewidth_variance: float = 0.005): """Marks the points that present a variance of the linewidth greater than a certain threshold. Parameters ---------- max_error_linewidth_variance : float, optional The threshold above which the linewidth is marked as an error , by default 5MHz """ positions = np.where(self.linewidth_var > max_error_linewidth_variance) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("linewidth_var") self.point_error_value.append(self.linewidth_var[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan
[docs] def mark_errors_BLT(self, min_BLT: float = 0, max_BLT: float = 10): """Marks the points that present a value of BLT above or below given thresholds. Parameters ---------- min_shift: float, optional The threshold below which the shift is marked as an error , by default 0GHz max_shift: float, optional The threshold above which the shift is marked as an error , by default 10GHz """ positions = np.where(self.BLT > max_BLT) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("BLT_max") self.point_error_value.append(self.BLT[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan positions = np.where(self.BLT < min_BLT) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("BLT_min") self.point_error_value.append(self.BLT[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan
[docs] def mark_errors_std_BLT(self, max_error_BLT_variance: float = 0.005): """Marks the points that present a variance of the BLT greater than a certain threshold. Parameters ---------- max_error_shift_err : float, optional The threshold above which the shift is marked as an error , by default 5MHz """ positions = np.where(self.BLT_var > max_error_BLT_variance) for i in range(len(positions[0])): pos = [int(p[i]) for p in positions] self.point_error.append(pos) self.point_error_type.append("BLT_var") self.point_error_value.append(self.BLT_var[tuple(pos)]) self.shift_var[positions] = np.nan self.shift[positions] = np.nan self.linewidth_var[positions] = np.nan self.linewidth[positions] = np.nan self.amplitude_var[positions] = np.nan self.amplitude[positions] = np.nan self.BLT_var[positions] = np.nan self.BLT[positions] = np.nan
[docs] def mark_point_error(self, position : list): """ Forces a point located at the position "position" to be considered as an error. Parameters ---------- position : list The position of the point error to be marked. Returns ------- None """ self.point_error.append(position) self.point_error_type.append("point_error") self.point_error_value.append(np.nan) self.shift_var[tuple(position)] = np.nan self.shift[tuple(position)] = np.nan self.linewidth_var[tuple(position)] = np.nan self.linewidth[tuple(position)] = np.nan self.amplitude_var[tuple(position)] = np.nan self.amplitude[tuple(position)] = np.nan self.BLT_var[tuple(position)] = np.nan self.BLT[tuple(position)] = np.nan