From c1cb05b6c97913a82d099abed019b5ebf64db004 Mon Sep 17 00:00:00 2001 From: Eduardo Ploerer Date: Thu, 9 Jun 2022 19:37:00 +0200 Subject: [PATCH] adding coffea example --- examples/FCCee/coffea/analysis.py | 136 +++++++++++ examples/FCCee/coffea/finalSel_coffea.py | 176 +++++++++++++ examples/FCCee/coffea/finalSel_coffea_png.py | 244 +++++++++++++++++++ examples/FCCee/coffea/preSel.py | 20 ++ 4 files changed, 576 insertions(+) create mode 100644 examples/FCCee/coffea/analysis.py create mode 100644 examples/FCCee/coffea/finalSel_coffea.py create mode 100644 examples/FCCee/coffea/finalSel_coffea_png.py create mode 100644 examples/FCCee/coffea/preSel.py diff --git a/examples/FCCee/coffea/analysis.py b/examples/FCCee/coffea/analysis.py new file mode 100644 index 0000000000..95deda4951 --- /dev/null +++ b/examples/FCCee/coffea/analysis.py @@ -0,0 +1,136 @@ +import sys +import ROOT + +print ("Load cxx analyzers ... ",) +ROOT.gSystem.Load("libedm4hep") +ROOT.gSystem.Load("libpodio") +ROOT.gSystem.Load("libFCCAnalyses") +ROOT.gErrorIgnoreLevel = ROOT.kFatal +_edm = ROOT.edm4hep.ReconstructedParticleData() +_pod = ROOT.podio.ObjectID() +_fcc = ROOT.dummyLoader + +print ('edm4hep ',_edm) +print ('podio ',_pod) +print ('fccana ',_fcc) + +class analysis(): + + #__________________________________________________________ + def __init__(self, inputlist, outname, ncpu): + self.outname = outname + if ".root" not in outname: + self.outname+=".root" + + ROOT.ROOT.EnableImplicitMT(ncpu) + + self.df = ROOT.RDataFrame("events", inputlist) + print (" done") + #__________________________________________________________ + def run(self): + df2 = (self.df + + .Alias("Particle0", "Particle#0.index") + #.Alias("Particle_index", "Particle.index") + #.Alias("Particle1", "Particle#1.index") + .Alias("Jet3", "Jet#3.index") #alias for dealing with # in python + + + .Define("MC_px", "MCParticle::get_px(Particle)") + .Define("MC_py", "MCParticle::get_py(Particle)") + .Define("MC_pz", "MCParticle::get_pz(Particle)") + .Define("MC_p", "MCParticle::get_p(Particle)") + .Define("MC_e", "MCParticle::get_e(Particle)") + .Define("MC_theta", "MCParticle::get_theta(Particle)") + .Define("MC_pdg", "MCParticle::get_pdg(Particle)") + .Define("MC_status", "MCParticle::get_genStatus(Particle)") + + #selecting only final particles (status=1) + .Define("MC_final", "MCParticle::sel_genStatus(1)(Particle)") + + #momenta etc of final particles + .Define("MC_px_f", "MCParticle::get_px(MC_final)") + .Define("MC_py_f", "MCParticle::get_py(MC_final)") + .Define("MC_pz_f", "MCParticle::get_pz(MC_final)") + .Define("MC_e_f", "MCParticle::get_e(MC_final)") + .Define("MC_theta_f", "MCParticle::get_theta(MC_final)") + .Define("MC_pdg_f", "MCParticle::get_pdg(MC_final)") + + + #EE-KT ALGORITHM + + #build psedo-jets with the MC final particles (status = 0) + .Define("pseudo_jets", "JetClusteringUtils::set_pseudoJets(MC_px_f, MC_py_f, MC_pz_f, MC_e_f)") + + #run jet clustering with final state MC particles. ee_kt_algorithm, exclusive clustering, exactly 2 jets, E-scheme + .Define("FCCAnalysesJets_ee_kt", "JetClustering::clustering_ee_kt(2, 2, 1, 0)(pseudo_jets)") + + #get the jets out of the structure + .Define("jets_ee_kt", "JetClusteringUtils::get_pseudoJets(FCCAnalysesJets_ee_kt)") + + #get the jet constituents out of the structure + .Define("jetconstituents_ee_kt", "JetClusteringUtils::get_constituents(FCCAnalysesJets_ee_kt)") + + #get some jet variables + .Define("jets_ee_kt_e", "JetClusteringUtils::get_e(jets_ee_kt)") + .Define("jets_ee_kt_px", "JetClusteringUtils::get_px(jets_ee_kt)") + .Define("jets_ee_kt_py", "JetClusteringUtils::get_py(jets_ee_kt)") + .Define("jets_ee_kt_pz", "JetClusteringUtils::get_pz(jets_ee_kt)") + .Define("jets_ee_kt_flavour", "JetTaggingUtils::get_flavour(jets_ee_kt, Particle)") + ) + + + + + # select branches for output file + branchList = ROOT.vector('string')() + for branchName in [ + "MC_px", + "MC_py", + "MC_pz", + "MC_p", + "MC_e", + "MC_theta", + "MC_pdg", + "MC_status", + + "MC_px_f", + "MC_py_f", + "MC_pz_f", + "MC_e_f", + "MC_theta_f", + "MC_pdg_f", + + "jets_ee_kt_e", + "jets_ee_kt_px", + "jets_ee_kt_py", + "jets_ee_kt_pz", + "jets_ee_kt_flavour", + "jetconstituents_ee_kt", + ]: + branchList.push_back(branchName) + df2.Snapshot("events", self.outname, branchList) + +# example call for standalone file +# python examples/FCCee/higgs/mH-recoil/mumu/analysis.py /eos/experiment/fcc/ee/generation/DelphesEvents/fcc_tmp/p8_ee_ZH_ecm240/events_058720051.root +if __name__ == "__main__": + + if len(sys.argv)==1: + print ("usage:") + print ("python ",sys.argv[0]," file.root") + sys.exit(3) + infile = sys.argv[1] + outDir = sys.argv[0].replace(sys.argv[0].split('/')[0],'outputs').replace('analysis.py','')+'/' + import os + os.system("mkdir -p {}".format(outDir)) + outfile = outDir+infile.split('/')[-1] + ncpus = 0 + print ('outfile ',outfile) + analysis = analysis(infile, outfile, ncpus) + analysis.run() + + tf = ROOT.TFile(infile) + entries = tf.events.GetEntries() + p = ROOT.TParameter(int)( "eventsProcessed", entries) + outf=ROOT.TFile(outfile,"UPDATE") + p.Write() diff --git a/examples/FCCee/coffea/finalSel_coffea.py b/examples/FCCee/coffea/finalSel_coffea.py new file mode 100644 index 0000000000..8555b39da6 --- /dev/null +++ b/examples/FCCee/coffea/finalSel_coffea.py @@ -0,0 +1,176 @@ +import uproot +import awkward as ak +import h5py as h5 +import numpy as np +import matplotlib.pyplot as plt +from collections import defaultdict +from coffea import hist, processor +from coffea.nanoevents.methods import vector + + +ak.behavior.update(vector.behavior) + + + +# As detailed in the coffea docs, the core of the analysis script is the Processor class defined below. The Processor.process function will be called on every batch being processed. + +class MyProcessor(processor.ProcessorABC): + def __init__(self): + + + # We begin by defining the accumulator which will hold all quantities we intend to output. In this case it will be only the Z invariant mass and the flavour of the quarks. + + Z_accumulator = { + 'Z_mass': processor.column_accumulator(np.zeros([0])), + 'pid': processor.column_accumulator(np.zeros([0])), + } + self._accumulator = processor.dict_accumulator( Z_accumulator ) + + @property + def accumulator(self): + return self._accumulator + + + + def process(self, events): + + output = self.accumulator.identity() + + + # Here, the jets are defined LorentzVector classes (meaning several attributes and operations, such as jets.eta, will be accessible) + + jets = ak.zip({ + "t": events.jets_ee_kt_e, + "x": events.jets_ee_kt_px, + "y": events.jets_ee_kt_py, + "z": events.jets_ee_kt_pz, + "partonFlavour": events.jets_ee_kt_flavour, + }, with_name="LorentzVector") + + + # The (stable) gen level particles are defined similarly, but with their corresponding arrays + + GenPartsf = ak.zip({ + "t": events.MC_e_f, + "x": events.MC_px_f, + "y": events.MC_py_f, + "z": events.MC_pz_f, + "pdg": events.MC_pdg_f, + }, with_name="LorentzVector") + + + + # Here the flavour mask is defined by requiring both the quark and antiquark initiated jets to have the same flavour (which is non-negative). Unmatched or incorrectly matched jets result in a flavour of 0. + + up_mask = (np.abs(jets.partonFlavour[:,0])==2)&(np.abs(jets.partonFlavour[:,1])==2) + down_mask = (np.abs(jets.partonFlavour[:,0])==1)&(np.abs(jets.partonFlavour[:,1])==1) + strange_mask = (np.abs(jets.partonFlavour[:,0])==3)&(np.abs(jets.partonFlavour[:,1])==3) + charm_mask = (np.abs(jets.partonFlavour[:,0])==4)&(np.abs(jets.partonFlavour[:,1])==4) + bottom_mask = (np.abs(jets.partonFlavour[:,0])==5)&(np.abs(jets.partonFlavour[:,1])==5) + + pid = 2*up_mask+down_mask+3*strange_mask+4*charm_mask+5*bottom_mask + + + + # jets[:,::2] selects only the leading jet of an event, while jets[:,1::2] selects the subleading. In this way the Z candidates are computed, having the same LorentzVector structure as the jets from which they are computed. + + Z_cand = jets[:,::2]+jets[:,1::2] + + + + + #---------------------------------------------------------------------------------------------------------------------------------------------------------------------- + ######################### + #ASIDE: Jet constituents# + ######################### + + # Though not relevant in this simple example, in most cases one will want to have access to the jet constituents clustered into the first and second jets. This is possible by passing events.jetconstituents_ee_kt as a mask to the (stable) gen level particles. The events.jetconstituents_ee_kt array contains the indices of each (stable) gen level particle clustered into a given jet, and can thus be used to define a new collection consisting only the the particles in a given jet. + + # For instance, the jet constituents of the leading jet are defined as + + firstjetGenPartsf = ak.copy(GenPartsf[events.jetconstituents_ee_kt[:,0]]) + + # While the jet consituents of the sub-leading jet are defined as + + secondjetGenPartsf = ak.copy(GenPartsf[events.jetconstituents_ee_kt[:,1]]) + #------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + + # Here the Z candidates with undefined flavours are removed. Note that this syntax is general and could be used for any cuts e.g. kinematic + # Requiring a jet mass > 80 GeV would be + # Z_cand = Z_cand[Z_cand.mass>80] + + Z_cand = Z_cand[pid!=0] + pid = pid[pid!=0] + + + output['Z_mass']+= processor.column_accumulator(np.asarray(ak.flatten(Z_cand.mass))) + output['pid']+= processor.column_accumulator(np.asarray(pid)) + + return output + + def postprocess(self, accumulator): + return accumulator + +filename = "outputs/FCCee/coffea/p8_ee_Zuds_ecm91.root" +fileset = {'Zqq': [filename]} + +# The processor run_uproot_job loads the events via uproot and executes process function on the defined batches. The 'maxchunks' argument defined the maximum number of chunks that will be processed, while the 'chunksize' defines the number of events per chunk. The 'workers' argument defines how many parallel processes are run. + +output = processor.run_uproot_job(fileset, + treename='events', + processor_instance=MyProcessor(), + executor=processor.futures_executor, + executor_args={'schema': None, 'workers':4,}, + maxchunks =20, + chunksize = 5000, +) + + +########## +#PLOTTING# +########## + +Z_mass = output['Z_mass'].value +pid = output['pid'].value + +# Here the Z invariant mass is histogrammed using coffea histograms. Subsequently, the invariant mass and quark flavour is saved as an .h5 file. + +Z_cand = hist.Hist( + "Events", + hist.Cat("flavour", "Quark flavour"), + hist.Bin("m", r"$m_{q \bar{q}}$ [GeV]", 30, 90.75, 91.65), +) + +for i_f, flav in enumerate(["down", "up", "strange"]): + Z_cand.fill( + flavour=flav, + m=Z_mass[(pid==(i_f+1))], + ) + +ax = hist.plot1d( + Z_cand, + overlay="flavour", + stack=True, + fill_opts={'alpha': .5, 'edgecolor': (0,0,0,0.3)} +) + +ax.get_legend().shadow = True +ax.set_title(r'$Z \rightarrow q\bar{q}$ invariant mass') + +plt.savefig('examples/FCCee/coffea/Zpeak.pdf') + +h5_file = h5.File('examples/FCCee/coffea/Zpeak.h5', 'w') +h5_file.create_dataset("Z_mass", data=Z_mass) +h5_file.create_dataset("pid", data=pid) + + + + + + + + + + + + diff --git a/examples/FCCee/coffea/finalSel_coffea_png.py b/examples/FCCee/coffea/finalSel_coffea_png.py new file mode 100644 index 0000000000..ec15d74dcc --- /dev/null +++ b/examples/FCCee/coffea/finalSel_coffea_png.py @@ -0,0 +1,244 @@ +import uproot +import awkward as ak +import h5py as h5 +import numpy as np +import matplotlib.pyplot as plt +from collections import defaultdict +from coffea import hist, processor +from coffea.nanoevents.methods import vector + + +ak.behavior.update(vector.behavior) + + + +# As detailed in the coffea docs, the core of the analysis script is the Processor class defined below. The Processor.process function will be called on every batch being processed. + +class MyProcessor(processor.ProcessorABC): + def __init__(self): + + + # We begin by defining the accumulator which will hold all quantities we intend to output. In this case it will be only the Z invariant mass and the flavour of the quarks. + + Z_accumulator = { + 'Z_mass': processor.column_accumulator(np.zeros([0])), + 'pid': processor.column_accumulator(np.zeros([0])), + 'pid_Kl': processor.column_accumulator(np.zeros([0])), + 'Kl_p': processor.column_accumulator(np.zeros([0])), + 'dKl_theta': processor.column_accumulator(np.zeros([0])), + 'dKl_phi': processor.column_accumulator(np.zeros([0])), + } + self._accumulator = processor.dict_accumulator( Z_accumulator ) + + @property + def accumulator(self): + return self._accumulator + + + + def process(self, events): + + output = self.accumulator.identity() + + + # Here, the jets are defined LorentzVector classes (meaning several attributes and operations, such as jets.eta, will be accessible) + + jets = ak.zip({ + "t": events.jets_ee_kt_e, + "x": events.jets_ee_kt_px, + "y": events.jets_ee_kt_py, + "z": events.jets_ee_kt_pz, + "partonFlavour": events.jets_ee_kt_flavour, + }, with_name="LorentzVector") + + + # The (stable) gen level particles are defined similarly, but with their corresponding arrays + + GenPartsf = ak.zip({ + "t": events.MC_e_f, + "x": events.MC_px_f, + "y": events.MC_py_f, + "z": events.MC_pz_f, + "pdg": events.MC_pdg_f, + }, with_name="LorentzVector") + + + + # Here the flavour mask is defined by requiring both the quark and antiquark initiated jets to have the same flavour (which is non-negative). Unmatched or incorrectly matched jets result in a flavour of 0. + + up_mask = (np.abs(jets.partonFlavour[:,0])==2)&(np.abs(jets.partonFlavour[:,1])==2) + down_mask = (np.abs(jets.partonFlavour[:,0])==1)&(np.abs(jets.partonFlavour[:,1])==1) + strange_mask = (np.abs(jets.partonFlavour[:,0])==3)&(np.abs(jets.partonFlavour[:,1])==3) + charm_mask = (np.abs(jets.partonFlavour[:,0])==4)&(np.abs(jets.partonFlavour[:,1])==4) + bottom_mask = (np.abs(jets.partonFlavour[:,0])==5)&(np.abs(jets.partonFlavour[:,1])==5) + + pid = 2*up_mask+down_mask+3*strange_mask+4*charm_mask+5*bottom_mask + + + + + # Here cuts are implemented theta cut > ~ 14deg |p|>40 GeV + + jets_mask = (np.cos(jets.theta)<0.97) & (jets.p>40) + + event_mask = (ak.num(jets[jets_mask].theta, axis=1)==2) & (pid!=0) + + + jets_cut = ak.copy(jets[event_mask]) + GenPartsf_cut = ak.copy(GenPartsf[event_mask]) + pid = pid[event_mask] + + # A remark: It is important to cut on jet constituents after having assigned to constituents to their respective jet, as these cuts jet consitutent arrays will no longer match the indices + # given in events.jetconstituents_ee_kt. In this example we consider only the leading jet. + + # Here cuts are imposed on the jetConstituents of the leading jet theta cut > ~ 14 deg pt>500 GeV + leadingjetGenPartsf = ak.copy(GenPartsf[events.jetconstituents_ee_kt[:,0]]) + + leadingjetGenPartsf = leadingjetGenPartsf[event_mask] + + jetConstituents_mask = (np.cos(leadingjetGenPartsf.theta)<0.97) & (leadingjetGenPartsf.r>0.5) + leadingjetGenPartsf = leadingjetGenPartsf[jetConstituents_mask] + + + # Here we compute first the Z_candidates + + Z_cand = jets_cut[:,0] + jets_cut[:,1] + + # Here we compute |p| distribution of long Kaons in the leading jet + + longKaon_mask = (np.abs(leadingjetGenPartsf.pdg)==130) + longKaon_p = leadingjetGenPartsf[longKaon_mask].p + + # Here we compute the angular distance of long Kaons in the leading jet + + theta_diff = jets_cut.theta[:,0] - leadingjetGenPartsf[longKaon_mask].theta + phi_diff = jets_cut.phi[:,0] - leadingjetGenPartsf[longKaon_mask].phi + phi_diff = ((phi_diff+np.pi)%(2*np.pi))-np.pi + + + pid_Kl = ak.flatten(ak.broadcast_arrays(pid, phi_diff)[0], axis=None) + + output['Z_mass']+= processor.column_accumulator(np.asarray(Z_cand.mass)) + output['pid']+= processor.column_accumulator(np.asarray(pid)) + output['pid_Kl']+= processor.column_accumulator(np.asarray(pid_Kl)) + output['Kl_p']+= processor.column_accumulator(np.asarray(ak.flatten(longKaon_p))) + output['dKl_theta']+= processor.column_accumulator(np.asarray(ak.flatten(theta_diff))) + output['dKl_phi']+= processor.column_accumulator(np.asarray(ak.flatten(phi_diff))) + + return output + + def postprocess(self, accumulator): + return accumulator + +filename = "outputs/FCCee/Zqq/p8_ee_Zuds_ecm91.root" +fileset = {'Zqq': [filename]} + +# The processor run_uproot_job loads the events via uproot and executes process function on the defined batches. The 'maxchunks' argument defined the maximum number of chunks that will be processed, while the 'chunksize' defines the number of events per chunk. The 'workers' argument defines how many parallel processes are run. + +output = processor.run_uproot_job(fileset, + treename='events', + processor_instance=MyProcessor(), + executor=processor.futures_executor, + executor_args={'schema': None, 'workers':4,}, + maxchunks =20, + chunksize = 5000, +) + + +########## +#PLOTTING# +########## + +Z_mass = output['Z_mass'].value +pid = output['pid'].value +pid_Kl = output['pid_Kl'].value +Kl_p = output['Kl_p'].value +dKl_theta = output['dKl_theta'].value +dKl_phi = output['dKl_phi'].value + + +# Here the Z invariant mass is histogrammed using coffea histograms. + +Z_cand_hist = hist.Hist( + "Events", + hist.Cat("flavour", "Quark flavour"), + hist.Bin("m", r"$m_{q \bar{q}}$ [GeV]", 30, 90.75, 91.65), +) + +for i_f, flav in enumerate(["down", "up", "strange"]): + Z_cand_hist.fill( + flavour=flav, + m=Z_mass[(pid==(i_f+1))], + ) + +ax = hist.plot1d( + Z_cand_hist, + overlay="flavour", + stack=True, + fill_opts={'alpha': .5, 'edgecolor': (0,0,0,0.3)} +) + +ax.get_legend().shadow = True +ax.set_title(r'$Z \rightarrow q\bar{q}$ invariant mass') + +plt.savefig('examples/FCCee/Zqq/Zpeak.png') + +plt.clf() + +Kl_p_hist = hist.Hist( + "Constituent Count", + hist.Cat("flavour", "Quark flavour"), + hist.Bin("p", "|$p$| [GeV]", 38, 0, 37), +) + +for i_f, flav in enumerate(["down", "up", "strange"]): + Kl_p_hist.fill( + flavour=flav, + p=Kl_p[(pid_Kl==(i_f+1))], + ) + +ax = hist.plot1d( + Kl_p_hist, + overlay="flavour", + stack=True, + fill_opts={'alpha': .5, 'edgecolor': (0,0,0,0.3)} +) + + +ax.get_legend().shadow = True +ax.set_title(r'$K_{L}$ |p| distribution in $Z \rightarrow q\bar{q}$ leading jets') + +plt.savefig('examples/FCCee/Zqq/Kl_p.pdf') + +plt.clf() + + +Kl_angle_hist = hist.Hist( + "Constituent Count", + hist.Bin("theta", r"$\Delta \theta$ [rad]", 29, -0.5, 0.5), + hist.Bin("phi", r"$\Delta \phi$ [rad]", 29, -0.5, 0.5), +) + + +Kl_angle_hist.fill(theta=dKl_theta[(pid_Kl==3)], phi=dKl_phi[(pid_Kl==3)]) + +ax = hist.plot2d( + Kl_angle_hist, + xaxis = "phi", +) + +ax.set_title(r'$K_{L}$ distribution in $Z \rightarrow s\bar{s}$ leading jets') + +plt.savefig('examples/FCCee/Zqq/Kl_angle_hist.pdf') + + + + + + + + + + + + diff --git a/examples/FCCee/coffea/preSel.py b/examples/FCCee/coffea/preSel.py new file mode 100644 index 0000000000..640692488a --- /dev/null +++ b/examples/FCCee/coffea/preSel.py @@ -0,0 +1,20 @@ +#python examples/FCCee/KG/preSel.py + +from config.common_defaults import deffccdicts +import os + +basedir=os.path.join(os.getenv('FCCDICTSDIR', deffccdicts), '') + "yaml/FCCee/spring2021/IDEA/" +outdir="outputs/FCCee/coffea/" + +import multiprocessing +NUM_CPUS = int(multiprocessing.cpu_count()-2) + +process_list=['p8_ee_Zuds_ecm91'] + +#Very low fraction chosen to indicate single file +fraction=0.000001 +#fraction=0.0001 + +import config.runDataFrame as rdf +myana=rdf.runDataFrame(basedir,process_list) +myana.run(ncpu=NUM_CPUS,fraction=fraction,outDir=outdir)