Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.jlab.jnp.hipo4.data.Bank;
import org.jlab.jnp.hipo4.data.Event;
Expand All @@ -22,6 +23,9 @@
import org.jlab.detector.scalers.DaqScalersSequence;
import org.jlab.detector.helicity.HelicityBit;
import org.jlab.detector.helicity.HelicitySequenceDelayed;
import org.jlab.jnp.hipo4.io.HipoWriterSorted;
import org.jlab.utils.options.OptionParser;
import org.jlab.utils.system.ClasUtilsFile;

/**
*
Expand All @@ -35,10 +39,13 @@ public class Processor {

private final String outputPrefix = "tmp_";

private Bank runConfig = null;
private Bank recEvent = null;
private ConstantsManager conman = null;
private SchemaFactory schemaFactory = null;
private DaqScalersSequence chargeSequence = null;
private HelicitySequenceDelayed helicitySequence = null;
private TreeMap<Integer,Integer> eventUnix = null;

public Processor(File file, boolean restream, boolean rebuild) {
configure(Arrays.asList(file.getAbsolutePath()), restream, rebuild);
Expand All @@ -52,25 +59,34 @@ public Processor(String dir, String glob, boolean restream, boolean rebuild) {
configure(findPreloadFiles(dir,glob), restream, rebuild);
}

public Processor(List<String> files, boolean restream, boolean rebuild) {
configure(files, restream, rebuild);
}

public Processor(SchemaFactory schema, HelicitySequenceDelayed h, DaqScalersSequence s) {
conman = new ConstantsManager();
conman.init(CCDB_TABLES);
schemaFactory = schema;
helicitySequence = h;
chargeSequence = s;
runConfig = new Bank(schemaFactory.getSchema("RUN::config"));
recEvent = new Bank(schemaFactory.getSchema("REC::Event"));
}

private void configure(List<String> preloadFiles, boolean restream, boolean rebuild) {
if (!preloadFiles.isEmpty()) {
HipoReader r = new HipoReader();
r.open(preloadFiles.get(0));
schemaFactory = r.getSchemaFactory();
r.close();
runConfig = new Bank(schemaFactory.getSchema("RUN::config"));
recEvent = new Bank(schemaFactory.getSchema("REC::Event"));
conman = new ConstantsManager();
conman.init(CCDB_TABLES);
schemaFactory = r.getSchemaFactory();
helicitySequence = Util.getHelicity(preloadFiles, schemaFactory, restream, conman);
if (rebuild) chargeSequence = DaqScalersSequence.rebuildSequence(1, conman, preloadFiles);
else chargeSequence = DaqScalersSequence.readSequence(preloadFiles);
r.close();
eventUnix = getEventUnixMap(schemaFactory, preloadFiles);
}
}

Expand All @@ -92,117 +108,182 @@ private static List<String> findPreloadFiles(String dir, String glob) {
return ret;
}

/**
* Load the mapping from event number to unix time
* @param schema
* @param files
* @return map
*/
public static TreeMap<Integer,Integer> getEventUnixMap(SchemaFactory schema, List<String> files) {
Bank unix = new Bank(schema.getSchema("RUN::unix"));
TreeMap<Integer,Integer> m = new TreeMap<>();
Event e = new Event();
for (String f : files) {
HipoReader r = new HipoReader();
r.setTags(1);
r.open(f);
while (r.hasNext()) {
r.nextEvent(e);
e.read(unix);
int size = unix.getRows();
for (int i=0; i<size; i++) {
m.put(unix.getInt("event",i), unix.getInt("unixtime",i));
}
}
r.close();
}
return m;
}

