"""A module containing functionality for working with spectral lines.
The primary class is Line which holds information about an individual or
blended emission line, including its identification, wavelength, luminosity,
and the strength of the continuum. From these the equivalent width is
automatically calculated. Combined with a redshift and cosmology the flux can
also be calcualted.
A second class is LineCollection which holds a collection of Line objects and
provides additional functionality such as calcualting line ratios and diagrams
(e.g. BPT-NII, OHNO).
Several functions exist for obtaining line, ratio, and diagram labels for use
in plots etc.
"""
import numpy as np
from unyt import Angstrom, cm, unyt_array, unyt_quantity
from synthesizer import exceptions, line_ratios
from synthesizer.conversions import lnu_to_llam, standard_to_vacuum
from synthesizer.units import Quantity
from synthesizer.warnings import deprecation
[docs]def get_line_id(id):
"""
A function for converting a line id possibly represented as a list to
a single string.
Args
id (str, list, tuple)
a str, list, or tuple containing the id(s) of the lines
Returns
id (str)
string representation of the id
"""
if isinstance(id, list):
return ", ".join(id)
else:
return id
[docs]def get_line_label(line_id):
"""
Get a line label for a given line_id, ratio, or diagram. Where the line_id
is one of several predifined lines in line_ratios.line_labels this label
is used, otherwise the label is constructed from the line_id.
Argumnents
line_id (str or list)
The line_id either as a list of individual lines or a string. If
provided as a list this is automatically converted to a single
string so it can be used as a key.
Returns
line_label (str)
A nicely formatted line label.
"""
# if the line_id is a list (denoting a doublet or higher)
if isinstance(line_id, list):
line_id = ", ".join(line_id)
if line_id in line_ratios.line_labels.keys():
line_label = line_ratios.line_labels[line_id]
else:
line_id = line_id.split(",")
line_labels = []
for line_id_ in line_id:
# get the element, ion, and wavelength
element, ion, wavelength = line_id_.split(" ")
# extract unit and convert to latex str
unit = wavelength[-1]
if unit == "A":
unit = r"\AA"
if unit == "m":
unit = r"\mu m"
wavelength = wavelength[:-1] + unit
line_labels.append(
f"{element}{get_roman_numeral(int(ion))}{wavelength}"
)
line_label = "+".join(line_labels)
return line_label
[docs]def flatten_linelist(list_to_flatten):
"""
Flatten a mixed list of lists and strings and remove duplicates.
Used when converting a line list which may contain single lines
and doublets.
Args:
list_to_flatten (list)
list containing lists and/or strings and integers
Returns:
(list)
flattened list
"""
flattened_list = []
for lst in list_to_flatten:
if isinstance(lst, list) or isinstance(lst, tuple):
for ll in lst:
flattened_list.append(ll)
elif isinstance(lst, str):
# If the line is a doublet, resolve it and add each line
# individually
if len(lst.split(",")) > 1:
flattened_list += lst.split(",")
else:
flattened_list.append(lst)
else:
raise Exception(
(
"Unrecognised type provided. Please provide"
"a list of lists and strings"
)
)
return list(set(flattened_list))
[docs]def get_ratio_label(ratio_id):
"""
Get a label for a given ratio_id.
Args:
ratio_id (str)
The ratio identificantion, e.g. R23.
Returns:
label (str)
A string representation of the label.
"""
# get the list of lines for a given ratio_id
# if the id is a string get the lines from the line_ratios sub-module
if isinstance(ratio_id, str):
ratio_line_ids = line_ratios.ratios[ratio_id]
if isinstance(ratio_id, list):
ratio_line_ids = ratio_id
numerator = get_line_label(ratio_line_ids[0])
denominator = get_line_label(ratio_line_ids[1])
label = f"{numerator}/{denominator}"
return label
[docs]def get_diagram_labels(diagram_id):
"""
Get a x and y labels for a given diagram_id
Args:
diagram_id (str)
The diagram identificantion, e.g. OHNO.
Returns:
xlabel (str)
A string representation of the x-label.
ylabel (str)
A string representation of the y-label.
"""
# get the list of lines for a given ratio_id
diagram_line_ids = line_ratios.diagrams[diagram_id]
xlabel = get_ratio_label(diagram_line_ids[0])
ylabel = get_ratio_label(diagram_line_ids[1])
return xlabel, ylabel
[docs]def get_roman_numeral(number):
"""
Function to convert an integer into a roman numeral str.
Used for renaming emission lines from the cloudy defaults.
Args:
number (int)
The number to convert into a roman numeral.
Returns:
number_representation (str)
String reprensentation of the roman numeral.
"""
num = [1, 4, 5, 9, 10, 40, 50, 90, 100, 400, 500, 900, 1000]
sym = [
"I",
"IV",
"V",
"IX",
"X",
"XL",
"L",
"XC",
"C",
"CD",
"D",
"CM",
"M",
]
i = 12
roman = ""
while number:
div = number // num[i]
number %= num[i]
while div:
roman += sym[i]
div -= 1
i -= 1
return roman
[docs]class LineCollection:
"""
A class holding a collection of emission lines. This enables additional
functionality such as quickly calculating line ratios or line diagrams.
Args
lines (dictionary of Line objects)
A dictionary of line objects.
Methods
"""
def __init__(self, lines):
"""
Initialise LineCollection.
Args:
lines (dict)
A dictionary of synthesizer.line.Line objects.
"""
# Dictionary of synthesizer.line.Line objects.
self.lines = lines
# Create an array of line_ids
self.line_ids = np.array(list(self.lines.keys()))
self._individual_line_ids = np.array(
[li for lis in self.line_ids for li in lis.split(",")]
)
# Atrributes to enable looping
self._current_ind = 0
self.nlines = len(self.line_ids)
# Create list of line wavelengths
self.wavelengths = (
np.array(
[
line.wavelength.to("Angstrom").value
for line in self.lines.values()
]
)
* Angstrom
)
# Get the arguments that would sort wavelength
sorted_arguments = np.argsort(self.wavelengths)
# Sort the line_ids and wavelengths
self.line_ids = self.line_ids[sorted_arguments]
self.wavelengths = self.wavelengths[sorted_arguments]
# Include line ratio and diagram definitions
self.line_ratios = line_ratios
# Create list of available line ratios
self.available_ratios = []
for ratio_id, ratio in self.line_ratios.ratios.items():
# Create a set from the ratio line ids while also unpacking
# any comma separated lines
ratio_line_ids = set()
for lis in ratio:
ratio_line_ids.update({li.strip() for li in lis.split(",")})
# Check if line ratio is available
if ratio_line_ids.issubset(self._individual_line_ids):
self.available_ratios.append(ratio_id)
# Create list of available line diagnostics
self.available_diagrams = []
for diagram_id, diagram in self.line_ratios.diagrams.items():
# Create a set from the diagram line ids while also unpacking
# any comma separated lines
diagram_line_ids = set()
for ratio in diagram:
for lis in ratio:
diagram_line_ids.update(
{li.strip() for li in lis.split(",")}
)
# Check if line ratio is available
if set(diagram_line_ids).issubset(self.line_ids):
self.available_diagrams.append(diagram_id)
def __getitem__(self, line_id):
"""
Simply returns one particular line from the collection.
Returns:
line (synthesizer.line.Line)
A synthesizer.line.Line object.
"""
return self.lines[line_id]
[docs] def concatenate(self, other):
"""
Concatenate two LineCollection objects together.
Note that any duplicate lines will be taken from other (i.e. the
LineCollection passed to concatenate).
Args:
other (LineCollection)
A LineCollection object to concatenate with the current
LineCollection object.
Returns:
LineCollection
A new LineCollection object containing the lines from
both LineCollection objects.
"""
# Ensure other is a line collection
if not isinstance(other, LineCollection):
raise TypeError(
"Can only concatenate LineCollection objects together"
)
# Combine the lines from each LineCollection object
my_lines = self.lines.copy()
my_lines.update(other.lines)
return LineCollection(my_lines)
def __str__(self):
"""
Function to print a basic summary of the LineCollection object.
Returns a string containing the id, wavelength, luminosity,
equivalent width, and flux if generated.
Returns:
summary (str)
Summary string containing the total mass formed and
lists of the available SEDs, lines, and images.
"""
# Set up string for printing
summary = ""
# Add the content of the summary to the string to be printed
summary += "-" * 10 + "\n"
summary += "LINE COLLECTION\n"
summary += f"number of lines: {len(self.line_ids)}\n"
summary += f"lines: {self.line_ids}\n"
summary += f"available ratios: {self.available_ratios}\n"
summary += f"available diagrams: {self.available_diagrams}\n"
summary += "-" * 10
return summary
def __iter__(self):
"""
Overload iteration to allow simple looping over Line objects,
combined with __next__ this enables for l in LineCollection syntax
"""
return self
def __next__(self):
"""
Overload iteration to allow simple looping over Line objects,
combined with __iter__ this enables for l in LineCollection syntax
"""
# Check we haven't finished
if self._current_ind >= self.nlines:
self._current_ind = 0
raise StopIteration
else:
# Increment index
self._current_ind += 1
# Return the filter
return self.lines[self.line_ids[self._current_ind - 1]]
[docs] def sum(self):
"""
For collections containing lines from multiple particles calculate the
integrated line properties and create a new LineCollection object.
"""
summed_lines = {}
for line_id, line in self.lines.items():
summed_lines[line_id] = line.sum()
return LineCollection(summed_lines)
def _get_ratio(self, line1, line2):
"""
Measure (and return) a line ratio.
Args:
line1 (str)
The line or lines in the numerator.
line2 (str)
The line or lines in the denominator.
Returns:
float
a line ratio
"""
# If either line is a combination of lines check if we need to split
if line1 in self.lines:
line1 = [line1]
else:
line1 = [li.strip() for li in line1.split(",")]
if line2 in self.lines:
line2 = [line2]
else:
line2 = [li.strip() for li in line2.split(",")]
return np.sum(
[self.lines[_line].luminosity for _line in line1], axis=0
) / np.sum([self.lines[_line].luminosity for _line in line2], axis=0)
[docs] def get_ratio(self, ratio_id):
"""
Measure (and return) a line ratio.
Args:
ratio_id (str, list)
Either a ratio_id where the ratio lines are defined in
line_ratios or a list of lines.
Returns:
float
a line ratio
"""
# If ratio_id is a string interpret as a ratio_id for the ratios
# defined in the line_ratios module...
if isinstance(ratio_id, str):
# Check if ratio_id exists
if ratio_id not in self.line_ratios.available_ratios:
raise exceptions.UnrecognisedOption(
f"ratio_id not recognised ({ratio_id})"
)
# Check if ratio_id exists
elif ratio_id not in self.available_ratios:
raise exceptions.UnrecognisedOption(
"LineCollection is missing the lines required for "
f"this ratio ({ratio_id})"
)
line1, line2 = self.line_ratios.ratios[ratio_id]
# Otherwise interpret as a list
elif isinstance(ratio_id, list):
line1, line2 = ratio_id
return self._get_ratio(line1, line2)
[docs] def get_diagram(self, diagram_id):
"""
Return a pair of line ratios for a given diagram_id (E.g. BPT).
Args:
diagram_id (str, list)
Either a diagram_id where the pairs of ratio lines are defined
in line_ratios or a list of lists defining the ratios.
Returns:
tuple (float)
a pair of line ratios
"""
# If ratio_id is a string interpret as a ratio_id for the ratios
# defined in the line_ratios module...
if isinstance(diagram_id, str):
# check if ratio_id exists
if diagram_id not in self.line_ratios.available_diagrams:
raise exceptions.UnrecognisedOption(
f"diagram_id not recognised ({diagram_id})"
)
# check if ratio_id exists
elif diagram_id not in self.available_diagrams:
raise exceptions.UnrecognisedOption(
"LineCollection is missing the lines required for "
f"this diagram ({diagram_id})"
)
ab, cd = self.line_ratios.diagrams[diagram_id]
# Otherwise interpret as a list
elif isinstance(diagram_id, list):
ab, cd = diagram_id
return self._get_ratio(*ab), self._get_ratio(*cd)
[docs] def get_ratio_label(self, ratio_id):
"""
Wrapper around get_ratio_label
"""
return get_ratio_label(ratio_id)
[docs] def get_diagram_labels(self, diagram_id):
"""
Wrapper around get_ratio_label
"""
return get_diagram_labels(diagram_id)
[docs]class Line:
"""
A class representing a spectral line or set of lines (e.g. a doublet).
Although a Line can be instatiated directly most users should generate
them using the various different "get_line" methods implemented across
Synthesizer.
A Line object can either be a single line or a combination of multiple,
individually unresolved lines.
A collection of Line objects are stored within a LineCollection which
provides an interface to interact with multiple lines at once.
Attributes:
wavelength (Quantity)
The standard (not vacuum) wavelength of the line.
vacuum_wavelength (Quantity)
The vacuum wavelength of the line.
continuum (Quantity)
The continuum at the line.
luminosity (Quantity)
The luminosity of the line.
flux (Quantity)
The flux of the line.
equivalent_width (Quantity)
The equivalent width of the line.
individual_lines (list)
A list of individual lines that make up this line.
element (list)
A list of the elements that make up this line.
"""
# Define quantities
wavelength = Quantity()
vacuum_wavelength = Quantity()
continuum = Quantity()
luminosity = Quantity()
flux = Quantity()
equivalent_width = Quantity()
def __init__(
self,
*lines,
line_id=None,
wavelength=None,
luminosity=None,
continuum=None,
):
"""
Initialise the Line object.
Args:
lines (Line)
Any number of Line objects to combine into a single Line. If
these are passed all other kwargs are ignored.
line_id (str)
The id of the line. If creating a >=doublet the line id will be
derived while combining lines. This will not be used if lines
are passed.
wavelength (unyt_quantity)
The standard (not vacuum) wavelength of the line. This
will not be used if lines are passed.
luminosity (unyt_quantity)
The luminosity the line. This will not be used if
lines are passed.
continuum (unyt_quantity)
The continuum at the line. This will not be used if
lines are passed.
"""
# Flag deprecation of list and tuple ids
if isinstance(line_id, (list, tuple)):
deprecation(
"Line objects should be created with a string id, not a list"
" or tuple. This will be removed in a future version."
)
# We need to check which version of the inputs we've been given, 3
# values describing a single line or a set of lines to combine?
if (
len(lines) == 0
and line_id is not None
and wavelength is not None
and luminosity is not None
and continuum is not None
):
self._make_line_from_values(
line_id,
wavelength,
luminosity,
continuum,
)
elif len(lines) > 0:
self._make_line_from_lines(*lines)
else:
raise exceptions.InconsistentArguments(
"A Line needs either its wavelength, luminosity, and continuum"
" passed, or an arbitrary number of Lines to combine"
)
# Initialise an attribute to hold any individual lines used to make
# this one.
self.individual_lines = lines if len(lines) > 0 else [self]
# Initialise the flux (populated by get_flux when called)
self.flux = None
# Calculate the vacuum wavelength.
self.vacuum_wavelength = standard_to_vacuum(self.wavelength)
# Continuum at line wavelength
self.continuum_lam = lnu_to_llam(self.wavelength, self.continuum)
self.equivalent_width = self.luminosity / self.continuum_lam
# Element
self.element = [li.strip().split(" ")[0] for li in self.id.split(",")]
def _make_line_from_values(
self, line_id, wavelength, luminosity, continuum
):
"""
Create line from explicit values.
Args:
line_id (str)
The identifier for the line.
wavelength (unyt_quantity)
The standard (not vacuum) wavelength of the line.
luminoisty (unyt_quantity)
The luminoisty of the line.
continuum (unyt_quantity)
The continuum of the line.
"""
# Ensure we have units
if not isinstance(wavelength, (unyt_quantity, unyt_array)):
raise exceptions.MissingUnits(
"Wavelength, luminosity, and continuum must all have units. "
"Wavelength units missing..."
)
if not isinstance(luminosity, (unyt_quantity, unyt_array)):
raise exceptions.MissingUnits(
"Wavelength, luminosity, and continuum must all have units. "
"Luminosity units missing..."
)
if not isinstance(continuum, (unyt_quantity, unyt_array)):
raise exceptions.MissingUnits(
"Wavelength, luminosity, and continuum must all have units. "
"Continuum units missing..."
)
# Set the line attributes
self.wavelength = wavelength
self.luminosity = luminosity
self.continuum = continuum
self.id = get_line_id(line_id)
def _make_line_from_lines(self, *lines):
"""
Create a line by combining other lines.
Args:
lines (Line)
Any number of Line objects to combine into a single line.
"""
# Ensure we've been handed lines
if any([not isinstance(line, Line) for line in lines]):
raise exceptions.InconsistentArguments(
"args passed to a Line must all be Lines. Did you mean to "
"pass keyword arguments for wavelength, luminosity and "
f"continuum? (Got: {[*lines]})"
)
# Combine the Line attributes (units are guaranteed here since the
# quantities are coming directly from a Line)
self.wavelength = np.mean([line._wavelength for line in lines], axis=0)
self.luminosity = np.sum([line._luminosity for line in lines], axis=0)
self.continuum = np.sum([line._continuum for line in lines], axis=0)
# Derive the line id
self.id = get_line_id([line.id for line in lines])
def __str__(self):
"""
Return a basic summary of the Line object.
Returns a string containing the id, wavelength, luminosity,
equivalent width, and flux if generated.
Returns:
summary (str)
Summary string containing the total mass formed and
lists of the available SEDs, lines, and images.
"""
# Set up string for printing
pstr = ""
# Add the content of the summary to the string to be printed
pstr += "-" * 10 + "\n"
pstr += f"SUMMARY OF {self.id}" + "\n"
pstr += f"wavelength: {self.wavelength:.1f}" + "\n"
if isinstance(self.luminosity, np.ndarray):
mean_lum = np.mean(self._luminosity)
pstr += f"Npart: {self.luminosity.size}\n"
pstr += (
f"<log10(luminosity/{self.luminosity.units})>: "
f"{np.log10(mean_lum):.2f}\n"
)
mean_eq = np.mean(self.equivalent_width)
pstr += f"<equivalent width>: {mean_eq:.0f}" + "\n"
mean_flux = np.mean(self.flux) if self.flux is not None else None
pstr += (
f"<log10(flux/{self.flux.units}): {np.log10(mean_flux):.2f}"
if self.flux is not None
else ""
)
else:
pstr += (
f"log10(luminosity/{self.luminosity.units}): "
f"{np.log10(self.luminosity):.2f}\n"
)
pstr += f"equivalent width: {self.equivalent_width:.0f}" + "\n"
pstr += (
f"log10(flux/{self.flux.units}): {np.log10(self.flux):.2f}"
if self.flux is not None
else ""
)
pstr += "-" * 10
return pstr
def __add__(self, second_line):
"""
Add another line to self.
Overloads + operator to allow direct addition of Line objects.
Returns
(Line)
New instance of Line containing both lines.
"""
return Line(self, second_line)
[docs] def sum(self):
"""
For objects containing lines of multiple particles sum them to produce
the integrated quantities.
"""
return Line(
line_id=self.id,
wavelength=self.wavelength,
luminosity=np.sum(self.luminosity),
continuum=np.sum(self.continuum),
)
[docs] def get_flux(self, cosmo, z):
"""
Calculate the line flux.
Args:
cosmo (astropy.cosmology.)
Astropy cosmology object.
z (float)
The redshift.
Returns:
flux (float)
Flux of the line in units of erg/s/cm2 by default.
"""
# Get the luminosity distance
luminosity_distance = (
cosmo.luminosity_distance(z).to("cm").value
) * cm
# Compute flux
self.flux = self.luminosity / (4 * np.pi * luminosity_distance**2)
return self.flux
[docs] def combine(self, lines):
"""
Combine this line with an arbitrary number of other lines.
This is important for combing >2 lines together since the simple
line1 + line2 + line3 addition of multiple lines will not correctly
average over all lines.
Args:
lines (Line)
Any number of Line objects to combine into a single line.
Returns:
(Line)
A new Line object containing the combined lines.
"""
# Ensure we've been handed lines
if any([not isinstance(line, Line) for line in lines]):
raise exceptions.InconsistentArguments(
"args passed to a Line must all be Lines. Did you mean to "
"pass keyword arguments for wavelength, luminosity and "
"continuum"
)
return Line(self, *lines)