ICP  1
NeXusEventCallback.cpp
Go to the documentation of this file.
1 #include "stdafx.h"
2 #include "isiscrpt.h"
3 #include "isisraw.h"
4 #include "icputils.h"
5 #include "selogger.h"
6 #include "NeXusEventCallback.h"
7 
8 #define PPP_TO_UAMPH 1.738E-6
9 
12 
16 
17 
18 NeXusEventCallback::NeXusEventCallback(const ISISCRPT_STRUCT* crpt, IXNeXusFile* file, const std::vector<int>& ev_source_ids) :
19  m_current_frame(-1), m_current_period(-1), m_period("period_number", crpt->start_time),m_vetoing("is_vetoing",crpt->start_time),
20  m_crpt(crpt), m_file(file), m_discarded_events(0),m_monitor_events_not_saved(0),m_good_frames(0),m_raw_frames(0),m_is_vetoing(-1),
21  m_mempool(4 * EventStore::maxNumCallbackEvents()),m_entry_name("raw_data_1"),
22  m_good_uamph_sum(0), m_raw_uamph_sum(0), m_good_events_sum(0),m_raw_events_sum(0),m_worker(m_queue),
23  m_last_frame_time(0),m_worker_thread("NeXusWorker"),m_refeventdetcard(-1)
24 {
25  setLoggerName("NeXusEventCallback");
26  m_refeventdetcard = getRefEventDetCard(ev_source_ids);
28  while( !m_queue.hasIdleThreads() )
29  {
30  Poco::Thread::sleep(100); // give thread time to start and connect to queue - avoids possible race condition in destructor?
31  }
32  BOOST_FOREACH(int id, ev_source_ids)
33  {
34  m_card_frame[id] = std::pair<int,time_t>(0,0); // Not INVALID_FRAME_MARKER as we should only have event enabled sources
35  }
36  int nspec = crpt->getNumSpectra(true);
37  m_det_id.resize(nspec);
38  std::string det_name;
39  det_name_t::left_const_iterator it;
40  for(int i=0, j=0; i <nspec; ++i)
41  {
42  makeDetName(i, det_name);
43  m_det_tr[det_name] = m_crpt->spectrumDAETR(i);
44  if ( m_crpt->isEventSpectrum(i) )
45  {
46  m_event_det.insert(det_name);
47  }
48  if ( (it = m_det_name.left.find(det_name)) == m_det_name.left.end() )
49  {
50  it = m_det_name.left.insert(det_name_t::left_value_type(det_name, j++)).first;
51  }
52  m_det_id[i] = it->second; // we are building a fast lookup of spectrum -> det_name
53  }
54  if ( Poco::Util::Application::instance().config().getBool("isisicp.snseventfile", false) )
55  {
56  m_entry_name = "entry";
57  }
58 }
59 
62 int NeXusEventCallback::getRefEventDetCard(const std::vector<int>& ev_source_ids)
63 {
64  int refeventdetcard = Poco::Util::Application::instance().config().getInt("isisicp.refeventdetcard", -1);
65  if ( (refeventdetcard == -1) || (std::find(ev_source_ids.begin(), ev_source_ids.end(), refeventdetcard) != ev_source_ids.end()) )
66  {
67  return refeventdetcard;
68  }
69  if ( ev_source_ids.size() == 0 ) // histogram mode, no cards in event mode
70  {
71  LOGSTR_INFORMATION("No event sources, assuming histogram mode and setting refeventdetcard=-1");
72  return -1;
73  }
74  static const int INVALID_CARD = std::numeric_limits<int>::max();
75  int crate0_low = INVALID_CARD, crate1_low = INVALID_CARD;
76  BOOST_FOREACH(int id, ev_source_ids)
77  {
78  if ( (id / 100 == 0) && (id < crate0_low) )
79  {
80  crate0_low = id;
81  }
82  if ( (id / 100 == 1) && (id < crate1_low) )
83  {
84  crate1_low = id;
85  }
86  }
87  if (crate1_low != INVALID_CARD)
88  {
89  LOGSTR_WARNING("Using card number " << crate1_low << " as reference detecor card as " << refeventdetcard << " not present");
90  refeventdetcard = crate1_low;
91  }
92  else if (crate0_low != INVALID_CARD)
93  {
94  LOGSTR_WARNING("Using card number " << crate0_low << " as reference detecor card as " << refeventdetcard << " not present");
95  refeventdetcard = crate0_low;
96  }
97  else
98  {
99  LOGSTR_ERROR("Reference event detector card number " << refeventdetcard << " not present and cannot find alternative");
100  refeventdetcard = -1;
101  }
102  return refeventdetcard;
103 }
104 
106 {
107  static bool random_shift = Poco::Util::Application::instance().config().getBool("isisicp.randomeventbinshift", true);
108  close();
109  LOGSTR_INFORMATION("GOOD uAh summed from events = " << m_good_uamph_sum * PPP_TO_UAMPH);
110  LOGSTR_INFORMATION("RAW uAh summed from events = " << m_raw_uamph_sum * PPP_TO_UAMPH);
111  LOGSTR_INFORMATION("GOOD events = " << m_good_events_sum);
112  LOGSTR_INFORMATION("RAW events = " << m_raw_events_sum);
113  LOGSTR_INFORMATION("DISCARDED events = " << m_discarded_events);
114  LOGSTR_INFORMATION("Monitor events not saved = " << m_monitor_events_not_saved);
115  LOGSTR_INFORMATION("GOOD / RAW frames from events = " << m_good_frames << " / " << m_raw_frames);
116  int nframes = m_frame_number.size();
117  LOGSTR_INFORMATION("Number of frames in event list = " << nframes);
118  if ( (m_raw_events_sum > 0) && (nframes == 0) )
119  {
120  LOGSTR_ERROR("Events with no frames to write - something is very wrong. Last time we saw this was on WISH when the first crate generated events/frames but the second one didn't");
121  return;
122  }
123 // for(std::map<int,int>::const_iterator it = m_first_data_frame.begin(); it != m_first_data_frame.end(); ++it)
124 // {
125 // LOGSTR_INFORMATION("Spectrum " << it->first << " first data frame " << it->second);
126 // }
127  int i, j, frame_no, nevents, n_no_events;
128  std::vector<uint64_t> event_index(nframes);
129  std::vector<int> event_frame_number(nframes);
130  std::vector<double> event_frame_time(nframes);
131  std::vector<int> frame_events(nframes, 0);
132  for(std::set<std::string>::const_iterator it=m_event_det.begin(); it != m_event_det.end(); ++it)
133  {
134  const std::map<int,int>& this_det = m_frame_events_write[*it];
135  n_no_events = 0;
136  if (nframes > 0) // nframes == 0 if we have no events at all in event mode
137  {
138  event_index[0] = 0;
139  }
140  for(i = 0; i < nframes; ++i)
141  {
142  frame_no = m_frame_number[i];
143  event_frame_time[i] = static_cast<double>(m_frame_time[i]);
144  event_frame_number[i] = m_frame_number[i];
145  std::map<int,int>::const_iterator v = this_det.find(frame_no);
146  if ( v != this_det.end() )
147  {
148  nevents = v->second;
149  }
150  else
151  {
152  nevents = 0;
153  ++n_no_events;
154  }
155  if ( i < (nframes - 1) )
156  {
157  event_index[i+1] = event_index[i] + nevents;
158  }
159  frame_events[i] += nevents;
160  }
161  LOGSTR_INFORMATION("Inserted " << n_no_events << " zero event frames for " << *it);
162 
163  NXlink tof_link;
164  memset(&tof_link, 0, sizeof(NXlink));
165  if ( m_det_tr.find(*it) != m_det_tr.end() )
166  {
167  m_file->openGroup("instrument", "NXinstrument");
168  m_file->openGroup("dae","IXdae");
169  m_file->openGroup(Poco::format("time_channels_%d", m_det_tr[*it]).c_str(), "IXtime_channels");
170  m_file->getLink("time_of_flight", tof_link);
171  m_file->closeGroup();
172  m_file->closeGroup();
173  m_file->closeGroup();
174  }
175  else
176  {
177  LOGSTR_WARNING("Cannot find event_time_bins link for " << *it);
178  }
179  openDataGroup(*it, true);
180  m_file->writeData("event_index", event_index);
181  m_file->writeData("event_frame_number", event_frame_number);
182  m_file->writeData("event_time_zero", event_frame_time);
183  m_file->writeAttribute("event_time_zero", "offset", m_file->ISOtime(m_crpt->start_time));
184  m_file->writeAttribute("event_time_zero", "units", "second");
185  m_file->writeData("total_counts", m_events_write_total[*it]);
186  if (strlen(tof_link.targetPath) > 0)
187  {
188  m_file->makeLink("event_time_bins", tof_link);
189  }
190  if (random_shift)
191  {
192  m_file->writeData("event_time_offset_shift", "random");
193  }
194  else
195  {
196  m_file->writeData("event_time_offset_shift", "centre");
197  }
198  std::vector<NXlink> links(7);
199  m_file->getLink("event_index", links[0]);
200  m_file->getLink("event_frame_number", links[1]);
201  m_file->getLink("event_time_zero", links[2]);
202  m_file->getLink("total_counts", links[3]);
203  m_file->getLink("event_id", links[4]);
204  m_file->getLink("event_time_offset", links[5]);
205  m_file->getLink("event_time_offset_shift", links[6]);
206  closeDataGroup(*it);
207  // now make necessary links from NXdetector to NXevent_data etc.
208  if (it->find(detector_base) != std::string::npos)
209  {
210  m_file->makeNewGroup((*it+"_events"), "NXevent_data");
211  std::for_each(links.begin(), links.end(), boost::bind(&IXNeXusFile::makeLink, m_file, (const char*)NULL, _1));
212  if (strlen(tof_link.targetPath) > 0)
213  {
214  m_file->makeLink("event_time_bins", tof_link);
215  }
216  m_file->closeGroup();
217  }
218  }
219  // update SE logs
220  m_file->openGroup("framelog", "NXcollection");
221  m_file->addLog("frame_log", m_frame_number, "frame_number", m_frame_time, "second", m_crpt->start_time);
222  m_file->addLog("period_log", m_frame_period, "period_number", m_frame_time, "second", m_crpt->start_time);
223  m_file->addLog("good_frame_log", m_good_frame, "is_good", m_frame_time, "second", m_crpt->start_time);
224  m_file->addLog("proton_charge", m_frame_proton_charge, "uAh", m_frame_time, "second", m_crpt->start_time);
225  m_file->addLog("events_log", frame_events, "events", m_frame_time, "second", m_crpt->start_time);
226  for(i = 0; i < nframes; ++i)
227  {
228  frame_events[i] = m_frame_events_raw[i];
229  }
230  m_file->addLog("raw_events_log", frame_events, "events", m_frame_time, "second", m_crpt->start_time);
231  const char* units;
232  for(uint_map_t::const_iterator it = m_se_uint_values.begin(); it != m_se_uint_values.end(); ++it)
233  {
234  units = "unknown";
235  for(int j=0; j<m_crpt->n_data_dae; ++j)
236  {
237  if (it->first == m_crpt->data_dae[j].name)
238  {
239  units = m_crpt->data_dae[j].scaled_units;
240  }
241  else if ( it->first == (std::string(m_crpt->data_dae[j].name)+"_raw") )
242  {
243  units = m_crpt->data_dae[j].raw_units;
244  }
245  }
246  m_file->addLog(it->first.c_str(), it->second, units, m_se_times[it->first], "second", m_crpt->start_time);
247  }
248  for(float_map_t::const_iterator it = m_se_float_values.begin(); it != m_se_float_values.end(); ++it)
249  {
250  units = "unknown";
251  for(int j=0; j<m_crpt->n_data_dae; ++j)
252  {
253  if (it->first == m_crpt->data_dae[j].name)
254  {
255  units = m_crpt->data_dae[j].scaled_units;
256  }
257  else if ( it->first == (std::string(m_crpt->data_dae[j].name)+"_raw") )
258  {
259  units = m_crpt->data_dae[j].raw_units;
260  }
261  }
262  m_file->addLog(it->first.c_str(), it->second, units, m_se_times[it->first], "second", m_crpt->start_time);
263  }
264  m_file->closeGroup();
265 
266  m_file->openGroup("runlog", "IXrunlog");
267  m_period.setOffset(m_crpt->start_time); // update offset as may have changed since start of run
268  m_period.write(m_file, "period_change_log");
269  m_vetoing.setOffset(m_crpt->start_time); // update offset as may have changed since start of run
270  m_vetoing.write(m_file, "veto_log");
271  m_file->closeGroup();
272 
274  m_file->writeData("total_uncounted_counts", m_discarded_events);
275  m_file->writeData("monitor_events_not_saved", m_monitor_events_not_saved);
276 }
277 
279 {
280  for(det_file_t::const_iterator det_pos = m_det_file.begin(); det_pos != m_det_file.end(); ++det_pos)
281  {
282  BOOST_FOREACH(IXNeXusFile* nx, det_pos->second)
283  {
284  nx->closeData();
285  closeDataGroup(nx, det_pos->first);
286  nx->close();
287  delete nx;
288  }
289  }
290  m_det_file.clear();
291 }
292 
294 {
295  for(det_file_t::const_iterator det_pos = m_det_file.begin(); det_pos != m_det_file.end(); ++det_pos)
296  {
297  BOOST_FOREACH(IXNeXusFile* nx, det_pos->second)
298  {
299  nx->flush();
300  }
301  }
302  m_file->flush();
303 }
304 
306 {
307  close();
308  Poco::Thread::sleep(100);
309  while( !m_queue.empty() )
310  {
311  Poco::Thread::sleep(100);
312  }
313  m_queue.wakeUpAll(); // gets m_worker to finish
314  Poco::Thread::sleep(100);
315  // we have had lots of trouble here! It seems that wakeUpAll() can get ignored some times and we end up with a waiting thread
316  // so try a bit of defensive programming...
317  if ( m_queue.hasIdleThreads() )
318  {
319  LOGSTR_INFORMATION("Retrying wakeUpAll()");
320  while( m_queue.hasIdleThreads() )
321  {
322  m_queue.wakeUpAll(); // gets m_worker to finish
323  Poco::Thread::sleep(100);
324  }
325  }
326  m_worker_thread.join();
327 }
328 
329 void NeXusEventCallback::makeDetName(int spec, std::string& det_name)
330 {
331  int mon_num = m_crpt->spectrumToMonitorNumber(spec);
332  if (mon_num > 0)
333  {
334  det_name = Poco::format("monitor_%d", mon_num);
335  }
336  else
337  {
338  det_name = Poco::format(detector_format, m_crpt->spectrumGroup(spec));
339  }
340 }
341 
342 
344 {
345  int j;
346  m_file->makeNewGroup("raw_data_1", "NXentry");
347  m_file->makeNewGroup("instrument", "NXinstrument");
348  m_file->makeNewGroup("dae","IXdae");
349 
350 // std::set<int> tr_used;
351 // for(std::map<std::string,int>::const_iterator it = m_det_tr.begin(); it != m_det_tr.end(); ++it)
352 // {
353 // tr_used.insert(it->second);
354 // }
355 // BOOST_FOREACH(int tr, tr_used)
356  for(int tr=1; tr <= m_crpt->ntrg; ++tr)
357  {
358  j = tr - 1;
359  m_file->makeNewGroup(Poco::format("time_channels_%d", tr).c_str(), "IXtime_channels");
360  m_file->writeData("time_of_flight", &(m_crpt->rtcb[j][0]), 1 + m_crpt->ntc[j]);
361  m_file->writeAttribute("time_of_flight", "axis", 1);
362  m_file->writeAttribute("time_of_flight", "primary", 1);
363  m_file->writeAttribute("time_of_flight", "units", "microsecond");
364  m_file->writeData("time_of_flight_raw", &(m_crpt->tcb[j][0]), 1 + m_crpt->ntc[j]);
365  m_file->writeAttribute("time_of_flight_raw", "units", "pulses");
366  m_file->writeAttribute("time_of_flight_raw", "frequency", "32 MHz");
367  m_file->closeGroup();
368  }
369  m_file->closeGroup();
370  m_file->closeGroup();
371  for(det_name_t::const_iterator it = m_det_name.begin(); it != m_det_name.end(); ++it)
372  {
373  const std::string& name = it->left;
374  if (name.substr(0, 8) == "monitor_")
375  {
376  m_file->makeNewGroup(name, "NXmonitor", false);
377  }
378  else
379  {
380  m_file->openGroup("instrument", "NXinstrument");
381  m_file->makeNewGroup(name, "NXdetector", false);
382  m_file->closeGroup();
383  m_file->makeNewGroup(name, "NXdata", false);
384  }
385  }
386  m_file->makeNewGroup("runlog", "IXrunlog", false);
387 }
388 
389 
390 void NeXusEventCallback::processDataDae(const DAEEventHeader* head, const DetectorEvent32* det_ev, int n, int event_source_id)
391 {
392  static const uint32_t SIX_MONTHS_IN_SECS = 6 * 28 * 24 * 60 * 60;
393  int i, ndiscarded = 0, device_id, device_index, nignored = 0;
394  union { uint64_t u64; double d; uint32_t u32; float f; unsigned char b[8]; } data_value;
395  float offset_time = -1.0;
396  DAEEventList::DAETimeToOffset(head->time, m_crpt->start_time, offset_time);
397  std::ostringstream msgpfx_oss;
398  msgpfx_oss << "(Time: " << offset_time << ", Frame: " << head->frame_number << ") ";
399  const std::string& msgpfx = msgpfx_oss.str();
400  bool ignore_value;
401  for(i=0; i<n; ++i)
402  {
403  ignore_value = false;
404  if (head->info.bad_frame || det_ev[i].time_channel == 0 ) // ignore bad and out of time range frames
405  {
406  ++ndiscarded;
407  InterlockedIncrement(&m_discarded_events);
408  continue;
409  }
410  if ( (det_ev[i].spectrum >> 8) != 0 )
411  {
412  LOGSTR_ERROR(msgpfx << "Expected device id packet, but found header: " << (det_ev[i].spectrum >> 8) << " value: " << (det_ev[i].spectrum & 0xff));
413  continue;
414  }
415  device_id = det_ev[i].spectrum & 0xff;
416  device_index = -1;
417  for(int j=0; j<m_crpt->n_data_dae; ++j)
418  {
419  if (m_crpt->data_dae[j].id == device_id)
420  {
421  device_index = j;
422  }
423  }
424  if (device_index == -1)
425  {
426  LOGSTR_ERROR(msgpfx << "Invalid device id (not specified in XML file) " << device_id);
427  continue;
428  }
429  const data_dae_t& data_dae = m_crpt->data_dae[device_index];
430  if (data_dae.data_bytes != 4)
431  {
432  LOGSTR_ERROR(msgpfx << "Invalid data size " << data_dae.data_bytes << " for device id " << device_id);
433  i += data_dae.data_bytes;
434  continue;
435  }
436  if (data_dae.data_type != 1 && data_dae.data_type != 2)
437  {
438  LOGSTR_ERROR(msgpfx << "Invalid data type " << data_dae.data_type << " for device id " << device_id);
439  i += data_dae.data_bytes;
440  continue;
441  }
442  for(int j=1; j <= data_dae.data_bytes; ++j)
443  {
444  if ( (det_ev[i+j].spectrum >> 8) != j )
445  {
446  LOGSTR_WARNING(msgpfx << "invalid data byte " << j << " for device id " << device_id
447  << " header: " << (det_ev[i+j].spectrum >> 8) << " value: " << (det_ev[i+j].spectrum & 0xff) );
448  ignore_value = true;
449  }
450  data_value.b[j-1] = det_ev[i+j].spectrum & 0xff;
451  }
452  int dae_tr = m_crpt->daeTR(data_dae.tr);
453  if (offset_time < 0.0 || offset_time > SIX_MONTHS_IN_SECS) // no run should last more than this time
454  {
455  LOGSTR_WARNING("Invalid SE DAE time value " << head->time << "(offset " << offset_time << " seconds) for frame " << m_current_frame); // error
456  ignore_value = true;
457  }
458  if (!ignore_value)
459  {
460  if (data_dae.data_type == 1)
461  {
462  m_se_uint_values[std::string(data_dae.name)+"_raw"].push_back(data_value.u32);
463  m_se_float_values[data_dae.name].push_back(data_value.u32 * data_dae.scale + data_dae.offset);
464  }
465  else
466  {
467  m_se_float_values[std::string(data_dae.name)+"_raw"].push_back(data_value.f);
468  m_se_float_values[data_dae.name].push_back(data_value.f * data_dae.scale + data_dae.offset);
469  }
470  m_se_times[std::string(data_dae.name)+"_raw"].push_back(offset_time + getTimeOffset(det_ev, event_source_id, dae_tr, i) / 1e6); // time of first value
471  m_se_times[data_dae.name].push_back(offset_time + getTimeOffset(det_ev, event_source_id, dae_tr, i) / 1e6); // time of first value
472  }
473  else
474  {
475  ++nignored;
476  }
477  i += data_dae.data_bytes;
478  }
479  if (i != n)
480  {
481  LOGSTR_ERROR(msgpfx << "SE Data count used mismatch " << i << " != " << n);
482  }
483  if (nignored > 0)
484  {
485  LOGSTR_WARNING("Ignored " << nignored << " SE data to dae values");
486  }
487 }
488 
489 
492 void NeXusEventCallback::allEventCallback(const DAEEventHeader* head, const DetectorEvent32* det_ev, int n, int event_source_id, const int* mapping)
493 {
494  if (m_mempool.blockSize() < 4 * n)
495  {
496  throw std::exception("NeXusEventCallback::allEventCallback");
497  }
498  if (n == 0)
499  {
500  updateCurrentWriteFrame(head->frame_number, event_source_id);
501  return;
502  }
503  // all spectra from a card (event_source_id) must have the same DAE TR, however they may have a different CRPT TR
504  // the DAE TR refers to the real event time as proframmed in the detector card, the CRPT TR is the time regime that
505  // was used for on the fly rebinning and is also used for naming the NXdetector objects
506  int spec, dae_tr = -1, dae_tr0 = -1;
507  for(int i=0; i<n; ++i)
508  {
509  spec = getSpectrum(det_ev, event_source_id, i);
510  if (spec == 0)
511  {
512  dae_tr0 = m_crpt->spectrumDAETR(spec);
513  }
514  else if (spec > 0) // we may have corrupt values from card (-1); also we want to avoid spectrum 0 as this is possibly in a different time regime
515  {
516  dae_tr = m_crpt->spectrumDAETR(spec);
517  break;
518  }
519  }
520  if (dae_tr == -1 && dae_tr0 != -1)
521  {
522  dae_tr = dae_tr0;
523  }
524  if (dae_tr == -1)
525  {
526  for(int i=0; i < m_crpt->n_data_dae; ++i)
527  {
528  if (m_crpt->data_dae[i].crate == event_source_id)
529  {
530  processDataDae(head, det_ev, n, event_source_id);
531  return;
532  }
533  }
534  m_logstr->warning() << "Ignoring " << n << " events from frame "<< head->frame_number << " event source " << event_source_id << " as all spectra are invalid";
535  return;
536  }
537 // std::string current_det_name;
538 // int_map_t event_id;
539 // float_map_t event_time_offset;
540  int_map_t& event_id = *m_int_map_store.get();
541  float_map_t& event_time_offset = *m_float_map_store.get();
542  float_map_t::iterator event_time_it;
543  int_map_t::iterator event_id_it;
544  int mon_num, det_id, current_det_id = -1, nmonskippedevents = 0, ndiscarded = 0;
545  for(int i=0; i<n; ++i)
546  {
547  spec = getSpectrum(det_ev, event_source_id, i);
548  mon_num = m_crpt->spectrumToMonitorNumber(spec);
549  if ( (mon_num > 0) && !m_crpt->saveMonitorEvents(mon_num) )
550  {
551  ++nmonskippedevents;
552 #ifdef _WIN64
553  InterlockedIncrement64(&m_monitor_events_not_saved);
554 #endif /* _WIN64 */
555  continue;
556  }
557  if (head->info.bad_frame || det_ev[i].time_channel == 0 || spec == -1) // ignore bad and out of time range frames
558  {
559  ++ndiscarded;
560  InterlockedIncrement(&m_discarded_events);
561  }
562  else
563  {
564  det_id = m_det_id[spec];
565  if (current_det_id != det_id)
566  {
567  const std::string& det_name = m_det_name.right.at(det_id);
568  if ( (event_time_it = event_time_offset.find(det_name)) == event_time_offset.end() )
569  {
570  event_time_offset[det_name];
571  event_time_it = event_time_offset.find(det_name);
572  }
573  if (event_time_it->second.capacity() < n)
574  {
575  event_time_it->second.reserve(n);
576  }
577  if ( (event_id_it = event_id.find(det_name)) == event_id.end() )
578  {
579  event_id[det_name];
580  event_id_it = event_id.find(det_name);
581  }
582  if (event_id_it->second.capacity() < n)
583  {
584  event_id_it->second.reserve(n);
585  }
586  current_det_id = det_id;
587  }
588  event_time_it->second.push_back(getTimeOffset(det_ev, event_source_id, dae_tr, i));
589 // event_id_it->second.push_back(m_crpt->udetFromSpec(spec)); ///< @todo for SNS compatibility - conver to detector ID
590  event_id_it->second.push_back(spec);
591  }
592  }
593  writeEvents(head, event_source_id, n - nmonskippedevents /* - ndiscarded */, event_id, event_time_offset);
594  for(int_map_t::iterator it = event_id.begin(); it != event_id.end(); ++it)
595  {
596  it->second.resize(0);
597  }
598  m_int_map_store.put(&event_id);
599  for(float_map_t::iterator it = event_time_offset.begin(); it != event_time_offset.end(); ++it)
600  {
601  it->second.resize(0);
602  }
603  m_float_map_store.put(&event_time_offset);
604 }
605 
607 void NeXusEventCallback::allFrameCallback(const DAEEventHeader* head, int event_source_id)
608 {
609  Poco::Mutex::ScopedLock _lock(m_frame_callback_mutex);
610  static bool warneventpppmismatch = Poco::Util::Application::instance().config().getBool("isisicp.warneventpppmismatch", false);
611  static const uint32_t MAX_ISIS_PPP = static_cast<uint32_t>( 0.5 + (300.0 * 3600.0) / (PPP_TO_UAMPH * 50.0) );
612  static const uint32_t SIX_MONTHS_IN_SECS = 6 * 28 * 24 * 60 * 60;
613  uint32_t the_protons = head->protons;
614  m_frame_events_raw[static_cast<int>(head->frame_number)] += head->num_events;
615  if (m_refeventdetcard != -1 && m_refeventdetcard != event_source_id)
616  {
617  return; // don't trust the information
618  }
619  if ( warneventpppmismatch && (static_cast<int>(head->frame_number) == m_current_frame) )
620  {
621  unsigned ref_protons = static_cast<unsigned>(0.5 + m_frame_proton_charge.back() / PPP_TO_UAMPH);
622  if ( head->protons != ref_protons )
623  {
624  LOGSTR_WARNING("proton mismatch " << head->protons << " != " << ref_protons);
625  }
626  float ref_time = m_frame_time.back();
627  float this_time;
629  if (this_time != ref_time)
630  {
631  LOGSTR_WARNING("absolute time mismatch " << this_time << " != " << ref_time);
632  }
633  }
634  if ( static_cast<int>(head->frame_number) <= m_current_frame )
635  {
636  return; // we have already seen this frame from another source
637  }
638  if ( (head->frame_number - m_current_frame) > 1 ) // we should only jump by one frame at a time;
639  {
640  LOGSTR_WARNING("Jumping more whan one frame " << head->frame_number << " -> " << m_current_frame); // error
641  }
643  float previous_offset_time = 0.0;
644  uint32_t previous_protons = 0;
645  if (m_frame_proton_charge.size() > 0)
646  {
647  previous_protons = static_cast<uint32_t>(0.5 + m_frame_proton_charge.back() / PPP_TO_UAMPH);
648  }
649  if (m_frame_time.size() > 0)
650  {
651  previous_offset_time = m_frame_time.back();
652  }
653  if (the_protons > MAX_ISIS_PPP)
654  {
655  LOGSTR_WARNING("Invalid DAE PPP value " << the_protons << " (" << the_protons * PPP_TO_UAMPH << " uAh) for frame " << m_current_frame); // error
656 // the_protons = previous_protons;
657  }
658  float offset_time = -1.0;
659  DAEEventList::DAETimeToOffset(head->time, m_crpt->start_time, offset_time);
660  if (offset_time < 0.0 || offset_time > SIX_MONTHS_IN_SECS) // no run should last more than this time
661  {
662  LOGSTR_WARNING("Invalid DAE time value " << head->time << "(offset " << offset_time << " seconds) for frame " << m_current_frame); // error
663 // offset_time = previous_offset_time + 0.01; // add 1/100 second, half of a 50Hz frame
664  }
665  if ( m_current_period != head->period )
666  {
667  m_current_period = head->period;
668  m_period.addValue(offset_time, m_current_period + 1);
669  }
670  if ( m_is_vetoing != head->info.bad_frame )
671  {
672  m_is_vetoing = head->info.bad_frame;
673  m_vetoing.addValue(offset_time, head->info.bad_frame);
674  }
675  ++m_raw_frames;
676  m_raw_uamph_sum += the_protons;
677  if (!head->info.bad_frame)
678  {
679  ++m_good_frames;
680  m_good_uamph_sum += the_protons;
681  }
682  m_frame_period.push_back(m_current_period + 1);
683  m_frame_time.push_back(offset_time);
684  m_frame_number.push_back(m_current_frame);
685  m_frame_proton_charge.push_back(the_protons * PPP_TO_UAMPH);
686  m_good_frame.push_back(!head->info.bad_frame);
687 // head->info.bad_frame;
688 // head->vetos;
689 // std::cerr << "Frame " << head->frame_number << " time " << DAEEventList::DAETimeAsString(head->time) << std::endl;
690 }
691 
692 void NeXusEventCallback::updateCurrentWriteFrame(int frame, int event_source_id)
693 {
694  Poco::Mutex::ScopedLock _lock(m_event_write_mutex);
695  int nvalid = 0;
696  for(card_frame_t::const_iterator cit = m_card_frame.begin(); cit != m_card_frame.end(); ++cit)
697  {
698  if (cit->second.first != INVALID_FRAME_MARKER)
699  {
700  ++nvalid;
701  break; // only interested in nvalid != 0
702  }
703  }
704  card_frame_t::iterator it;
705  time_t ref_time = time(NULL);
706  m_last_frame_time = ref_time;
707  if (nvalid == 0) // we are the first card to call, other cards should be at this frame number or less so set this to make sure we do not lose them
708  {
709  for(it = m_card_frame.begin(); it != m_card_frame.end(); ++it)
710  {
711  it->second.first = frame;
712  it->second.second = ref_time;
713  }
714  m_event_write_cond.broadcast();
715  }
716  else
717  {
718  it = m_card_frame.find(event_source_id);
719  if ( it != m_card_frame.end() )
720  {
721  it->second.first = frame;
722  it->second.second = ref_time;
723  m_event_write_cond.broadcast();
724  }
725  else
726  {
727  LOGSTR_WARNING("Data from invalid source id " << event_source_id << " in frame " << frame);
728  }
729  }
730 }
731 
732 // this will get called when we have no data in the card, which when reading from a file means we have finished
733 void NeXusEventCallback::noFrameCallbackEnd(bool end_present, NoFrameCallbackMode mode, int event_source_id)
734 {
735  card_frame_t::const_iterator it = m_card_frame.find(event_source_id);
736  if ( it != m_card_frame.end() )
737  {
738  if ( it->first != INVALID_FRAME_MARKER )
739  {
741  }
742  }
743  else
744  {
745  LOGSTR_WARNING("Data from invalid source id " << event_source_id);
746  }
747 }
748 
749 // this will get called when we have no data in the card, which when reading from the real DAE could be just beam off
750 // what we haven't covered here is if a detector card failed and stopped returning headers at all
751 void NeXusEventCallback::noFrameCallbackNull(bool end_present, NoFrameCallbackMode mode, int event_source_id)
752 {
753  card_frame_t::iterator it = m_card_frame.find(event_source_id);
754  if ( it != m_card_frame.end() )
755  {
756  if ( mode != NoFrameRunning )
757  {
758  noFrameCallbackEnd(end_present, mode, event_source_id);
759  }
760  }
761  else
762  {
763  LOGSTR_WARNING("Data from invalid source id " << event_source_id);
764  }
765 }
766 
768 {
769  static const int INITIAL_VAL = std::numeric_limits<int>::max();
770  int current_frame = INITIAL_VAL;
771  for(card_frame_t::const_iterator it = m_card_frame.begin(); it != m_card_frame.end(); ++it)
772  {
773  if (it->second.first != INVALID_FRAME_MARKER)
774  {
775  current_frame = std::min(current_frame, it->second.first);
776  }
777  }
778  if (current_frame == INITIAL_VAL)
779  {
780  current_frame = 0;
781  }
782  return current_frame;
783  // this should work but doesn't - a std::copy with value_iterator gives values, but std::min_element doesn't work
784  // return *(std::min_element(value_iterator<int,int>(m_card_frame.begin()), value_iterator<int,int>(m_card_frame.end())));
785 }
786 
787 void NeXusEventCallback::writeEvents(const DAEEventHeader* head, int event_source_id, int nev, const int_map_t& event_id, const float_map_t& event_time_offset)
788 {
789  static bool single_nexus_writer_thread = Poco::Util::Application::instance().config().getBool("isisicp.singlenexuswriterthread", true);
790  Poco::Mutex::ScopedLock _lock(m_event_write_mutex);
791  updateCurrentWriteFrame(head->frame_number, event_source_id);
792  int current_frame = currentWriteFrame();
793  while(head->frame_number > current_frame)
794  {
795  //m_event_write_cond.tryWait(m_event_write_mutex, 10000); // we should be able to just use wait(), but this covers us if a broadcast is not received
797  current_frame = currentWriteFrame();
798  }
799  if (head->frame_number < current_frame)
800  {
801  LOGSTR_WARNING("Writing data arriving too late: passed frame " << head->frame_number << " when current is " << current_frame << " for source " << event_source_id);
802  }
803  if (single_nexus_writer_thread)
804  {
805  Poco::Event ev;
806  NeXusWorkerData d(this, ev, head, event_source_id, nev, event_id, event_time_offset);
807  m_queue.enqueueNotification(new GenericWorkNotification<NeXusWorkerData>(&d));
808  ev.wait();
809  }
810  else
811  {
812  writeEventsImpl(head, event_source_id, nev, event_id, event_time_offset);
813  }
814 }
815 
816 void NeXusEventCallback::writeEventsImpl(const DAEEventHeader* head, int event_source_id, int nev, const int_map_t& event_id, const float_map_t& event_time_offset)
817 {
818  int current_frame = currentWriteFrame();
819  poco_assert(head->frame_number == current_frame);
820  m_raw_events_sum += nev;
821  std::string current_group;
822  det_file_t::iterator det_pos;
823  uint64_t *group_total_events = NULL;
824  int *group_frame_events = NULL;
825  int64_t dims_array[1], slab_start[1], dest_start[1];
826  const float_map_t::mapped_type* event_times = NULL;
827  for(int_map_t::const_iterator it = event_id.begin(); it != event_id.end(); ++it)
828  {
829  if (it->second.size() == 0)
830  {
831  continue;
832  }
833  if (current_group != it->first)
834  {
835  current_group = it->first;
836  det_pos = m_det_file.find(current_group);
837  if (det_pos == m_det_file.end())
838  {
839  std::vector<IXNeXusFile*> det_tmp(2, NULL);
840  BOOST_FOREACH(IXNeXusFile*& det_nx, det_tmp)
841  {
842  det_nx = m_file->clone();
843  det_nx->openGroup(m_entry_name.c_str(), "NXentry");
844  openDataGroup(det_nx, current_group);
845  group_total_events = &(m_events_write_total[current_group]); // this is here as openDataGroup check its to know whether to create group
846  }
847  det_tmp[0]->openData("event_id");
848  det_tmp[1]->openData("event_time_offset");
849  m_det_file[current_group] = det_tmp;
850  det_pos = m_det_file.find(current_group);
851  }
852  group_total_events = &(m_events_write_total[current_group]);
853  group_frame_events = &(m_frame_events_write[current_group][current_frame]);
854  event_times = &(event_time_offset.at(current_group));
855  }
856  std::vector<IXNeXusFile*>& det_nx_v = det_pos->second;
857  (*group_frame_events) += it->second.size();
858  // write events
859  dims_array[0] = it->second.size();
860  slab_start[0] = 0;
861  dest_start[0] = *group_total_events;
862  det_nx_v[0]->addSlab(it->second.data(), dims_array, 1, slab_start, dims_array, dest_start);
863  det_nx_v[1]->addSlab(event_times->data(), dims_array, 1, slab_start, dims_array, dest_start);
864  (*group_total_events) += it->second.size();
865  if (!head->info.bad_frame)
866  {
867  m_good_events_sum += it->second.size();
868  }
869 // for(int i=0; i<it->second.size(); ++i)
870 // {
871 // if ( m_first_data_frame.find(it->second[i]) == m_first_data_frame.end() )
872 // {
873 // m_first_data_frame[it->second[i]] = current_frame;
874 // }
875 // }
876  }
877 }
878 
879 void NeXusEventCallback::openDataGroup(IXNeXusFile* nx, const std::string& name, bool create_zero_size_if_missing)
880 {
881  int dims_array[1];
882  int chunks_array[1] = { m_nexus_chunk_length };
883  bool is_monitor = (name.find("monitor") != std::string::npos);
884  if (is_monitor)
885  {
886  nx->openGroup(name.c_str(), "NXmonitor");
887  }
888  else
889  {
890  nx->openGroup("instrument", "NXinstrument");
891  nx->openGroup(name.c_str(), "NXdetector");
892  }
893  if (m_events_write_total.find(name) == m_events_write_total.end()) // create data items if they do not exist already
894  {
895  if (create_zero_size_if_missing) // we are writing an empty event structure
896  {
897  dims_array[0] = 0;
898  nx->makeData("event_id", NX_UINT32, dims_array, 1);
899  nx->makeData("event_time_offset", NX_FLOAT32, dims_array, 1);
900  }
901  else
902  {
903  dims_array[0] = NX_UNLIMITED;
904  nx->makeDataChunked("event_id", NX_UINT32, dims_array, chunks_array, 1);
905  nx->makeDataChunked("event_time_offset", NX_FLOAT32, dims_array, chunks_array, 1);
906  }
907  nx->writeAttribute("event_time_offset", "units", "microsecond");
908  nx->flush();
909  }
910 }
911 
913 void NeXusEventCallback::closeDataGroup(IXNeXusFile* nx, const std::string& name)
914 {
915  if (name.size() == 0)
916  {
917  return;
918  }
919  bool is_monitor = (name.find("monitor") != std::string::npos);
920  nx->closeGroup();
921  if (!is_monitor) // detectors are one level lower than monitors
922  {
923  nx->closeGroup();
924  }
925 }
int openGroup(const char *group_name, const char *group_class)
void allEventCallback(const DAEEventHeader *head, const DetectorEvent32 *det_ev, int n, int event_source_id, const int *mapping)
void noFrameCallbackNull(bool end_present, NoFrameCallbackMode mode, int event_source_id)
int data_type
0=unknown, 1=int, 2=float
Definition: isiscrpt.h:182
const char * ISOtime(time_t time)
int getSpectrum(const DetectorEvent32 *det_ev, int event_source_id, int i)
returns -1 if spectrum out of range
int getNumSpectra(bool include_spectrum_zero=false) const
Definition: isiscrpt.cpp:926
void openDataGroup(IXNeXusFile *nx, const std::string &name, bool create_zero_size_if_missing=false)
Poco::MemoryPool m_mempool
static const int m_nexus_chunk_length
number of items in NeXus event list data chunk
uint32_t num_events
Definition: dae_events.h:60
float rtcb[ISISCRPT_MAX_NTRG][ISISCRPT_MAX_TIMECHANB]
Definition: isiscrpt.h:344
std::vector< float > m_frame_time
monitor events we did not save to file, instead just saving histograms
int clone(const IXNeXusFile &orig)
int makeNewGroup(const char *name, const char *nxclass, bool open_group=true)
int m_is_vetoing
last period number seen, used for change period log
float_map_t m_se_float_values
int flush()
Definition: nexuswriter.h:29
double offset
Definition: isiscrpt.h:185
fixed header marker for DAEEventHeader
Definition: dae_events.h:44
time_t start_time
Definition: isiscrpt.h:254
int makeData(const char *data_name, int nxtype, const int *dims_array, int ndims)
int currentWriteFrame() const
unsigned period
Definition: dae_events.h:57
void makeDetName(int spec, std::string &det_name)
bool saveMonitorEvents(int monitor_number) const
Definition: isiscrpt.h:589
std::set< std::string > m_event_det
std::vector< float > m_frame_proton_charge
int m_good_frames
1 yes, 0 no (-1 means unknown, but just used for initialisation)
int addLog(const char *nxname, const char *filename, const char *log_name, const char *log_units, time_t ref_time)
void addValue(float time, T value)
Definition: nexuswriter.h:205
static const int INVALID_FRAME_MARKER
static Poco::Mutex m_event_write_mutex
int crate
Definition: isiscrpt.h:180
char scaled_units[64]
Definition: isiscrpt.h:179
void updateCurrentWriteFrame(int frame, int event_source_id)
Poco::NotificationQueue m_queue
void allFrameCallback(const DAEEventHeader *head, int event_source_id)
this will get called for each frame for EACH event source
const char * detector_format
int makeDataChunked(const char *data_name, int nxtype, const int *dims_array, const int *chunk_array, int ndims)
#define LOGSTR_WARNING(__arg)
Definition: IsisBase.h:92
static Poco::Mutex m_frame_callback_mutex
int m_current_period
frame number we are currently processing
int closeGroup()
void processDataDae(const DAEEventHeader *head, const DetectorEvent32 *det_ev, int n, int event_source_id)
int spectrumDAETR(int spec) const
Definition: isiscrpt.h:799
data_dae_t data_dae[ISISCRPT_MAX_DATADAE]
Definition: isiscrpt.h:474
unsigned protons
Definition: dae_events.h:63
std::vector< int > m_frame_period
int writeAttribute(const std::string &data_name, const std::string &attr_name, const std::string &value)
Definition: nexuswriter.h:103
SimpleStore< int_map_t > m_int_map_store
int spectrumToMonitorNumber(int spec) const
return monitor number, or 0 if not monitor
Definition: isiscrpt.h:607
DAEEventHeaderInfo info
should be DAEEventHeaderMarker
Definition: dae_events.h:48
char name[256]
Definition: isiscrpt.h:177
#define LOGSTR_INFORMATION(__arg)
Definition: IsisBase.h:78
int tcb[ISISCRPT_MAX_NTRG][ISISCRPT_MAX_TIMECHANB]
Definition: isiscrpt.h:343
const ISISCRPT_STRUCT * m_crpt
void put(T *val)
Definition: icputils.h:687
#define LOGSTR_ERROR(__arg)
Definition: IsisBase.h:99
std::map< std::string, std::vector< int > > int_map_t
struct DAEEventHeader::DAETime time
void setOffset(time_t offset)
Definition: nexuswriter.h:207
static void DAETimeToOffset(const DAEEventHeader::DAETime &daetime, time_t base, T &offset)
Definition: dae_events.cpp:226
void writeEventsImpl(const DAEEventHeader *head, int event_source_id, int nev, const int_map_t &event_id, const float_map_t &event_time_offset)
static Poco::Condition m_event_write_cond
int spectrumGroup(int spec) const
Definition: isiscrpt.h:815
static float getTimeOffset(const ISISCRPT_STRUCT *crpt, const DetectorEvent32 *det_ev, int event_source_id, int tr, int i)
std::map< std::string, int > m_det_tr
const char * detector_base
int getLink(const char *name, const char *nxclass, NXlink &link)
frame_events_write_t m_frame_events_write
unsigned bad_frame
Definition: dae_events.h:13
std::vector< int > m_good_frame
unsigned time_channel
Definition: dae_events.h:78
void write(IXNeXusFile *file, const std::string &name)
Definition: nexuswriter.h:206
uint32_t frame_number
Definition: dae_events.h:49
unsigned spectrum
Definition: dae_events.h:79
double scale
Definition: isiscrpt.h:184
Poco::LogStream * m_logstr
Definition: IsisBase.h:15
void writeEvents(const DAEEventHeader *head, int event_source_id, int nev, const int_map_t &event_id, const float_map_t &event_time_offset)
bool isEventSpectrum(int spec) const
Definition: isiscrpt.h:821
char raw_units[64]
Definition: isiscrpt.h:178
boost::array< int, ISISCRPT_MAX_NTRG > ntc
number of time channels per CRPT time regime
Definition: isiscrpt.h:346
SimpleStore< float_map_t > m_float_map_store
U sum_map_values(const std::map< T, U > &m)
Definition: icputils.h:507
void setLoggerName(const std::string &logger_name)
Definition: IsisBase.h:17
int writeData(const char *data_name, const std::vector< T > &value)
Definition: nexuswriter.h:35
int data_bytes
Definition: isiscrpt.h:183
std::vector< int > m_frame_number
std::map< int, uint32_t > m_frame_events_raw
NoFrameCallbackMode
Definition: event_store.h:20
#define PPP_TO_UAMPH
int makeLink(const char *name, const char *nxclass, NXlink &link)
T * get()
Definition: icputils.h:673
NXlogWriter< int > m_vetoing
NXlogWriter< int > m_period
int daeTR(int tr) const
Definition: isiscrpt.h:749
int getRefEventDetCard(const std::vector< int > &ev_source_ids)
void noFrameCallbackEnd(bool end_present, NoFrameCallbackMode mode, int event_source_id)
std::vector< int > m_det_id
NeXusEventCallback(const ISISCRPT_STRUCT *crpt, IXNeXusFile *file, const std::vector< int > &ev_source_ids)
LONGLONG m_monitor_events_not_saved
events discarded as outside tcb time window
std::map< std::string, uint64_t > m_events_write_total
int closeData()
Definition: nexuswriter.h:85
Poco::Thread m_worker_thread
void closeDataGroup(IXNeXusFile *nx, const std::string &name)
if name is null string, do nothing
std::map< std::string, std::vector< float > > float_map_t