/**
* Modify REC::Event/HEL::scaler with the delay-corrected helicity
* @param event
* @param runConfig
* @param recEvent
* @param runcfg
* @param recevt
*/
private void processEventHelicity(DataEvent event, DataBank runConfig, DataBank recEvent) {
HelicityBit hb = helicitySequence.search(runConfig.getLong("timestamp", 0));
private void processEventHelicity(DataEvent event, DataBank runcfg, DataBank recevt) {
HelicityBit hb = helicitySequence.search(runcfg.getLong("timestamp", 0));
HelicityBit hbraw = helicitySequence.getHalfWavePlate() ? HelicityBit.getFlipped(hb) : hb;
recEvent.setByte("helicity",0,hb.value());
recEvent.setByte("helicityRaw",0,hbraw.value());
recevt.setByte("helicity",0,hb.value());
recevt.setByte("helicityRaw",0,hbraw.value());
DataBank helScaler = event.getBank("HEL::scaler");
if (helScaler.rows()>0) {
event.removeBank("HEL::scaler");
Util.assignScalerHelicity(runConfig.getLong("timestamp",0), ((HipoDataBank)helScaler).getBank(), helicitySequence);
Util.assignScalerHelicity(runcfg.getLong("timestamp",0), ((HipoDataBank)helScaler).getBank(), helicitySequence);
event.appendBank(helScaler);
}
}

/**
* Modify REC::Event/HEL::scaler with the delay-corrected helicity
* @param event
* @param runConfig
* @param recEvent
* @param runcfg
* @param recevt
*/
private void processEventHelicity(Event event, Bank runConfig, Bank recEvent) {
HelicityBit hb = helicitySequence.search(runConfig.getLong("timestamp", 0));
private void processEventHelicity(Event event, Bank runcfg, Bank recevt) {
HelicityBit hb = helicitySequence.search(runcfg.getLong("timestamp", 0));
HelicityBit hbraw = helicitySequence.getHalfWavePlate() ? HelicityBit.getFlipped(hb) : hb;
recEvent.setByte("helicity",0,hb.value());
recEvent.setByte("helicityRaw",0,hbraw.value());
recevt.setByte("helicity",0,hb.value());
recevt.setByte("helicityRaw",0,hbraw.value());
Bank helScaler = new Bank(schemaFactory.getSchema("HEL::scaler"));
event.read(helScaler);
if (helScaler.getRows()>0) {
event.remove(schemaFactory.getSchema("HEL::scaler"));
Util.assignScalerHelicity(runConfig.getLong("timestamp",0), helScaler, helicitySequence);
Util.assignScalerHelicity(runcfg.getLong("timestamp",0), helScaler, helicitySequence);
event.write(helScaler);
}
}

/**
* Modify REC::Event for beam charge and livetime
* @param runConfig
* @param recEvent
* @param runcfg
* @param recevt
*/
private void processEventScalers(DataBank runConfig, DataBank recEvent) {
DaqScalers ds = chargeSequence.get(runConfig.getLong("timestamp", 0));
private void processEventScalers(DataBank runcfg, DataBank recevt) {
DaqScalers ds = chargeSequence.get(runcfg.getLong("timestamp", 0));
if (ds != null) {
recEvent.setFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated());
recEvent.setDouble("liveTime",0,ds.dsc2.getLivetime());
recevt.setFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated());
recevt.setDouble("liveTime",0,ds.dsc2.getLivetime());
}
}

/**
* Modify REC::Event for beam charge and livetime
* @param runConfig
* @param recEvent
* @param runcfg
* @param recevt
*/
private void processEventScalers(Bank runConfig, Bank recEvent) {
DaqScalers ds = chargeSequence.get(runConfig.getLong("timestamp", 0));
private void processEventScalers(Bank runcfg, Bank recevt) {
DaqScalers ds = chargeSequence.get(runcfg.getLong("timestamp", 0));
if (ds != null) {
recEvent.putFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated());
recEvent.putDouble("liveTime",0,ds.dsc2.getLivetime());
recevt.putFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated());
recevt.putDouble("liveTime",0,ds.dsc2.getLivetime());
}
}

/**
* Modify REC::Event for beam charge and livetime
* @param runcfg
* @param runcfg
*/
private void processEventUnix(Event event, Bank runcfg) {
if (runcfg.getRows() > 0) {
Integer unix = eventUnix.get(eventUnix.floorKey(runcfg.getInt("event",0)));
if (unix != null) {
event.remove(runcfg.getSchema());
runcfg.putInt("unixtime", 0, unix);
event.write(runcfg);
}
}
}

/**
* Modify REC::Event for beam charge and livetime
* @param runcfg
* @param runcfg
*/
private void processEventUnix(DataEvent event, DataBank runcfg) {
if (runcfg.rows() > 0) {
Integer unix = eventUnix.get(eventUnix.floorKey(runcfg.getInt("event",0)));
if (unix != null) {
event.removeBank(runcfg.getDescriptor().getName());
runcfg.setInt("unixtime", 0, unix);
event.appendBank(runcfg);
}
}
}

