Source code for synthesizer.line

"""A module containing functionality for working with spectral lines.

The primary class is Line which holds information about an individual or
blended emission line, including its identification, wavelength, luminosity,
and the strength of the continuum. From these the equivalent width is
automatically calculated. Combined with a redshift and cosmology the flux can
also be calcualted.

A second class is LineCollection which holds a collection of Line objects and
provides additional functionality such as calcualting line ratios and diagrams
(e.g. BPT-NII, OHNO).

Several functions exist for obtaining line, ratio, and diagram labels for use
in plots etc.

"""

import numpy as np
from unyt import Angstrom, Hz, angstrom, cm, erg, s

from synthesizer import exceptions, line_ratios
from synthesizer.conversions import lnu_to_llam, standard_to_vacuum
from synthesizer.units import Quantity, accepts
from synthesizer.warnings import deprecation


[docs] def get_line_id(id): """ A function for converting a line id possibly represented as a list to a single string. Args id (str, list, tuple) a str, list, or tuple containing the id(s) of the lines Returns id (str) string representation of the id """ if isinstance(id, list): return ", ".join(id) else: return id
[docs] def get_line_label(line_id): """ Get a line label for a given line_id, ratio, or diagram. Where the line_id is one of several predifined lines in line_ratios.line_labels this label is used, otherwise the label is constructed from the line_id. Argumnents line_id (str or list) The line_id either as a list of individual lines or a string. If provided as a list this is automatically converted to a single string so it can be used as a key. Returns line_label (str) A nicely formatted line label. """ # if the line_id is a list (denoting a doublet or higher) if isinstance(line_id, list): line_id = ", ".join(line_id) if line_id in line_ratios.line_labels.keys(): line_label = line_ratios.line_labels[line_id] else: line_id = line_id.split(",") line_labels = [] for line_id_ in line_id: # get the element, ion, and wavelength element, ion, wavelength = line_id_.split(" ") # extract unit and convert to latex str unit = wavelength[-1] if unit == "A": unit = r"\AA" if unit == "m": unit = r"\mu m" wavelength = wavelength[:-1] + unit line_labels.append( f"{element}{get_roman_numeral(int(ion))}{wavelength}" ) line_label = "+".join(line_labels) return line_label
[docs] def flatten_linelist(list_to_flatten): """ Flatten a mixed list of lists and strings and remove duplicates. Used when converting a line list which may contain single lines and doublets. Args: list_to_flatten (list) list containing lists and/or strings and integers Returns: (list) flattened list """ flattened_list = [] for lst in list_to_flatten: if isinstance(lst, list) or isinstance(lst, tuple): for ll in lst: flattened_list.append(ll) elif isinstance(lst, str): # If the line is a doublet, resolve it and add each line # individually if len(lst.split(",")) > 1: flattened_list += lst.split(",") else: flattened_list.append(lst) else: raise Exception( ( "Unrecognised type provided. Please provide" "a list of lists and strings" ) ) return list(set(flattened_list))
[docs] def get_ratio_label(ratio_id): """ Get a label for a given ratio_id. Args: ratio_id (str) The ratio identificantion, e.g. R23. Returns: label (str) A string representation of the label. """ # get the list of lines for a given ratio_id # if the id is a string get the lines from the line_ratios sub-module if isinstance(ratio_id, str): ratio_line_ids = line_ratios.ratios[ratio_id] if isinstance(ratio_id, list): ratio_line_ids = ratio_id numerator = get_line_label(ratio_line_ids[0]) denominator = get_line_label(ratio_line_ids[1]) label = f"{numerator}/{denominator}" return label
[docs] def get_diagram_labels(diagram_id): """ Get a x and y labels for a given diagram_id Args: diagram_id (str) The diagram identificantion, e.g. OHNO. Returns: xlabel (str) A string representation of the x-label. ylabel (str) A string representation of the y-label. """ # get the list of lines for a given ratio_id diagram_line_ids = line_ratios.diagrams[diagram_id] xlabel = get_ratio_label(diagram_line_ids[0]) ylabel = get_ratio_label(diagram_line_ids[1]) return xlabel, ylabel
[docs] def get_roman_numeral(number): """ Function to convert an integer into a roman numeral str. Used for renaming emission lines from the cloudy defaults. Args: number (int) The number to convert into a roman numeral. Returns: number_representation (str) String reprensentation of the roman numeral. """ num = [1, 4, 5, 9, 10, 40, 50, 90, 100, 400, 500, 900, 1000] sym = [ "I", "IV", "V", "IX", "X", "XL", "L", "XC", "C", "CD", "D", "CM", "M", ] i = 12 roman = "" while number: div = number // num[i] number %= num[i] while div: roman += sym[i] div -= 1 i -= 1 return roman
[docs] class LineCollection: """ A class holding a collection of emission lines. This enables additional functionality such as quickly calculating line ratios or line diagrams. Attributes: lines (dict) A dictionary of synthesizer.line.Line objects. line_ids (list) A list of line ids. wavelengths (unyt_array) An array of line wavelengths. available_ratios (list) A list of available line ratios. available_diagrams (list) A list of available line diagrams. """ def __init__(self, lines): """ Initialise LineCollection. Args: lines (dict) A dictionary of synthesizer.line.Line objects. """ # Dictionary of synthesizer.line.Line objects. self.lines = lines # Create an array of line_ids self.line_ids = np.array(list(self.lines.keys())) self._individual_line_ids = np.array( [li for lis in self.line_ids for li in lis.split(",")] ) # Atrributes to enable looping self._current_ind = 0 self.nlines = len(self.line_ids) # Create list of line wavelengths self.wavelengths = ( np.array( [ line.wavelength.to("Angstrom").value for line in self.lines.values() ] ) * Angstrom ) # Get the arguments that would sort wavelength sorted_arguments = np.argsort(self.wavelengths) # Sort the line_ids and wavelengths self.line_ids = self.line_ids[sorted_arguments] self.wavelengths = self.wavelengths[sorted_arguments] # Include line ratio and diagram definitions self.line_ratios = line_ratios # Create list of available line ratios self.available_ratios = [] for ratio_id, ratio in self.line_ratios.ratios.items(): # Create a set from the ratio line ids while also unpacking # any comma separated lines ratio_line_ids = set() for lis in ratio: ratio_line_ids.update({li.strip() for li in lis.split(",")}) # Check if line ratio is available if ratio_line_ids.issubset(self._individual_line_ids): self.available_ratios.append(ratio_id) # Create list of available line diagnostics self.available_diagrams = [] for diagram_id, diagram in self.line_ratios.diagrams.items(): # Create a set from the diagram line ids while also unpacking # any comma separated lines diagram_line_ids = set() for ratio in diagram: for lis in ratio: diagram_line_ids.update( {li.strip() for li in lis.split(",")} ) # Check if line ratio is available if set(diagram_line_ids).issubset(self.line_ids): self.available_diagrams.append(diagram_id) def __getitem__(self, line_id): """ Simply returns one particular line from the collection. Returns: line (synthesizer.line.Line) A synthesizer.line.Line object. """ return self.lines[line_id]
[docs] def concatenate(self, other): """ Concatenate two LineCollection objects together. Note that any duplicate lines will be taken from other (i.e. the LineCollection passed to concatenate). Args: other (LineCollection) A LineCollection object to concatenate with the current LineCollection object. Returns: LineCollection A new LineCollection object containing the lines from both LineCollection objects. """ # Ensure other is a line collection if not isinstance(other, LineCollection): raise TypeError( "Can only concatenate LineCollection objects together" ) # Combine the lines from each LineCollection object my_lines = self.lines.copy() my_lines.update(other.lines) return LineCollection(my_lines)
def __str__(self): """ Function to print a basic summary of the LineCollection object. Returns a string containing the id, wavelength, luminosity, equivalent width, and flux if generated. Returns: summary (str) Summary string containing the total mass formed and lists of the available SEDs, lines, and images. """ # Set up string for printing summary = "" # Add the content of the summary to the string to be printed summary += "-" * 10 + "\n" summary += "LINE COLLECTION\n" summary += f"number of lines: {len(self.line_ids)}\n" summary += f"lines: {self.line_ids}\n" summary += f"available ratios: {self.available_ratios}\n" summary += f"available diagrams: {self.available_diagrams}\n" summary += "-" * 10 return summary def __iter__(self): """ Overload iteration to allow simple looping over Line objects, combined with __next__ this enables for l in LineCollection syntax """ return self def __next__(self): """ Overload iteration to allow simple looping over Line objects, combined with __iter__ this enables for l in LineCollection syntax """ # Check we haven't finished if self._current_ind >= self.nlines: self._current_ind = 0 raise StopIteration else: # Increment index self._current_ind += 1 # Return the filter return self.lines[self.line_ids[self._current_ind - 1]]
[docs] def sum(self): """ For collections containing lines from multiple particles calculate the integrated line properties and create a new LineCollection object. """ summed_lines = {} for line_id, line in self.lines.items(): summed_lines[line_id] = line.sum() return LineCollection(summed_lines)
def _get_ratio(self, line1, line2): """ Measure (and return) a line ratio. Args: line1 (str) The line or lines in the numerator. line2 (str) The line or lines in the denominator. Returns: float a line ratio """ # If either line is a combination of lines check if we need to split if line1 in self.lines: line1 = [line1] else: line1 = [li.strip() for li in line1.split(",")] if line2 in self.lines: line2 = [line2] else: line2 = [li.strip() for li in line2.split(",")] return np.sum( [self.lines[_line].luminosity for _line in line1], axis=0 ) / np.sum([self.lines[_line].luminosity for _line in line2], axis=0)
[docs] def get_ratio(self, ratio_id): """ Measure (and return) a line ratio. Args: ratio_id (str, list) Either a ratio_id where the ratio lines are defined in line_ratios or a list of lines. Returns: float a line ratio """ # If ratio_id is a string interpret as a ratio_id for the ratios # defined in the line_ratios module... if isinstance(ratio_id, str): # Check if ratio_id exists if ratio_id not in self.line_ratios.available_ratios: raise exceptions.UnrecognisedOption( f"ratio_id not recognised ({ratio_id})" ) # Check if ratio_id exists elif ratio_id not in self.available_ratios: raise exceptions.UnrecognisedOption( "LineCollection is missing the lines required for " f"this ratio ({ratio_id})" ) line1, line2 = self.line_ratios.ratios[ratio_id] # Otherwise interpret as a list elif isinstance(ratio_id, list): line1, line2 = ratio_id return self._get_ratio(line1, line2)
[docs] def get_diagram(self, diagram_id): """ Return a pair of line ratios for a given diagram_id (E.g. BPT). Args: diagram_id (str, list) Either a diagram_id where the pairs of ratio lines are defined in line_ratios or a list of lists defining the ratios. Returns: tuple (float) a pair of line ratios """ # If ratio_id is a string interpret as a ratio_id for the ratios # defined in the line_ratios module... if isinstance(diagram_id, str): # check if ratio_id exists if diagram_id not in self.line_ratios.available_diagrams: raise exceptions.UnrecognisedOption( f"diagram_id not recognised ({diagram_id})" ) # check if ratio_id exists elif diagram_id not in self.available_diagrams: raise exceptions.UnrecognisedOption( "LineCollection is missing the lines required for " f"this diagram ({diagram_id})" ) ab, cd = self.line_ratios.diagrams[diagram_id] # Otherwise interpret as a list elif isinstance(diagram_id, list): ab, cd = diagram_id return self._get_ratio(*ab), self._get_ratio(*cd)
[docs] def get_ratio_label(self, ratio_id): """ Wrapper around get_ratio_label """ return get_ratio_label(ratio_id)
[docs] def get_diagram_labels(self, diagram_id): """ Wrapper around get_ratio_label """ return get_diagram_labels(diagram_id)
[docs] class Line: """ A class representing a spectral line or set of lines (e.g. a doublet). Although a Line can be instatiated directly most users should generate them using the various different "get_line" methods implemented across Synthesizer. A Line object can either be a single line or a combination of multiple, individually unresolved lines. A collection of Line objects are stored within a LineCollection which provides an interface to interact with multiple lines at once. Attributes: wavelength (Quantity) The standard (not vacuum) wavelength of the line. vacuum_wavelength (Quantity) The vacuum wavelength of the line. continuum (Quantity) The continuum at the line. luminosity (Quantity) The luminosity of the line. flux (Quantity) The flux of the line. equivalent_width (Quantity) The equivalent width of the line. individual_lines (list) A list of individual lines that make up this line. element (list) A list of the elements that make up this line. """ # Define quantities wavelength = Quantity() vacuum_wavelength = Quantity() continuum = Quantity() luminosity = Quantity() flux = Quantity() @accepts( wavelength=angstrom, luminosity=erg / s, continuum=erg / s / Hz, ) def __init__( self, line_id=None, wavelength=None, luminosity=None, continuum=None, combine_lines=(), ): """ Initialise the Line object. Args: line_id (str) The id of the line. If creating a >=doublet the line id will be derived while combining lines. This will not be used if lines are passed. wavelength (unyt_quantity) The standard (not vacuum) wavelength of the line. This will not be used if lines are passed. luminosity (unyt_quantity) The luminosity the line. This will not be used if lines are passed. continuum (unyt_quantity) The continuum at the line. This will not be used if lines are passed. combine_lines (tuple, Line) Any number of Line objects to combine into a single Line. If these are passed all other kwargs are ignored. """ # Flag deprecation of list and tuple ids if isinstance(line_id, (list, tuple)): deprecation( "Line objects should be created with a string id, not a list" " or tuple. This will be removed in a future version." ) # We need to check which version of the inputs we've been given, 3 # values describing a single line or a set of lines to combine? if ( len(combine_lines) == 0 and line_id is not None and wavelength is not None and luminosity is not None and continuum is not None ): self._make_line_from_values( line_id, wavelength, luminosity, continuum, ) elif len(combine_lines) > 0: self._make_line_from_lines(combine_lines) else: raise exceptions.InconsistentArguments( "A Line needs either its wavelength, luminosity, and continuum" " passed, or an arbitrary number of Lines to combine" ) # Initialise an attribute to hold any individual lines used to make # this one. self.individual_lines = ( combine_lines if len(combine_lines) > 0 else [self] ) # Initialise the flux (populated by get_flux when called) self.flux = None # Calculate the vacuum wavelength. self.vacuum_wavelength = standard_to_vacuum(self.wavelength) # Element self.element = [li.strip().split(" ")[0] for li in self.id.split(",")] @property def continuum_llam(self): """Return the continuum in units of Llam (erg/s/AA).""" return lnu_to_llam(self.wavelength, self.continuum) @property def equivalent_width(self): """Return the equivalent width.""" return self.luminosity / self.continuum_llam @accepts( wavelength=angstrom, luminosity=erg / s, continuum=erg / s / Hz, ) def _make_line_from_values( self, line_id, wavelength, luminosity, continuum ): """ Create line from explicit values. Args: line_id (str) The identifier for the line. wavelength (unyt_quantity) The standard (not vacuum) wavelength of the line. luminoisty (unyt_quantity) The luminoisty of the line. continuum (unyt_quantity) The continuum of the line. """ # Set the line attributes self.wavelength = wavelength self.luminosity = luminosity self.continuum = continuum self.id = get_line_id(line_id) def _make_line_from_lines(self, lines): """ Create a line by combining other lines. Args: lines (tuple, Line) Any number of Line objects to combine into a single line. """ # Ensure we've been handed lines if any([not isinstance(line, Line) for line in lines]): raise exceptions.InconsistentArguments( "args passed to a Line must all be Lines. Did you mean to " "pass keyword arguments for wavelength, luminosity and " f"continuum? (Got: {[*lines]})" ) # Combine the Line attributes (units are guaranteed here since the # quantities are coming directly from a Line) self.wavelength = np.mean([line.wavelength for line in lines], axis=0) self.luminosity = np.sum([line.luminosity for line in lines], axis=0) self.continuum = np.sum([line.continuum for line in lines], axis=0) # Derive the line id self.id = get_line_id([line.id for line in lines]) def __str__(self): """ Return a basic summary of the Line object. Returns a string containing the id, wavelength, luminosity, equivalent width, and flux if generated. Returns: summary (str) Summary string containing the total mass formed and lists of the available SEDs, lines, and images. """ # Set up string for printing pstr = "" # Add the content of the summary to the string to be printed pstr += "-" * 10 + "\n" pstr += f"SUMMARY OF {self.id}" + "\n" pstr += f"wavelength: {self.wavelength:.1f}" + "\n" if isinstance(self.luminosity, np.ndarray): mean_lum = np.mean(self._luminosity) pstr += f"Npart: {self.luminosity.size}\n" pstr += ( f"<log10(luminosity/{self.luminosity.units})>: " f"{np.log10(mean_lum):.2f}\n" ) mean_eq = np.mean(self.equivalent_width) pstr += f"<equivalent width>: {mean_eq:.0f}" + "\n" mean_flux = np.mean(self.flux) if self.flux is not None else None pstr += ( f"<log10(flux/{self.flux.units}): {np.log10(mean_flux):.2f}" if self.flux is not None else "" ) else: pstr += ( f"log10(luminosity/{self.luminosity.units}): " f"{np.log10(self.luminosity):.2f}\n" ) pstr += f"equivalent width: {self.equivalent_width:.0f}" + "\n" pstr += ( f"log10(flux/{self.flux.units}): {np.log10(self.flux):.2f}" if self.flux is not None else "" ) pstr += "-" * 10 return pstr def __add__(self, second_line): """ Add another line to self. Overloads + operator to allow direct addition of Line objects. Returns (Line) New instance of Line containing both lines. """ return Line(combine_lines=(self, second_line))
[docs] def sum(self): """ For objects containing lines of multiple particles sum them to produce the integrated quantities. """ return Line( line_id=self.id, wavelength=self.wavelength, luminosity=np.sum(self.luminosity), continuum=np.sum(self.continuum), )
[docs] def get_flux(self, cosmo, z): """ Calculate the line flux. Args: cosmo (astropy.cosmology.) Astropy cosmology object. z (float) The redshift. Returns: flux (float) Flux of the line in units of erg/s/cm2 by default. """ # Get the luminosity distance luminosity_distance = ( cosmo.luminosity_distance(z).to("cm").value ) * cm # Compute flux self.flux = self.luminosity / (4 * np.pi * luminosity_distance**2) return self.flux
[docs] def combine(self, *lines): """ Combine this line with an arbitrary number of other lines. This is important for combing >2 lines together since the simple line1 + line2 + line3 addition of multiple lines will not correctly average over all lines. Args: lines (Line) Any number of Line objects to combine into a single line. Returns: (Line) A new Line object containing the combined lines. """ # Ensure we've been handed lines if any([not isinstance(line, Line) for line in lines]): raise exceptions.InconsistentArguments( "args passed to a Line must all be Lines. Did you mean to " "pass keyword arguments for wavelength, luminosity and " "continuum" ) return Line(self, combine_lines=lines)
[docs] def apply_attenuation( self, tau_v, dust_curve, mask=None, ): """ Apply attenuation to this line. Args: tau_v (float/array-like, float) The V-band optical depth for every star particle. dust_curve (synthesizer.emission_models.attenuation.*) An instance of one of the dust attenuation models. (defined in synthesizer/emission_models.attenuation.py) mask (array-like, bool) A mask array with an entry for each line. Masked out spectra will be ignored when applying the attenuation. Only applicable for multidimensional lines. Returns: Line A new Line object containing the attenuated line. """ # Ensure the mask is compatible with the spectra if mask is not None: if self._luminosity.ndim < 1: raise exceptions.InconsistentArguments( "Masks are only applicable for Lines containing " "multiple elements" ) if self._luminosity.shape[0] != mask.size: raise exceptions.InconsistentArguments( "Mask and lines are incompatible shapes " f"({mask.shape}, {self._lnu.shape})" ) # If tau_v is an array it needs to match the spectra shape if isinstance(tau_v, np.ndarray): if self._luminosity.ndim < 1: raise exceptions.InconsistentArguments( "Arrays of tau_v values are only applicable for Lines" " containing multiple elements" ) if self._luminosity.shape[0] != tau_v.size: raise exceptions.InconsistentArguments( "tau_v and lines are incompatible shapes " f"({tau_v.shape}, {self._lnu.shape})" ) # Compute the transmission transmission = dust_curve.get_transmission(tau_v, self.wavelength) # Apply the transmision att_lum = self.luminosity att_cont = self.continuum if mask is None: att_lum *= transmission att_cont *= transmission else: att_lum[mask] *= transmission[mask] att_cont[mask] *= transmission[mask] return Line( line_id=self.id, wavelength=self.wavelength, luminosity=att_lum, continuum=att_cont, )