diff --git a/bin/postprocess2 b/bin/postprocess2 new file mode 100755 index 0000000000..b10c9ff07a --- /dev/null +++ b/bin/postprocess2 @@ -0,0 +1,10 @@ +#!/bin/bash + +. `dirname $0`/../libexec/env.sh + +export MALLOC_ARENA_MAX=1 + +java ${JAVA_OPTS-} -Xmx768m -Xms768m -XX:+UseSerialGC \ + -cp ${COATJAVA_CLASSPATH:-''} \ + org.jlab.analysis.postprocess.Processor \ + $* diff --git a/common-tools/clara-io/src/main/java/org/jlab/io/clara/Clas12Writer.java b/common-tools/clara-io/src/main/java/org/jlab/io/clara/Clas12Writer.java index 6a36e60488..a094b939cd 100644 --- a/common-tools/clara-io/src/main/java/org/jlab/io/clara/Clas12Writer.java +++ b/common-tools/clara-io/src/main/java/org/jlab/io/clara/Clas12Writer.java @@ -2,6 +2,7 @@ import java.io.File; import java.nio.file.Path; +import java.util.List; import java.util.TreeMap; import java.util.TreeSet; import org.jlab.analysis.postprocess.Processor; @@ -145,9 +146,9 @@ private Event getUnixEvent(Bank config) { */ private void postprocess() { int d = conman.getConstants(getRunNumber(), "/runcontrol/helicity").getIntValue("delay",0,0,0); - HelicitySequenceDelayed h = new HelicitySequenceDelayed(d); - h.addStream(helicities); - Processor p = new Processor(fullSchema, h, scalers); + HelicitySequenceDelayed helicity = new HelicitySequenceDelayed(d); + helicity.addStream(helicities); + Processor p = new Processor(List.of(filename), fullSchema, helicity, scalers); HipoReader r = new HipoReader(); r.open(filename); Event e = new Event(); diff --git a/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Processor.java b/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Processor.java index f5f4ca1cee..70bdf6ddba 100644 --- a/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Processor.java +++ b/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Processor.java @@ -1,13 +1,7 @@ package org.jlab.analysis.postprocess; -import java.io.File; -import java.nio.file.FileSystems; -import java.nio.file.PathMatcher; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.TreeMap; import org.jlab.jnp.hipo4.data.Bank; import org.jlab.jnp.hipo4.data.Event; @@ -22,6 +16,9 @@ import org.jlab.detector.scalers.DaqScalersSequence; import org.jlab.detector.helicity.HelicityBit; import org.jlab.detector.helicity.HelicitySequenceDelayed; +import org.jlab.jnp.hipo4.io.HipoWriterSorted; +import org.jlab.utils.options.OptionParser; +import org.jlab.utils.system.ClasUtilsFile; /** * @@ -31,82 +28,81 @@ public class Processor { public static final String CCDB_TABLES[] = {"/runcontrol/fcup","/runcontrol/slm", "/runcontrol/helicity","/daq/config/scalers/dsc1","/runcontrol/hwp"}; - public static final String DEF_PRELOAD_GLOB = "*.{hipo,h5}"; - - private final String outputPrefix = "tmp_"; + private Bank runConfig = null; + private Bank recEvent = null; private ConstantsManager conman = null; private SchemaFactory schemaFactory = null; private DaqScalersSequence chargeSequence = null; private HelicitySequenceDelayed helicitySequence = null; + private TreeMap eventUnix = null; - public Processor(File file, boolean restream, boolean rebuild) { - configure(Arrays.asList(file.getAbsolutePath()), restream, rebuild); - } - - public Processor(String dir, boolean restream, boolean rebuild) { - configure(findPreloadFiles(dir,DEF_PRELOAD_GLOB), restream, rebuild); - } - - public Processor(String dir, String glob, boolean restream, boolean rebuild) { - configure(findPreloadFiles(dir,glob), restream, rebuild); - } - - public Processor(SchemaFactory schema, HelicitySequenceDelayed h, DaqScalersSequence s) { + public Processor(List files, boolean restream, boolean rebuild) { + HipoReader r = new HipoReader(); + r.open(files.get(0)); + schemaFactory = r.getSchemaFactory(); + r.close(); + runConfig = new Bank(schemaFactory.getSchema("RUN::config")); + recEvent = new Bank(schemaFactory.getSchema("REC::Event")); conman = new ConstantsManager(); conman.init(CCDB_TABLES); + helicitySequence = Util.getHelicity(files, schemaFactory, restream, conman); + if (rebuild) chargeSequence = DaqScalersSequence.rebuildSequence(1, conman, files); + else chargeSequence = DaqScalersSequence.readSequence(files); + eventUnix = getEventUnixMap(schemaFactory, files); + } + + public Processor(List files, SchemaFactory schema, HelicitySequenceDelayed h, DaqScalersSequence s) { schemaFactory = schema; helicitySequence = h; chargeSequence = s; - } - - private void configure(List preloadFiles, boolean restream, boolean rebuild) { - if (!preloadFiles.isEmpty()) { - HipoReader r = new HipoReader(); - r.open(preloadFiles.get(0)); - conman = new ConstantsManager(); - conman.init(CCDB_TABLES); - schemaFactory = r.getSchemaFactory(); - helicitySequence = Util.getHelicity(preloadFiles, schemaFactory, restream, conman); - if (rebuild) chargeSequence = DaqScalersSequence.rebuildSequence(1, conman, preloadFiles); - else chargeSequence = DaqScalersSequence.readSequence(preloadFiles); - r.close(); - } + runConfig = new Bank(schemaFactory.getSchema("RUN::config")); + recEvent = new Bank(schemaFactory.getSchema("REC::Event")); + conman = new ConstantsManager(); + conman.init(CCDB_TABLES); + eventUnix = getEventUnixMap(schemaFactory, files); } /** - * Get a list of files to preload, from one directory and a glob. - * @param dir - * @param glob - * @return list of preload files + * Load the mapping from event number to unix time + * @param schema + * @param files + * @return map */ - private static List findPreloadFiles(String dir, String glob) { - List ret = new ArrayList<>(); - if (dir != null) { - PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:"+dir+"/"+glob); - for (File f : (new File(dir)).listFiles()) { - if (matcher.matches(f.toPath())) - ret.add(f.getPath()); + public static TreeMap getEventUnixMap(SchemaFactory schema, List files) { + TreeMap m = new TreeMap<>(); + Event e = new Event(); + Bank b = schema.getBank("RUN::unix");//new Bank(schema.getSchema("RUN::unix")); + for (String f : files) { + HipoReader r = new HipoReader(); + r.setTags(1); + r.open(f); + while (r.hasNext()) { + r.nextEvent(e); + e.read(b); + int size = b.getRows(); + for (int i=0; i0) { event.removeBank("HEL::scaler"); - Util.assignScalerHelicity(runConfig.getLong("timestamp",0), ((HipoDataBank)helScaler).getBank(), helicitySequence); + Util.assignScalerHelicity(runcfg.getLong("timestamp",0), ((HipoDataBank)helScaler).getBank(), helicitySequence); event.appendBank(helScaler); } } @@ -114,115 +110,165 @@ private void processEventHelicity(DataEvent event, DataBank runConfig, DataBank /** * Modify REC::Event/HEL::scaler with the delay-corrected helicity * @param event - * @param runConfig - * @param recEvent + * @param runcfg + * @param recevt */ - private void processEventHelicity(Event event, Bank runConfig, Bank recEvent) { - HelicityBit hb = helicitySequence.search(runConfig.getLong("timestamp", 0)); + private void processEventHelicity(Event event, Bank runcfg, Bank recevt) { + HelicityBit hb = helicitySequence.search(runcfg.getLong("timestamp", 0)); HelicityBit hbraw = helicitySequence.getHalfWavePlate() ? HelicityBit.getFlipped(hb) : hb; - recEvent.setByte("helicity",0,hb.value()); - recEvent.setByte("helicityRaw",0,hbraw.value()); + recevt.putByte("helicity",0,hb.value()); + recevt.putByte("helicityRaw",0,hbraw.value()); Bank helScaler = new Bank(schemaFactory.getSchema("HEL::scaler")); event.read(helScaler); if (helScaler.getRows()>0) { event.remove(schemaFactory.getSchema("HEL::scaler")); - Util.assignScalerHelicity(runConfig.getLong("timestamp",0), helScaler, helicitySequence); + Util.assignScalerHelicity(runcfg.getLong("timestamp",0), helScaler, helicitySequence); event.write(helScaler); } } /** * Modify REC::Event for beam charge and livetime - * @param runConfig - * @param recEvent + * @param runcfg + * @param recevt */ - private void processEventScalers(DataBank runConfig, DataBank recEvent) { - DaqScalers ds = chargeSequence.get(runConfig.getLong("timestamp", 0)); + private void processEventScalers(DataBank runcfg, DataBank recevt) { + DaqScalers ds = chargeSequence.get(runcfg.getLong("timestamp", 0)); if (ds != null) { - recEvent.setFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated()); - recEvent.setDouble("liveTime",0,ds.dsc2.getLivetime()); + recevt.setFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated()); + recevt.setDouble("liveTime",0,ds.dsc2.getLivetime()); } } /** * Modify REC::Event for beam charge and livetime - * @param runConfig - * @param recEvent + * @param runcfg + * @param recevt */ - private void processEventScalers(Bank runConfig, Bank recEvent) { - DaqScalers ds = chargeSequence.get(runConfig.getLong("timestamp", 0)); + private void processEventScalers(Bank runcfg, Bank recevt) { + DaqScalers ds = chargeSequence.get(runcfg.getLong("timestamp", 0)); if (ds != null) { - recEvent.putFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated()); - recEvent.putDouble("liveTime",0,ds.dsc2.getLivetime()); + recevt.putFloat("beamCharge",0, (float) ds.dsc2.getBeamChargeGated()); + recevt.putDouble("liveTime",0,ds.dsc2.getLivetime()); } } /** - * Postprocess one event - * @param e + * Modify REC::Event for beam charge and livetime + * @param runcfg + * @param runcfg */ - public void processEvent(DataEvent e) { - if (!e.hasBank("RUN::config")) return; - if (!e.hasBank("REC::Event")) return; - DataBank runConfig = e.getBank("RUN::config"); - DataBank recEvent = e.getBank("REC::Event"); - if (runConfig.rows()<1 || recEvent.rows()<1) return; - e.removeBank("REC::Event"); - if (helicitySequence != null) processEventHelicity(e, runConfig, recEvent); - if (chargeSequence != null) processEventScalers(runConfig, recEvent); - e.appendBank(recEvent); + private void processEventUnix(Event event, Bank runcfg) { + if (runcfg.getRows() > 0) { + Integer key = eventUnix.floorKey(runcfg.getInt("event",0)); + if (key != null) { + Integer unix = eventUnix.get(key); + if (unix != null) { + event.remove(runcfg.getSchema()); + runcfg.putInt("unixtime", 0, unix); + event.write(runcfg); + } + } + } } /** - * Postprocess one event - * @param e + * Modify REC::Event for beam charge and livetime + * @param runcfg + * @param runcfg */ - public void processEvent(Event e) { - if (!e.hasBank(schemaFactory.getSchema("RUN::config"))) return; - if (!e.hasBank(schemaFactory.getSchema("REC::Event"))) return; - Bank runConfig = new Bank(schemaFactory.getSchema("RUN::config")); - Bank recEvent = new Bank(schemaFactory.getSchema("REC::Event")); - e.read(runConfig); - e.read(recEvent); - if (runConfig.getRows()<1 || recEvent.getRows()<1) return; - e.remove(schemaFactory.getSchema("REC::Event")); - if (helicitySequence != null) processEventHelicity(e, runConfig, recEvent); - if (chargeSequence != null) processEventScalers(runConfig, recEvent); - e.write(recEvent); + private void processEventUnix(DataEvent event, DataBank runcfg) { + if (runcfg.rows() > 0) { + Integer unix = eventUnix.get(eventUnix.floorKey(runcfg.getInt("event",0))); + if (unix != null) { + event.removeBank(runcfg.getDescriptor().getName()); + runcfg.setInt("unixtime", 0, unix); + event.appendBank(runcfg); + } + } } /** - * Create rebuilt files from preload files. - * @param preloadFiles - * @return map of rebuilt:preload files + * Postprocess one event + * @param event */ - private Map rebuild(String dir, List preloadFiles) { - File d = new File(dir); - if (!d.canWrite()) { - throw new RuntimeException("No write permissions on "+dir); - } - Map rebuiltFiles = new HashMap<>(); - for (String preloadFile : preloadFiles) { - String rebuiltFile = dir+"/"+outputPrefix+preloadFile.replace(dir+"/",""); - Util.rebuildScalers(conman, preloadFile, rebuiltFile); - rebuiltFiles.put(rebuiltFile,preloadFile); + public void processEvent(DataEvent event) { + if (event.hasBank("RUN::config")) { + DataBank runcfg = event.getBank("RUN::config"); + if (runcfg.rows() > 0) { + processEventUnix(event, runcfg); + if (event.hasBank("REC::Event")) { + DataBank recevt = event.getBank("REC::Event"); + if (recevt.rows() > 0) { + event.removeBank("REC::Event"); + if (helicitySequence != null) processEventHelicity(event, runcfg, recevt); + if (chargeSequence != null) processEventScalers(runcfg, recevt); + event.appendBank(recevt); + } + } + } } - return rebuiltFiles; } /** - * Replace files with new ones. - * @param files map of new:old filenames + * Postprocess one event + * @param event */ - private static void replace(Map files) { - for (String rebuiltFile : files.keySet()) { - new File(files.get(rebuiltFile)).delete(); - new File(rebuiltFile).renameTo(new File(files.get(rebuiltFile))); + public void processEvent(Event event) { + event.read(runConfig); + event.read(recEvent); + if (runConfig.getRows() > 0) { + processEventUnix(event, runConfig); + if (recEvent.getRows() > 0) { + event.remove(recEvent.getSchema()); + if (helicitySequence != null) processEventHelicity(event, runConfig, recEvent); + if (chargeSequence != null) processEventScalers(runConfig, recEvent); + event.write(recEvent); + } } } + /** + * The "postprocess" program. + * @param args + */ public static void main(String args[]) { - Processor p = new Processor(System.getenv("HOME")+"/tmp","r*.hipo",false,false); + + OptionParser o = new OptionParser("postprocess"); + o.addOption("-f","0","reflip: rebuild the HEL::flip bank"); + o.addOption("-c","0","recharge: rebuild the RUN/HEL::scaler banks"); + o.addOption("-o",null,"merged output file path"); + o.setRequiresInputList(true); + o.parse(args); + + boolean restream = !o.getOption("-f").isDefault(); + boolean rebuild = !o.getOption("-c").isDefault(); + + Processor post = new Processor(o.getInputList(), restream, rebuild); + + HipoWriterSorted writer = null; + + if (!o.getOption("-o").isDefault()) { + writer = new HipoWriterSorted(); + SchemaFactory schema = writer.getSchemaFactory(); + schema.initFromDirectory(ClasUtilsFile.getResourceDir("CLAS12DIR", "etc/bankdefs/hipo4")); + writer.setCompressionType(2); + writer.open(o.getOption("-o").stringValue()); + } + + for (String f : o.getInputList()) { + HipoReader reader = new HipoReader(); + reader.open(f); + Event event = new Event(); + while (reader.hasNext()) { + reader.nextEvent(event); + post.processEvent(event); + if (writer != null) writer.addEvent(event); + } + reader.close(); + } + + if (writer != null) writer.close(); } } diff --git a/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Tag1ToEvent.java b/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Tag1ToEvent.java index 857a9962f6..b85efd3a95 100644 --- a/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Tag1ToEvent.java +++ b/common-tools/clas-analysis/src/main/java/org/jlab/analysis/postprocess/Tag1ToEvent.java @@ -1,5 +1,6 @@ package org.jlab.analysis.postprocess; +import java.util.TreeMap; import java.util.logging.Logger; import org.jlab.clas.reco.ReconstructionEngine; import org.jlab.detector.calib.utils.ConstantsManager; @@ -100,6 +101,9 @@ public static void main(String[] args) { helSeq.initialize(parser.getInputList()); } + // Initialize the unix-event map: + TreeMap eventUnix = Processor.getEventUnixMap(schema, parser.getInputList()); + // Loop over the input HIPO files: LOGGER.info("\n>>> Starting post-processing ...\n"); for (String filename : parser.getInputList()) { @@ -147,6 +151,20 @@ public static void main(String[] args) { event.write(recEventBank); event.write(helScalerBank); + // Update RUN::config.unixtime: + if (runConfigBank.getRows() > 0) { + int evno = runConfigBank.getInt("event", 0); + Integer key = eventUnix.floorKey(evno); + if (key != null) { + Integer unix = eventUnix.get(key); + if (unix != null) { + event.remove(runConfigBank.getSchema()); + runConfigBank.putInt("unixtime", 0, unix); + event.write(runConfigBank); + } + } + } + // Write out the original event: writer.addEvent(event, event.getEventTag());