/**
* Postprocess one event
* @param e
* @param event
*/
public void processEvent(DataEvent e) {
if (!e.hasBank("RUN::config")) return;
if (!e.hasBank("REC::Event")) return;
DataBank runConfig = e.getBank("RUN::config");
DataBank recEvent = e.getBank("REC::Event");
if (runConfig.rows()<1 || recEvent.rows()<1) return;
e.removeBank("REC::Event");
if (helicitySequence != null) processEventHelicity(e, runConfig, recEvent);
if (chargeSequence != null) processEventScalers(runConfig, recEvent);
e.appendBank(recEvent);
public void processEvent(DataEvent event) {
if (event.hasBank("RUN::config")) {
DataBank runcfg = event.getBank("RUN::config");
if (runcfg.rows() > 0) {
processEventUnix(event, runcfg);
if (event.hasBank("REC::Event")) {
DataBank recevt = event.getBank("REC::Event");
if (recevt.rows() > 0) {
event.removeBank("REC::Event");
if (helicitySequence != null) processEventHelicity(event, runcfg, recevt);
if (chargeSequence != null) processEventScalers(runcfg, recevt);
event.appendBank(recevt);
}
}
}
}
}

/**
* Postprocess one event
* @param e
* @param event
*/
public void processEvent(Event e) {
if (!e.hasBank(schemaFactory.getSchema("RUN::config"))) return;
if (!e.hasBank(schemaFactory.getSchema("REC::Event"))) return;
Bank runConfig = new Bank(schemaFactory.getSchema("RUN::config"));
Bank recEvent = new Bank(schemaFactory.getSchema("REC::Event"));
e.read(runConfig);
e.read(recEvent);
if (runConfig.getRows()<1 || recEvent.getRows()<1) return;
e.remove(schemaFactory.getSchema("REC::Event"));
if (helicitySequence != null) processEventHelicity(e, runConfig, recEvent);
if (chargeSequence != null) processEventScalers(runConfig, recEvent);
e.write(recEvent);
public void processEvent(Event event) {
event.read(runConfig);
event.read(recEvent);
if (runConfig.getRows() > 0) {
processEventUnix(event, runConfig);
if (recEvent.getRows() > 0) {
event.remove(recEvent.getSchema());
if (helicitySequence != null) processEventHelicity(event, runConfig, recEvent);
if (chargeSequence != null) processEventScalers(runConfig, recEvent);
event.write(recEvent);
}
}
}

/**
* Create rebuilt files from preload files.
* @param preloadFiles
* @param files
* @return map of rebuilt:preload files
*/
private Map<String,String> rebuild(String dir, List<String> preloadFiles) {
private Map<String,String> rebuild(String dir, List<String> files) {
File d = new File(dir);
if (!d.canWrite()) {
throw new RuntimeException("No write permissions on "+dir);
}
Map<String,String> rebuiltFiles = new HashMap<>();
for (String preloadFile : preloadFiles) {
for (String preloadFile : files) {
String rebuiltFile = dir+"/"+outputPrefix+preloadFile.replace(dir+"/","");
Util.rebuildScalers(conman, preloadFile, rebuiltFile);
rebuiltFiles.put(rebuiltFile,preloadFile);
Expand All @@ -221,8 +302,48 @@ private static void replace(Map<String,String> files) {
}
}

/**
* The "postprocess" program.
* @param args
*/
public static void main(String args[]) {
Processor p = new Processor(System.getenv("HOME")+"/tmp","r*.hipo",false,false);

OptionParser o = new OptionParser("postprocess");
o.addOption("-f","0","reflip: rebuild the HEL::flip bank");
o.addOption("-c","0","recharge: rebuild the RUN/HEL::scaler banks");
o.addOption("-o",null,"merged output file path");
o.setRequiresInputList(true);
o.parse(args);

boolean restream = !o.getOption("-f").isDefault();
boolean rebuild = !o.getOption("-c").isDefault();

Processor post = new Processor(o.getInputList(), restream, rebuild);

HipoWriterSorted writer = null;

if (!o.getOption("-o").isDefault()) {
writer = new HipoWriterSorted();
SchemaFactory schema = writer.getSchemaFactory();
schema.initFromDirectory(ClasUtilsFile.getResourceDir("CLAS12DIR", "etc/bankdefs/hipo4"));
writer.setCompressionType(2);
writer.open(o.getOption("-o").stringValue());
}

for (String f : o.getInputList()) {
HipoReader reader = new HipoReader();
reader.setTags(0);
reader.open(f);
Event event = new Event();
while (reader.hasNext()) {
reader.nextEvent(event);
post.processEvent(event);
if (writer != null) writer.addEvent(event);
}
reader.close();
}

if (writer != null) writer.close();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jlab.analysis.postprocess;

import java.util.TreeMap;
import java.util.logging.Logger;
import org.jlab.clas.reco.ReconstructionEngine;
import org.jlab.detector.calib.utils.ConstantsManager;
Expand Down Expand Up @@ -100,6 +101,9 @@ public static void main(String[] args) {
helSeq.initialize(parser.getInputList());
}

// Initialize the unix-event map:
TreeMap<Integer,Integer> eventUnix = Processor.getEventUnixMap(schema, parser.getInputList());

// Loop over the input HIPO files:
LOGGER.info("\n>>> Starting post-processing ...\n");
for (String filename : parser.getInputList()) {
Expand Down Expand Up @@ -147,6 +151,17 @@ public static void main(String[] args) {
event.write(recEventBank);
event.write(helScalerBank);

// Update RUN::config.unixtime:
if (runConfigBank.getRows() > 0) {
int evno = runConfigBank.getByte("event", 0);
Integer unix = eventUnix.get(eventUnix.floorKey(evno));
if (unix != null) {
event.remove(runConfigBank.getSchema());
runConfigBank.putInt("unixtime", 0, unix);
event.write(runConfigBank);
}
}

// Write out the original event:
writer.addEvent(event, event.getEventTag());

Expand Down