from __future__ import absolute_import, division, print_function
from copy import copy
from enum import Enum
from itertools import product, combinations
import graphviz
import attr
import numpy as np
import pandas as pd
from .utilities import split, iter_flatten, filter_lines
from ..particle import Particle, SpinType, Par_mapping
from . import ampline
[docs]class LS(Enum):
'Line shapes supported (currently)'
RBW = 0
GSpline = 1
kMatrix = 2
FOCUS = 3
@attr.s(slots=True)
class AmplitudeChain:
particle = attr.ib()
daughters = attr.ib([], convert=lambda x: x if x else [])
lineshape = attr.ib(None)
spinfactor = attr.ib(None)
amp = attr.ib(1+0j, cmp=False, validator=attr.validators.instance_of(complex))
err = attr.ib(0+0j, cmp=False, validator=attr.validators.instance_of(complex))
fix = attr.ib(True, cmp=False, validator=attr.validators.instance_of(bool))
name = attr.ib(None)
def __attrs_post_init__(self):
if self.name is None:
self.name = self.particle.name
# Class members keep track of additions
all_particles = set()
final_particles = set()
# This is set class-wide, and only used when a line is made
cartesian = False
@classmethod
def from_matched_line(cls, mat):
'''
This operates on an already-matched line.
:param mat: The groupdict output of a match
:return: A new amplitude chain instance
'''
mat['particle'] = Particle.from_AmpGen(mat['name'])
if mat['particle'] not in cls.all_particles:
cls.all_particles |= {mat['particle']}
if mat['daughters']:
mat['daughters'] = [cls.from_matched_line(ampline.partial.match(x).groupdict()) for x in split(mat['daughters'])]
# This is only true if not a parital line
if 'fix_r' in mat:
# Slightly odd, since we use cartesian, so fixing theta or mag
# of amplitude only doesn't work
mat['fix'] = mat['fix_r'] == '2' and mat['fix_i'] == '2'
del mat['fix_r'], mat['fix_i']
if cls.cartesian:
mat['amp'] = float(mat['A']) + float(mat['theta'])*1j
mat['err'] = float(mat['theta']) + float(mat['dtheta'])*1j
else:
A = float(mat['A'])
dA = float(mat['dA'])
theta = float(mat['theta'])
dtheta = float(mat['dtheta'])
mat['amp'] = A* np.exp(theta*1j)
mat['err'] = ( (dA*np.cos(theta) + A*np.sin(dtheta) )
+ (dA*np.sin(theta) + A*np.cos(dtheta))*1j)
del mat['A'], mat['dA'], mat['theta'], mat['dtheta']
return cls(**mat)
def expand_lines(self, linelist):
'''
Take a DecayTree -> list of DecayTrees with each dead-end daughter
expanded to every possible combination. (recursive)
'''
# If the Tree has daugthers, run on those
if self.daughters:
dlist = [d.expand_lines(linelist) for d in self.daughters]
retlist = []
for ds in product(*dlist):
newd = copy(self)
newd.daughters = ds
retlist.append(newd)
return retlist
# If the tree ends
new_trees = [l for line in linelist if line.name == self.name for l in line.expand_lines(linelist)]
if new_trees:
return new_trees
else:
self.__class__.final_particles |= {self.particle}
return [self]
@property
def ls_enum(self):
if not self.lineshape:
return LS.RBW
elif 'GSpline.EFF' == self.lineshape:
return LS.GSpline
elif self.lineshape.startswith('kMatrix'):
return LS.kMatrix
elif self.lineshape.startswith('FOCUS'):
return LS.FOCUS
else:
raise RuntimeError("Unimplemented lineshape {0}".format(self.lineshape))
@property
def full_amp(self):
amp = self.amp
for d in self.daughters:
amp *= d.full_amp
return amp
def is_vertex(self):
return len(self) == 2
def is_strong(self):
if not self.is_vertex():
return None
return set(self.particle.quarks) == set(self[0].particle.quarks).union(set(self[1].particle.quarks))
def L_range(self, conserveParity=False):
S = self.particle.J
s1 = self[0].particle.J
s2 = self[1].particle.J
min_spin = abs(S-s1-s2)
min_spin = min(abs(S+s1-s2), min_spin)
min_spin = min(abs(S-s1+s2), min_spin)
max_spin = S + s1 + s2
if not conserveParity:
return (min_spin, max_spin)
else:
raise RuntimeError("Strong decays not implemented yet")
@property
def L(self):
if self.spinfactor:
return 'S P D F'.split().index(self.spinfactor)
min_L, _ = self.L_range()
return min_L # Ground state unless specified
def __len__(self):
return len(self.daughters)
def __getitem__(self, item):
return self.daughters[item]
def _get_html(self):
name = self.particle.html_name
if self.spinfactor or self.lineshape:
name += '<br/><br/>'
if self.spinfactor:
name += '<font color="blue">[' + self.spinfactor + ']</font>'
if self.lineshape:
name += '<font color="red">[' + self.lineshape + ']</font>'
return name
def _add_nodes(self, drawing):
name = self._get_html()
drawing.node(str(id(self)), "<" + name + ">")
for p in self.daughters:
drawing.edge(str(id(self)), str(id(p)))
p._add_nodes(drawing)
def _make_graphvis(self):
d = graphviz.Digraph()
d.attr(labelloc='t', label=str(self))
self._add_nodes(d)
return d
def _repr_svg_(self):
return self._make_graphvis()._repr_svg_()
@property
def vertexes(self):
verts = []
for d in self.daughters:
if d.is_vertex():
verts.append(d)
verts += d.vertexes
return verts
@property
def structure(self):
'''
The structure of the decay chain, simplified to only final state particles
'''
if self.daughters:
return [d.structure for d in self.daughters]
else:
return self.particle
def list_structure(self, final_states):
'''
The structure in the form [(0,1,2,3)], where the dual-list is used
for permutations for bose symmatrization.
So for final_states=[a,b,c,c], [a,c,[c,b]] would be:
[(0,2,3,1),(0,3,2,1)]
'''
structure = list(iter_flatten(self.structure))
if set(structure) - set(final_states):
raise RuntimeError("The final states must encompass all particles in final states!")
possibilities = [[i for i,v in enumerate(final_states) if v == name] for name in structure]
return [a for a in product(*possibilities) if len(set(a)) == len(a)]
def __str__(self):
name = str(self.particle)
if self.lineshape and self.spinfactor:
name += '[' + self.spinfactor + ';' + self.lineshape + ']'
elif self.lineshape:
name += '[' + self.lineshape + ']'
elif self.spinfactor:
name += '[' + self.spinfactor + ']'
if self.daughters:
name += '{'+','.join(map(str,self.daughters))+'}'
return name
@classmethod
def read_AmpGen(cls, filename=None, text=None):
'''
Read in an ampgen file
:param filename: Filename
:return: array of AmplitudeChains, parameters, constants, event type
'''
# Read the file in, ignore empty lines and comments
if filename is not None:
with open(filename) as f:
valid_lines = [l.strip().rstrip(',') for l in f if l and not l.startswith('#')]
elif text is not None:
valid_lines = [l.strip().rstrip(',') for l in text.splitlines() if l and not l.startswith('#')]
else:
raise RuntimeError("Must have filename or text")
# Collect known options
option_lines, valid_lines = filter_lines(ampline.settings, valid_lines)
# Collect lines with an = in them
relation_lines, valid_lines = filter_lines(ampline.inverted, valid_lines)
# Collect "normal" non-Cartesian lines
real_lines, valid_lines = filter_lines(ampline.dual, valid_lines)
# Collect Cartesian lines (need combining)
cart_lines, valid_lines = filter_lines(ampline.cartesian, valid_lines)
# The other lines do not need explicit filtering
variable_lines = [l for l in valid_lines if len(l.split()) == 4]
constant_lines = [l for l in valid_lines if len(l.split()) == 2]
# Process the options
all_states = None
for mat in option_lines:
if mat['name'] == 'EventType':
all_states = [Particle.from_AmpGen(name) for name in mat['value'].split()]
elif mat['name'] == 'FastCoherentSum::UseCartesian':
cls.cartesian = bool(mat['value'])
# Make sure this exists!
if all_states is None:
raise RuntimeError("EventType is missing! Cannot compute decay.")
# Combine dual line Cartesian lines into traditional cartesian lines
for a, b in combinations(cart_lines, 2):
if a['name'] == b['name']:
if a['cart'] == 'Re' and b['cart'] == 'Im':
pass
elif a['cart'] == 'Im' and b['cart'] == 'Re':
a,b = b,a
else:
raise RuntimeError("Can't process a line with *both* components Re or Im")
new_string = "{a[name]} {a[fix]} {a[amp]} {a[err]} {b[fix]} {b[amp]} {b[err]}".format(a=a,b=b)
real_lines.append(ampline.dual.match(new_string).groupdict())
# Make the partial lines and constants as dataframes
parameters = pd.DataFrame(((v.strip() for v in p.split()) for p in variable_lines),
columns='name fix value error'.split()).set_index('name')
constants = pd.DataFrame(((v.strip() for v in p.split()) for p in constant_lines),
columns='name value'.split()).set_index('name')
# Convert the matches into AmplitudeChains
line_arr = [cls.from_matched_line(a) for a in real_lines]
# Expand partial lines into complete lines
new_line_arr = [l for line in line_arr if line.particle == all_states[0] for l in line.expand_lines(line_arr)]
# Return
return new_line_arr, parameters, constants, all_states