ICP  1
event_store.cpp
Go to the documentation of this file.
1 #include "stdafx.h"
2 #include "event_store.h"
3 #include "dae_events.h"
4 #include "icputils.h"
5 #include "isisraw.h"
6 
7 EventStore::EventStore() : Base(), m_mempool(m_mempool_size)
8 {
9  setLoggerName("EventStore");
10  LOGSTR_DEBUG("Constructing event store");
11  m_threads.addCapacity(50);
12 }
13 
15 {
16  uint64_t n = 0;
17  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
18  {
19  n += ec.card->numRawEvents();
20  }
21  return n;
22 }
23 
25 {
26  uint64_t n = 0;
27  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
28  {
29  n += ec.card->nNewEventWords(status);
30  }
31  return n;
32 }
33 
34 void EventStore::getEventSourceIDs(std::vector<int>& ids)
35 {
36  ids.clear();
37  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
38  {
39  ids.push_back(ec.card->eventSourceID());
40  }
41 }
42 
43 void EventStore::getEnabledEventSourceIDs(std::vector<int>& ids)
44 {
45  ids.clear();
46  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
47  {
48  if (ec.enable)
49  {
50  ids.push_back(ec.card->eventSourceID());
51  }
52  }
53 }
54 
56 {
57  uint64_t n = 0;
58  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
59  {
60  n += ec.card->numGoodEvents();
61  }
62  return n;
63 }
64 
65 void EventStore::findFileInputSources(int run_number, std::set<std::string>& files)
66 {
67  files.clear();
68  std::set<std::string> files1;
69  Poco::Path search_expr(EventStore::baseDir(), "dc_*.dat");
70  search_expr.pushDirectory(Poco::format("run_%d", run_number));
71  Poco::Glob::glob(search_expr, files1, Poco::Glob::GLOB_CASELESS); // poco glob is shell like not RE like
72  BOOST_FOREACH(const std::string& s, files1) // not std::string& as we may erase it
73  {
74  if (s.back() == '/' || s.back() == '\\') // remove a trailing "/" which is present if we are using SplitFile
75  {
76  files.insert(s.substr(0,s.length()-1));
77  }
78  else
79  {
80  files.insert(s);
81  }
82  }
83 }
84 
86 {
87  Poco::RegularExpression re("dc_([0-9]+).dat", Poco::RegularExpression::RE_CASELESS);
88  Poco::RegularExpression::MatchVec matches;
89  std::string s;
90  re.match(file, 0, matches);
91  if (matches.size() == 2)
92  {
93  s = file.substr(matches[1].offset, matches[1].length);
94  return Poco::NumberParser::parse(s);
95  }
96  else
97  {
98  return -1;
99  }
100 }
101 
102 void EventStore::findUnusedFileInputSources(int run_number, std::set<std::string>& files)
103 {
104  findFileInputSources(run_number, files);
105  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
106  {
107  for(std::set<std::string>::const_iterator it = files.begin(); it != files.end(); ++it)
108  {
109  if ( !stricmp(it->c_str(), ec.card->getFileName().c_str()) )
110  {
111  files.erase(it);
112  break;
113  }
114  }
115  }
116 }
117 
119 {
120  LOGSTR_DEBUG("Adding file sources");
121  std::set<std::string> files;
122  info.zero();
123  findFileInputSources(run_number, files);
124  if (files.size() == 0)
125  {
126  LOGSTR_WARNING("No event files found for run " << run_number);
127  }
128  FileEventSourceInfo card_info;
129  BOOST_FOREACH(const std::string& f, files)
130  {
131  int ev_id = eventSourceIDFromFileInputSource(f);
132  if (ev_id != -1)
133  {
134  FileEventSource* fev = new FileEventSource(run_number, ev_id);
135  m_sources.push_back(fev);
136  fev->readInfo(card_info);
137  info.add(card_info);
138  addCard(fev, (card_info.raw_frames > 0 ? true : false) );
139  }
140  }
141 }
142 
143 void EventStore::addRawFileInputSource(ISISRAW* iraw, int source_id, time_t run_start)
144 {
145  LOGSTR_DEBUG("Adding raw file sources");
146  ISISRAWEventSource* fev = new ISISRAWEventSource(iraw, source_id, run_start);
147  m_sources.push_back(fev);
148  addCard(fev, true);
149 }
150 
151 void EventStore::addCard(DAEEventSource* det_card, bool enable)
152 {
153  LOGSTR_DEBUG("Adding source id " << det_card->eventSourceID() << " as " << (enable ? "ENABLED" : "DISABLED") );
154  EventStoreCard* ec = new EventStoreCard(this, det_card);
155  m_stores.push_back(EventStoreCardHolder(ec, enable));
156 }
157 
158 void EventStore::addCards(const std::vector<DAEEventSource*>& det_list)
159 {
160  LOGSTR_DEBUG("Adding sources");
161  BOOST_FOREACH(DAEEventSource* dc, det_list)
162  {
163  addCard(dc);
164  }
165 }
166 
167 void EventStore::setCardStatus(bool enable)
168 {
169  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
170  {
171  setCardStatus(ec.card->eventSourceID(), enable);
172  }
173 }
174 
175 void EventStore::setCardStatus(int id, bool enable)
176 {
177  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
178  {
179  if (ec.card->eventSourceID() == id)
180  {
181  ec.enable = enable;
182  }
183  }
184 }
185 
187 {
188  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
189  {
190  if (ec.enable)
191  {
192  ec.card->invokeNoFrameCallbacks(mode);
193  }
194  }
195 }
196 
198 {
199  LOGSTR_DEBUG("Starting event store");
200  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
201  {
202  if (ec.enable)
203  {
204  m_threads.start(*(ec.card), Poco::format("EventStoreCard %d", ec.card->eventSourceID()));
205  }
206  }
207 }
208 
209 void EventStore::setOutputFileName(int run_number, bool new_run, DAEstatus& status)
210 {
211  ICPTimer timer("EventStore::setOutputFileName", status);
212  LOGSTR_DEBUG("Setting output event file names for run number " << run_number);
213  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
214  {
215  ec.card->setOutputFileName(run_number, new_run, status);
216  }
217  timer.info("After setting of card sources", status);
218  // now remove and old files we are not using
219  std::set<std::string> files;
220  findUnusedFileInputSources(run_number, files);
221  BOOST_FOREACH(const std::string& file, files)
222  {
223  LOGSTR_DEBUG("Removing no longer used source " << file);
224  SplitFile::removeFile(file);
225  }
226 }
227 
229 {
230  ICPTimer timer("EventStore::clearOutputFile", status);
231  LOGSTR_DEBUG("Clearing output files");
232  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
233  {
234  ec.card->clearOutputFile(status);
235  }
236 }
237 
238 void EventStore::closeAndDeleteOutputFile(int run_number, DAEstatus& status)
239 {
240  stop(true);
241  closeOutputFile(status);
242  deleteOutputFile(run_number, status);
243 }
244 
246 {
247  stop(false);
248  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
249  {
250  ec.card->closeFiles();
251  }
252 }
253 
254 void EventStore::deleteOutputFile(int run_number, DAEstatus& status)
255 {
256  // we loop round files in case some are in use, so hopefully get most of them to save disk space
257  std::set<std::string> files;
258  findFileInputSources(run_number, files);
259  BOOST_FOREACH(const std::string& file, files)
260  {
261  SplitFile::removeFile(file);
262  }
263  // clean up anything else
264  Poco::File run_dir(baseRunDir(run_number));
265  if (run_dir.exists())
266  {
267  try
268  {
269  run_dir.remove(true);
270  }
271  catch(const std::exception& ex)
272  {
273  status.addInfoVa(FAC_DAE, "Unable to remove all files in event directory of run %d: %s", run_number, ex.what());
274  }
275  }
276 }
277 
278 void EventStore::stop(bool immediate)
279 {
280  LOGSTR_DEBUG("Stopping event store");
281  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
282  {
283  ec.card->stop(immediate); // request cards to stop when finished current work
284  }
285  m_threads.joinAll(); // wait for above to happen
286 }
287 
289 {
290  bool end_seen = true;
291  unsigned nseen = 0, nchecked = 0;
292 // LOGSTR_TRACE("Checking for end header");
293  BOOST_FOREACH(const EventStoreCardHolder& ec, m_stores)
294  {
295  if (ec.enable)
296  {
297  ++nchecked;
298  if ( ec.card->endHeaderSeen() )
299  {
300  ++nseen;
301  }
302  else
303  {
304  end_seen = false;
305  }
306  }
307  }
308  if (nseen > 0)
309  {
310  LOGSTR_DEBUG("Checking for end header - seen " << nseen << "/" << nchecked);
311  }
312  return end_seen;
313 }
314 
315 template<typename SLOT>
316 std::list<boost::signals2::connection> EventStore::addCallback(boost::signals2::connection (EventStoreCard::*func)(const SLOT&), const SLOT& slot)
317 {
318  std::list<boost::signals2::connection> conn_list;
319  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
320  {
321  conn_list.push_back((ec.card->*func)(slot));
322  }
323  return conn_list;
324 }
325 
326 std::list<boost::signals2::connection> EventStore::addGoodFrameCallback(const FrameCallbackSlotType& slot)
327 {
328  LOGSTR_DEBUG("Adding GOOD frame callback");
330 }
331 
332 std::list<boost::signals2::connection> EventStore::addAllFrameCallback(const FrameCallbackSlotType& slot)
333 {
334  LOGSTR_DEBUG("Adding ALL frame callback");
336 }
337 
338 std::list<boost::signals2::connection> EventStore::addNoFrameCallback(const NoFrameCallbackSlotType& slot)
339 {
340  LOGSTR_DEBUG("Adding NO frame callback");
342 }
343 
344 std::list<boost::signals2::connection> EventStore::addGoodEventCallback(const EventCallbackSlotType& slot)
345 {
346  LOGSTR_DEBUG("Adding GOOD event callback");
348 }
349 
350 std::list<boost::signals2::connection> EventStore::addAllEventCallback(const EventCallbackSlotType& slot)
351 {
352  LOGSTR_DEBUG("Adding ALL event callback");
354 }
355 
357 {
358  LOGSTR_DEBUG("Destructor");
359  stop(false);
360  clear();
361 }
362 
364 {
366  clearSources();
367  clearStores();
368 }
369 
371 {
372  BOOST_FOREACH(FileEventSourceBase* es, m_sources)
373  {
374  delete es;
375  }
376  m_sources.clear();
377 }
378 
380 {
381  BOOST_FOREACH(EventStoreCardHolder& ec, m_stores)
382  {
383  delete ec.card;
384  }
385  m_stores.clear();
386 }
387 
388 
389 const int EventStore::m_mempool_size = 32 * 1024 * 1024;
390 
392 {
393  return reinterpret_cast<isisU32_t*>(m_mempool.get());
394 }
395 
397 {
398  m_mempool.release(buffer);
399 }
400 
401 #define THROW_RETURN_NE(__comm, __val, __mess) \
402  if ( (__comm) != (__val) ) \
403  { \
404  throw std::runtime_error(__mess); \
405  }
406 
407 EventStoreCard::EventStoreCard(EventStore* es, DAEEventSource* dc) : Base(), m_store(es), m_dc(dc),
408  m_run_number(-1), m_keep_going(true), m_immediate_stop(false), m_len_buffer(0), m_buffer(NULL)
409 {
410  m_data.newFile();
411  setLoggerName("EventStoreCard");
412  LOGSTR_DEBUG("Constructing event store card for source id " << m_dc->eventSourceID());
413 }
414 
415 
416 std::string EventStoreCard::fileNameForDC(int run_number, int dc_number)
417 {
418  std::string file_name;
419  Poco::format(file_name, "dc_%d.dat", dc_number);
420  Poco::Path p(EventStore::baseDir(), file_name);
421  p.pushDirectory(Poco::format("run_%d", run_number));
422  return p.toString();
423 }
424 
425 std::string EventStoreCard::infoFileNameForDC(int run_number, int dc_number)
426 {
427  std::string info_file_name;
428  Poco::format(info_file_name, "dc_%d.info", dc_number);
429  Poco::Path p(EventStore::baseDir(), info_file_name);
430  p.pushDirectory(Poco::format("run_%d", run_number));
431  return p.toString();
432 }
433 
434 void EventStoreCard::getInfoFileData(const std::string& file_name, InfoFileData& info)
435 {
436  if ( atomic_read(file_name, ".old", &info, 1) != 0 )
437  {
438  info.newFile();
439  }
440 // need to check the real event file size not the info file size
441 // else if ( (file_size_bytes(file_name) / sizeof(uint32_t)) < info.total )
442 // {
443 // info.total = 0;
444 // LOGSTR_WARNING("File " << file_name << " is short ");
445 // }
446 }
447 
449 {
450  if (m_keep_going)
451  {
452  LOGSTR_WARNING("Clearing output file while running");
453  }
454  LOGSTR_DEBUG("Clearing output for source id " << m_dc->eventSourceID());
455  m_file.seek(0, SEEK_SET);
457  m_data.newFile();
458  updateFilePosition(status);
459 }
460 
461 
462 void EventStoreCard::setOutputFileName(int run_number, bool new_run, DAEstatus& status)
463 {
464  LOGSTR_DEBUG("Setting output run number to " << run_number);
465  if (m_run_number == run_number && m_file.isOpen()) // is it already open?
466  {
467  LOGSTR_DEBUG("file already open");
468  }
469  else
470  {
471  m_run_number = run_number;
472  std::string file_name, info_file_name;
473  m_file_name = fileNameForDC(run_number, m_dc->eventSourceID());
474  Poco::Path dir_name(m_file_name);
475  dir_name.setFileName("");
476  Poco::File(dir_name).createDirectories();
478  LOGSTR_DEBUG("Using event file path " << m_file_name);
479  if (m_file.isOpen())
480  {
481  m_file.close();
482  }
483  m_file.open(m_file_name.c_str(), IFile::ReadWriteCreate, _SH_DENYWR, true);
484  if (!m_file.isOpen())
485  {
486  throw std::runtime_error("fopen");
487  }
489  }
490  isisU32_t last_address = m_dc->getLastAddressRead();
491  if (new_run)
492  {
493  clearOutputFile(status);
495  m_data.last_address_read = last_address;
496  }
497  else
498  {
499  if (last_address != m_data.last_address_read)
500  {
501  LOGSTR_WARNING("Mismatch in last_address_read for source id " << m_dc->eventSourceID() << ": DAE 0x" << std::hex << last_address << " != ICP 0x" << m_data.last_address_read << std::dec);
502  }
503  // if is possible the program may have been terminated during updateFilePosition() after writing
504  // last_address to the info file but before calling updateDAEEventLastRead()
505  // this is to sync the info file to the dae
507  }
508  m_dc->updateDAEEventLastRead(status);
509  if (m_file.seek(m_data.file_words * 4, SEEK_SET) != 0)
510  {
511  LOGSTR_ERROR("Seek failed for " << m_info_file_name);
512  m_data.newFile();
513  }
514  LOGSTR_INFORMATION("Using info file " << m_info_file_name << " words " << m_data.file_words << " LF=" << m_data.last_frame << " GF=" << m_data.good_frames);
515  updateFilePosition(status);
516 }
517 
519 {
520  if (m_file.isOpen())
521  {
523  LOGSTR_DEBUG("Updating file position to " << m_data.file_words << " for " << m_file_name);
524  if (atomic_write(m_info_file_name, ".old", ".new", &m_data, 1) != 0)
525  {
526  throw std::runtime_error("atomic_write");
527  }
528  m_dc->updateDAEEventLastRead(status);
529  }
530 }
531 
532 Poco::Path EventStore::m_base_dir;
533 
535 {
536  DAEstatus status;
537  uint64_t num_unread;
538  m_keep_going = true;
539  m_immediate_stop = false;
540  LOGSTR_DEBUG("Starting event thread for " << m_file_name);
541  while(m_keep_going)
542  {
545  if (num_unread > 0)
546  {
547  LOGSTR_DEBUG("words remaining " << num_unread);
548  }
549  if (m_len_buffer > 0)
550  {
551  Poco::RunnableAdapter<EventStoreCard> histogram_events(*this, &EventStoreCard::checkAndHistogramEvents);
552  m_threads.start(histogram_events, "histogram_events");
553  flushEvents(status);
554  m_threads.joinAll();
555  }
556  else
557  {
559 // LOGSTR_DEBUG("No data - end present " << m_end_header_present);
560  }
561  updateFilePosition(status); // need to do this even as no data so end_header_present is written
563  if (m_len_buffer == 0)
564  {
565  Poco::Thread::sleep( m_data.end_header_present ? 2000 : 200 );
566  }
567  Poco::Thread::yield();
568  }
569  num_unread = m_dc->nNewEventWords(status);
570  if (num_unread > 0)
571  {
572  LOGSTR_WARNING("Terminating event thread for " << m_file_name << " with " << num_unread << " event words remaining");
573  }
574  invokeNoFrameCallbacks(NoFrameStop); // to indicate end of data stream
575 }
576 
578 {
579  return m_dc->nNewEventWords(status);
580 }
581 
583 {
584  m_no_frame_callbacks(m_data.end_header_present, mode, m_dc->eventSourceID()); // to indicate end of data stream
585 }
586 
588 {
589  if (m_file.isOpen())
590  {
591  LOGSTR_DEBUG("Flushing " << m_len_buffer << " words to " << m_file_name);
593  {
594  throw std::runtime_error("fwrite");
595  }
596  if (m_file.flush() != 0)
597  {
598  throw std::runtime_error("flush");
599  }
601  }
602 }
603 
608 {
609  int fnum = m_data.last_frame + 1;
610  if ( head.frame_number != fnum )
611  {
612  head.frame_number = fnum;
613  return true;
614  }
615  else
616  {
617  return false;
618  }
619 }
620 
622 {
623  head.info.bad_frame = 0;
624  head.period = 0;
625  // head.protons , head.time ?
626 }
627 
629 {
630  LOGSTR_DEBUG("Checking events for " << m_file_name << " num_words = " << m_len_buffer);
631  DAEstatus status;
632  static bool frameworkaround_default = Poco::Util::Application::instance().config().getBool("isisicp.frameworkaround", false);
633 // static const std::string workaround_string(Poco::format("isisicp.frameworkaround.card%d", m_dc->eventSourceID()));
634  static bool frameworkaround = frameworkaround_default; // Poco::Util::Application::instance().config().getBool(workaround_string, frameworkaround_default);
635  static bool badheaderworkaround = Poco::Util::Application::instance().config().getBool("isisicp.badheaderworkaround", false);
636  unsigned n;
637  int i, num_words = 0; // num words of buffer read so far
638  std::string error_message;
639  unsigned num_frames_fixed = 0;
640  while(!m_immediate_stop && num_words < m_len_buffer)
641  {
642  if (m_data.header_words < DAE_EVENT_HEADER_WORDS) // incomplete or no current header
643  {
644  n = std::min(DAE_EVENT_HEADER_WORDS - m_data.header_words, m_len_buffer - num_words);
645  memcpy(reinterpret_cast<uint32_t*>(&m_data.header) + m_data.header_words, m_buffer + num_words, n * sizeof(uint32_t));
646  num_words += n;
647  m_data.header_words += n;
648  m_data.num_events = 0;
649  }
651  {
652  LOGSTR_DEBUG("incomplete header " << m_data.header_words << " - waiting for more data");
653  break;
654  }
655  if ( frameworkaround && fixFrameHeader(m_data.header) )
656  {
657  ++num_frames_fixed;
658  }
659  if (badheaderworkaround)
660  {
662  }
663  if ( (i = DAEEventList::checkEventHeader(&m_data.header, error_message)) != 0 )
664  {
665  LOGSTR_WARNING("Event header invalid: " << error_message);
667  m_data.header_words = 0; // so it will get completely re-read next time
668  n = DAEEventList::findValidEventHeaderOffset(m_buffer + num_words, m_len_buffer - num_words, status);
669  if (n == -1)
670  {
671  LOGSTR_WARNING("Cannot find header in buffer");
672  break;
673  }
674  else
675  {
676  num_words += n;
677  LOGSTR_WARNING("Skipped " << n << " words and found header at offset " << num_words);
678  continue;
679  }
680  }
681  int frame_diff = m_data.header.frame_number - m_data.last_frame;
682  if ( frame_diff != 1 )
683  {
684  // we have skipped a frame somewhere? it may go backwards if we begin a new run
685  LOGSTR_WARNING("skipped frame? found " << m_data.header.frame_number << " expected " << m_data.last_frame + 1);
686  }
687  n = std::min(m_data.header.num_events - m_data.num_events, m_len_buffer - num_words);
688  if (n >= 0)
689  {
690  for(i=0; i<n; ++i)
691  {
692  if (m_buffer[num_words+i] == DAEEventHeaderFixedMarker)
693  {
694  LOGSTR_WARNING("found dae marker at wrong place");
695  }
696  }
697  DetectorEvent32* ev = reinterpret_cast<DetectorEvent32*>(m_buffer + num_words);
699  m_data.raw_events += n;
700  if (m_data.header.info.bad_frame == 0)
701  {
703  m_data.good_events += n;
704  }
705  num_words += n;
706  m_data.num_events += n;
707  }
708  if (m_data.num_events == m_data.header.num_events) // is frame complete?
709  {
710  if (!DAEEventList::isEndRunHeader(&(m_data.header), status))
711  {
712  ++(m_data.raw_frames);
714  if (m_data.header.info.bad_frame == 0)
715  {
716  ++(m_data.good_frames);
718  }
720  }
721  else
722  {
724  }
725  m_data.header_words = 0;
726 // LOGSTR_TRACE("frame " << m_last_frame << " complete");
727  }
728  }
729  if ( !m_immediate_stop && (num_words != m_len_buffer) )
730  {
731  LOGSTR_WARNING("buffer not fully used - remaining words = " << (m_len_buffer - num_words) );
732  //throw std::runtime_error("buffer not fully used");
733  }
734  if ( num_frames_fixed > 0 )
735  {
736  LOGSTR_INFORMATION("Frame number in event header was fixed " << num_frames_fixed << " times");
737  }
738 }
739 
741 {
742  LOGSTR_DEBUG("Destructor called");
743  closeFiles();
744 }
745 
747 {
748  LOGSTR_DEBUG("closeFiles called");
749  if (m_file.isOpen())
750  {
751  m_file.close();
752 // m_run_number = -1;
753 // m_file_name = "";
754 // m_info_file_name = "";
755  }
756 }
757 
758 boost::signals2::connection EventStoreCard::addGoodEventCallback(const EventCallbackSlotType& slot)
759 {
760  LOGSTR_DEBUG("Adding GOOD event callback");
761  return m_good_event_callbacks.connect(slot);
762 }
763 
764 boost::signals2::connection EventStoreCard::addAllEventCallback(const EventCallbackSlotType& slot)
765 {
766  LOGSTR_DEBUG("Adding ALL event callback");
767  return m_good_event_callbacks.connect(slot);
768 }
769 
770 boost::signals2::connection EventStoreCard::addGoodFrameCallback(const FrameCallbackSlotType& slot)
771 {
772  LOGSTR_DEBUG("Adding GOOD frame callback");
773  return m_good_frame_callbacks.connect(slot);
774 }
775 
776 boost::signals2::connection EventStoreCard::addAllFrameCallback(const FrameCallbackSlotType& slot)
777 {
778  LOGSTR_DEBUG("Adding ALL frame callback");
779  return m_all_frame_callbacks.connect(slot);
780 }
781 
782 boost::signals2::connection EventStoreCard::addNoFrameCallback(const NoFrameCallbackSlotType& slot)
783 {
784  LOGSTR_DEBUG("Adding NO frame callback");
785  return m_no_frame_callbacks.connect(slot);
786 }
787 
788 
789 FileEventSource::FileEventSource(int run_number, int source_id) : FileEventSourceBase(), m_source_id(source_id), m_file_words(0)
790 {
791  setLoggerName("FileEventSource");
792  m_file_name = EventStoreCard::fileNameForDC(run_number, source_id);
793  m_info_file_name = EventStoreCard::infoFileNameForDC(run_number, source_id);
794  m_file.open(m_file_name.c_str(), IFile::ReadOnly, _SH_DENYNO);
795  if (!m_file.isOpen())
796  {
797  throw std::runtime_error("fopen");
798  }
800 }
801 
803 {
804  if (m_file.isOpen())
805  {
806  m_file.close();
807  }
808 }
809 
810 uint32_t FileEventSource::getNewEvents(isisU32_t* buffer, uint32_t maxlen, uint64_t& num_unread, bool& run_ended, DAEstatus& status)
811 {
812  uint64_t new_words = nNewEventWords(status);
813  uint32_t num_to_read = 0;
814  size_t nread = 0;
815  run_ended = false;
816  if (new_words == 0)
817  {
818 // LOGSTR_DEBUG("No events - assuming end");
819  run_ended = true;
820  }
821  if (new_words > maxlen)
822  {
823  num_unread = new_words - maxlen;
824  num_to_read = maxlen;
825  }
826  else
827  {
828  num_unread = 0;
829  num_to_read = static_cast<uint32_t>(new_words); // as maxlen is uint32_t this can never overflow
830  }
831  if (num_to_read > 0)
832  {
833  nread = m_file.read(buffer, sizeof(uint32_t), num_to_read);
834  if (nread != num_to_read)
835  {
836  LOGSTR_ERROR("Only read " << nread << " out of " << num_to_read << " requested words");
837  //throw std::runtime_error("getNewEvents");
838  }
839  if (nread == static_cast<size_t>(-1))
840  {
841  nread = 0;
842  }
843  }
844  m_file_words += num_to_read;
845  return nread;
846 }
847 
849 {
850  return m_info.file_words - m_file_words;
851 }
852 
854 {
855 }
856 
858 {
859  return m_source_id;
860 }
861 
863 {
864  return NULL;
865 }
866 
868 {
869  return 0;
870 }
871 
873 {
874 }
875 
877 {
882 }
883 
884 ISISRAWEventSource::ISISRAWEventSource(ISISRAW* iraw, int source_id, time_t run_start) : FileEventSourceBase(), m_file(iraw),
885  m_source_id(source_id), m_events(0), m_events_sent(0), m_pos(0), m_pos_sent(0), m_last_frame(-1)
886 {
887  setLoggerName("ISISRAWEventSource");
888  DAEstatus status;
891  m_run_start = run_start;
892  int i, j, k, frame;
893  int ndet = m_file->i_det;
894  int nper = m_file->t_nper;
895  int nsp1 = m_file->t_nsp1;
896  int ntc1 = m_file->t_ntc1;
897  isisU32_t* dat1 = m_file->dat1;
898  int persize = (nsp1 + 1) * (ntc1 + 1);
899  int maxcrate = 0;
900  for(i=0; i < ndet; i++)
901  {
902  maxcrate = std::max(maxcrate, m_file->crat[i]);
903  }
904  int maxspec = nsp1 + 1;
905  int* last_card_spectrum = new int[maxcrate+1];
906  memset(last_card_spectrum, 0, (maxcrate+1) * sizeof(int));
907  m_spec_lookup = new int[maxspec];
908  memset(m_spec_lookup, 0, maxspec * sizeof(int));
909  m_card_lookup = new int[maxspec];
910  memset(m_card_lookup, 0, maxspec * sizeof(int));
911  int card, spec, offset;
912  // we need to pick only one card for all spectrum 0 events
913  m_card_lookup[0] = maxcrate;
914  for(i=0; i < ndet; i++)
915  {
916  card = m_file->crat[i];
917  spec = m_file->spec[i];
918  if (spec > 0)
919  {
920  m_card_lookup[spec] = card;
921  }
922  }
923  // dae2 card spectra are assigned in ascending order to dae1 spectra on the card
924  m_spec_lookup[0] = 0;
925  for(i=1; i < maxspec; ++i)
926  {
927  card = m_card_lookup[i];
928  m_spec_lookup[i] = ++(last_card_spectrum[card]);
929  }
930  delete []last_card_spectrum;
931  for(i=0; i<nper; ++i)
932  {
933  for(j=0; j<nsp1+1; ++j)
934  {
935  if (m_card_lookup[j] == m_source_id)
936  {
937  for(k=0; k<ntc1+1; ++k)
938  {
939  offset = k + j*(ntc1+1) + i*persize;
940  m_events += dat1[offset];
941  }
942  }
943  }
944  }
946  for(i=1; i <= m_events; ++i)
947  {
948  ++(m_frame_events[getFrame(i)]);
949  }
950  LOGSTR_INFORMATION("Found " << m_events << " events on card");
951 // LOGSTR_INFORMATION("dae1 spectrum 1 on card " << m_card_lookup[1] << " dae2 spec " << m_spec_lookup[1]);
952 // LOGSTR_INFORMATION("dae1 spectrum 20 on card " << m_card_lookup[20] << " dae2 spec " << m_spec_lookup[20]);
953 }
954 
956 {
957 }
958 
959 uint32_t ISISRAWEventSource::getNewEvents(isisU32_t* buffer, uint32_t maxlen, uint64_t& num_unread, bool& run_ended, DAEstatus& status)
960 {
961  uint64_t new_words = nNewEventWords(status);
962  run_ended = false;
963  if (new_words == 0)
964  {
965 // LOGSTR_DEBUG("No events - assuming end");
966  run_ended = true;
967  return 0;
968  }
969  else
970  {
971  return readEvents(buffer, maxlen, num_unread);
972  }
973 }
974 
976 {
977  return m_events - m_events_sent;
978 }
979 
981 {
982 }
983 
985 {
986  return m_source_id;
987 }
988 
990 {
991  return NULL;
992 }
993 
995 {
996  return 0;
997 }
998 
1000 {
1001 }
1002 
1004 {
1005  info.raw_frames = m_good_frames;
1006  info.good_frames = m_good_frames;
1007  info.good_events = m_events;
1008  info.raw_events = m_events;
1009 }
1010 
1011 
1012 int ISISRAWEventSource::readEvents(isisU32_t* buffer, uint32_t max_num_to_read, uint64_t& num_unread)
1013 {
1014  DetectorEvent32 ev;
1015  int num_sent = 0;
1016  int spec;
1017  int ntc1 = m_file->t_ntc1;
1018  int current_frame;
1019  while(num_sent < max_num_to_read)
1020  {
1021  if ( m_pos >= (m_file->t_nsp1+1) * (m_file->t_ntc1+1) )
1022  {
1023  break;
1024  }
1025  spec = m_pos / (ntc1 + 1);
1026  if ( (m_card_lookup[spec] != m_source_id) || ((m_file->dat1[m_pos] - m_pos_sent) == 0) )
1027  {
1028  ++m_pos;
1029  m_pos_sent = 0;
1030  continue;
1031  }
1032  current_frame = getFrame(m_events_sent+1); // frame for next event, hence +1
1033  if (current_frame > m_last_frame)
1034  {
1035  if ( (max_num_to_read - num_sent) < DAE_EVENT_HEADER_WORDS )
1036  {
1037  break;
1038  }
1039  sendHeader(buffer+num_sent, current_frame);
1040  num_sent += DAE_EVENT_HEADER_WORDS;
1041  m_last_frame = current_frame;
1042  continue; // we need to go back round and check we have space for events after header
1043  }
1044  ev.spectrum = m_spec_lookup[spec];
1045  ev.time_channel = m_pos % (ntc1 + 1);
1046  memcpy(buffer+num_sent, reinterpret_cast<isisU32_t*>(&ev), sizeof(ev));
1047  ++m_pos_sent;
1048  ++m_events_sent;
1049  ++num_sent;
1050  }
1051  num_unread = m_events - m_events_sent;
1052  return num_sent;
1053 }
1054 
1055 #define PPP_TO_UAMPH 1.738E-6
1056 
1057 int ISISRAWEventSource::sendHeader(isisU32_t* buffer, int current_frame)
1058 {
1059  DAEEventHeader header;
1060  const int isis_frequency = 10; // in Hz - 10 means TS2, 50 means TS1
1061  memset(&header, 0 , sizeof(header));
1062  header.frame_number = current_frame;
1063  header.marker0 = static_cast<uint32_t>(DAEEventHeaderFixedMarker);
1064  header.marker1 = static_cast<uint32_t>(AllValuesFilledInMarker);
1066  header.period = 0;
1067  header.num_events = m_frame_events[current_frame];
1068  header.protons = m_good_ua / ((float)m_good_frames * PPP_TO_UAMPH);
1069  DAEEventList::OffsetToDAETime(m_run_start, (double)current_frame / (double)isis_frequency, header.time);
1070  header.info.header_type = 0;
1071  memcpy(buffer, reinterpret_cast<isisU32_t*>(&header), DAE_EVENT_HEADER_WORDS * 4);
1072  return 0;
1073 }
1074 
1076 int ISISRAWEventSource::getFrame(uint32_t event_count)
1077 {
1078  int current_frame = floor((double)m_good_frames * (double)(event_count-1) / (double)m_events);
1079  if (current_frame >= m_good_frames)
1080  {
1081  current_frame = m_good_frames - 1;
1082  }
1083  return current_frame;
1084 }
1085 
void flushEvents(DAEstatus &status)
const int * getDAE2SpecMap()
SplitFile m_file
Definition: event_store.h:214
uint64_t numGoodEvents()
Definition: event_store.cpp:55
void setLastAddressRead(isisU32_t addr)
static std::string fileNameForDC(int run_number, int dc_number)
#define FAC_DAE
uint32_t bufferSize()
Definition: event_store.h:82
InfoFileData m_data
Definition: event_store.h:199
void clearAllCallbacks()
Definition: event_store.h:49
isisU32_t * getBuffer()
NoFrameCallbackType m_no_frame_callbacks
Definition: event_store.h:221
unsigned header_type
Definition: dae_events.h:10
DAEEventSource * m_dc
Definition: event_store.h:211
boost::signals2::connection addAllEventCallback(const EventCallbackSlotType &slot)
int r_goodfrm
Definition: isisraw.h:58
static void printEventHeader(const DAEEventHeader *head, std::ostream &os)
Definition: dae_events.cpp:279
std::list< boost::signals2::connection > addGoodFrameCallback(const FrameCallbackSlotType &slot)
uint32_t num_events
Definition: dae_events.h:60
int seek(int64_t offset, int whence)
Definition: SplitFile.cpp:154
size_t write(const void *data, size_t element_size, size_t nelement)
Definition: SplitFile.cpp:56
static std::string infoFileNameForDC(int run_number, int dc_number)
boost::signals2::connection addAllFrameCallback(const FrameCallbackSlotType &slot)
uint32_t raw_frames
total number of raw frames in file
Definition: event_store.h:148
static int eventSourceIDFromFileInputSource(const std::string &file)
Definition: event_store.cpp:85
uint32_t m_pos_sent
number of events sent from pos
Definition: event_store.h:277
int atomic_read(const std::string &filename, const std::string &old_suffix, T *data, size_t n)
Definition: icputils.h:433
uint32_t marker1
should always be DAEEventHeaderFixedMarker
Definition: dae_events.h:47
virtual const int * getDAE2SpecMap()=0
Poco::MemoryPool m_mempool
Definition: event_store.h:31
uint64_t numRawEvents()
Definition: event_store.cpp:14
virtual int eventSourceID() const =0
void newFile()
Definition: event_store.h:151
fixed header marker for DAEEventHeader
Definition: dae_events.h:44
float r_gd_prtn_chrg
Definition: isisraw.h:56
uint64_t nNewEventWords(DAEstatus &status)
int eventSourceID() const
isisU32_t * m_buffer
buffer containing new events from detector card
Definition: event_store.h:212
NoFrameCallbackType::slot_type NoFrameCallbackSlotType
Definition: event_store.h:22
unsigned long isisU32_t
Definition: isisvme_types.h:8
uint32_t m_events_sent
events sent to date
Definition: event_store.h:275
Poco::ThreadPool m_threads
Definition: event_store.h:209
unsigned period
Definition: dae_events.h:57
std::vector< FileEventSourceBase * > m_sources
Definition: event_store.h:33
std::string getFileName()
Definition: event_store.h:187
static const int DAEEventHeaderFixedMarker
Definition: dae_events.h:40
void releaseBuffer(isisU32_t *buffer)
static void OffsetToDAETime(time_t base, double offset, DAEEventHeader::DAETime &daetime)
Definition: dae_events.cpp:238
uint64_t m_file_words
Definition: event_store.h:242
int flush()
Definition: SplitFile.cpp:238
#define LOGSTR_DEBUG(__arg)
Definition: IsisBase.h:71
void clear()
InfoFileData m_info
Definition: event_store.h:243
void readInfo(EventStore::FileEventSourceInfo &info)
int close()
Definition: SplitFile.cpp:135
unsigned header_length
Definition: dae_events.h:11
std::string m_info_file_name
Definition: event_store.h:241
uint32_t getNewEvents(isisU32_t *buffer, uint32_t maxlen, uint64_t &num_unread, bool &run_ended, DAEstatus &status)
void getEnabledEventSourceIDs(std::vector< int > &ids)
Definition: event_store.cpp:43
void setOutputFileName(int run_number, bool new_run, DAEstatus &status)
virtual uint32_t getNewEvents(isisU32_t *buffer, uint32_t maxlen, uint64_t &num_unread, bool &run_ended, DAEstatus &status)=0
static Poco::Path baseRunDir(int run_number)
Definition: event_store.h:118
void setCardStatus(int id, bool enable)
uint32_t last_frame
offset to start of last frame in words
Definition: event_store.h:144
void addCard(DAEEventSource *det_card, bool enable=false)
uint32_t last_address_read
detector card address read which corresponds to file_words
Definition: event_store.h:149
#define LOGSTR_WARNING(__arg)
Definition: IsisBase.h:92
virtual ~ISISRAWEventSource()
int eventSourceID() const
ISISRAWEventSource(ISISRAW *iraw, int source_id, time_t run_start)
boost::signals2::connection addNoFrameCallback(const NoFrameCallbackSlotType &slot)
void invokeNoFrameCallbacks(NoFrameCallbackMode mode)
std::string m_file_name
Definition: event_store.h:240
void closeOutputFile(DAEstatus &status)
unsigned protons
Definition: dae_events.h:63
void stop(bool immediate)
size_t read(void *data, size_t element_size, size_t nelement)
Definition: SplitFile.cpp:85
std::vector< EventStoreCardHolder > m_stores
Definition: event_store.h:42
uint64_t raw_events
total number of raw events in file
Definition: event_store.h:142
void add(const FileEventSourceInfo &f)
Definition: event_store.h:70
static const int m_mempool_size
Definition: event_store.h:30
int t_nper
Definition: isisraw.h:299
uint64_t nNewEventWords(DAEstatus &status)
Definition: event_store.cpp:24
DAEEventHeaderInfo info
should be DAEEventHeaderMarker
Definition: dae_events.h:48
uint64_t good_events
total number of good events in file
Definition: event_store.h:141
bool end_header_present
Definition: event_store.h:150
void checkAndHistogramEvents()
void fixBadFrameHeader(DAEEventHeader &head)
DAEEventHeader header
header of current frame being processed
Definition: event_store.h:143
static int findValidEventHeaderOffset(const isisU32_t *buffer, uint32_t len, DAEstatus &status)
returns index of start of a valid event header, -1 on error
Definition: dae_events.cpp:111
std::list< boost::signals2::connection > addAllFrameCallback(const FrameCallbackSlotType &slot)
int i_det
Definition: isisraw.h:269
int sendHeader(isisU32_t *buffer, int current_frame)
std::list< boost::signals2::connection > addGoodEventCallback(const EventCallbackSlotType &slot)
#define LOGSTR_INFORMATION(__arg)
Definition: IsisBase.h:78
bool endHeaderSeen()
uint32_t marker0
Definition: dae_events.h:46
isisU32_t * dat1
Definition: isisraw.h:317
virtual void updateDAEEventLastRead(DAEstatus &status)=0
void invokeNoFrameCallbacks(NoFrameCallbackMode mode)
FrameCallbackType m_good_frame_callbacks
Definition: event_store.h:219
int readEvents(isisU32_t *buffer, uint32_t max_num_to_read, uint64_t &num_unread)
#define LOGSTR_ERROR(__arg)
Definition: IsisBase.h:99
void stop(bool immediate)
request stop after processing finished
Definition: event_store.h:178
struct DAEEventHeader::DAETime time
uint64_t numGoodEvents()
Definition: event_store.h:193
const int * getDAE2SpecMap()
uint64_t nNewEventWords(DAEstatus &status)
uint32_t m_len_buffer
number of words in m_buffer
Definition: event_store.h:213
double info(const char *title, std::ostream &os, bool add_nl=true)
Definition: icptimer.h:83
isisU32_t getLastAddressRead()
#define PPP_TO_UAMPH
virtual ~FileEventSource()
void setLastAddressRead(isisU32_t addr)
bool fixFrameHeader(DAEEventHeader &head)
static void removeFile(const std::string &path)
Definition: SplitFile.cpp:212
void closeAndDeleteOutputFile(int run_number, DAEstatus &status)
static void deleteOutputFile(int run_number, DAEstatus &status)
int t_nsp1
Definition: isisraw.h:301
virtual void setLastAddressRead(isisU32_t addr)=0
bool m_immediate_stop
Definition: event_store.h:201
std::string m_info_file_name
Definition: event_store.h:216
int getFrame(uint32_t event_count)
find out what frame to put event number event_count into - first frame is 0
std::list< boost::signals2::connection > addCallback(boost::signals2::connection(EventStoreCard::*func)(const SLOT &), const SLOT &slot)
void clearStores()
RPB_STRUCT rpb
Definition: isisraw.h:264
uint32_t m_events
total events to send
Definition: event_store.h:270
void setOutputFileName(int run_number, bool new_run, DAEstatus &status)
unsigned bad_frame
Definition: dae_events.h:13
unsigned time_channel
Definition: dae_events.h:78
void updateFilePosition(DAEstatus &status)
int * spec
Definition: isisraw.h:275
int eventSourceID() const
Definition: event_store.h:185
uint32_t frame_number
Definition: dae_events.h:49
static const unsigned DAE_EVENT_HEADER_WORDS
Definition: dae_events.h:102
uint32_t good_frames
total number of good frames in file
Definition: event_store.h:147
unsigned spectrum
Definition: dae_events.h:79
boost::signals2::connection addGoodEventCallback(const EventCallbackSlotType &slot)
void findUnusedFileInputSources(int run_number, std::set< std::string > &files)
static void getInfoFileData(const std::string &file_name, InfoFileData &info)
Poco::LogStream * m_logstr
Definition: IsisBase.h:15
int t_ntc1
Definition: isisraw.h:302
static bool isEndRunHeader(const DAEEventHeader *head, DAEstatus &status)
check whether an event frame header is a special enf of frame header
Definition: dae_events.cpp:179
Poco::ThreadPool m_threads
Definition: event_store.h:32
EventCallbackType m_all_event_callbacks
Definition: event_store.h:218
static const Poco::Path & baseDir()
Definition: event_store.h:109
uint64_t file_words
size of all data in file in words
Definition: event_store.h:140
void clearOutputFile(DAEstatus &status)
void clearSources()
std::list< boost::signals2::connection > addNoFrameCallback(const NoFrameCallbackSlotType &slot)
boost::signals2::connection addGoodFrameCallback(const FrameCallbackSlotType &slot)
FrameCallbackType::slot_type FrameCallbackSlotType
Definition: event_store.h:18
static Poco::Path m_base_dir
Definition: event_store.h:43
SplitFile m_file
Definition: event_store.h:245
void setLoggerName(const std::string &logger_name)
Definition: IsisBase.h:17
FrameCallbackType m_all_frame_callbacks
Definition: event_store.h:220
NoFrameCallbackMode
Definition: event_store.h:20
void addCards(const std::vector< DAEEventSource * > &det_list)
uint32_t m_pos
current location in raw data
Definition: event_store.h:276
int open(const std::string &path, IFile::Mode mode, int shflag)
Definition: SplitFile.h:24
static void findFileInputSources(int run_number, std::set< std::string > &files)
Definition: event_store.cpp:65
uint64_t nNewEventWords(DAEstatus &status)
void updateDAEEventLastRead(DAEstatus &status)
std::string m_file_name
Definition: event_store.h:215
uint32_t getNewEvents(isisU32_t *buffer, uint32_t maxlen, uint64_t &num_unread, bool &run_ended, DAEstatus &status)
void addRawFileInputSource(ISISRAW *iraw, int source_id, time_t run_start)
void updateDAEEventLastRead(DAEstatus &status)
EventStore * m_store
Definition: event_store.h:210
std::vector< int > m_frame_events
Definition: event_store.h:282
virtual uint64_t nNewEventWords(DAEstatus &status)=0
uint32_t num_events
total number of events read in frame m_header
Definition: event_store.h:145
void start()
EventCallbackType::slot_type EventCallbackSlotType
Definition: event_store.h:15
EventCallbackType m_good_event_callbacks
Definition: event_store.h:217
void addFileInputSource(int run_number, FileEventSourceInfo &info)
int addInfoVa(int facility, const char *format,...)
Definition: DAEstatus.cpp:91
void getEventSourceIDs(std::vector< int > &ids)
Definition: event_store.cpp:34
std::list< boost::signals2::connection > addAllEventCallback(const EventCallbackSlotType &slot)
int atomic_write(const std::string &filename, const std::string &old_suffix, const std::string &new_suffix, T *data, int n)
Definition: icputils.h:380
bool endHeaderSeen() const
Definition: event_store.h:182
int * crat
Definition: isisraw.h:290
uint32_t header_words
number of words of header read, if &lt; DAE_EVENT_HEADER_WORDS then we have a partial header ...
Definition: event_store.h:146
void clearOutputFile(DAEstatus &status)
virtual isisU32_t getLastAddressRead()=0
int discardContents()
Definition: SplitFile.cpp:188
static void checkEventHeader(const DAEEventHeader *head)
Definition: dae_events.cpp:22
isisU32_t getLastAddressRead()
uint64_t numRawEvents()
Definition: event_store.h:192
void readInfo(EventStore::FileEventSourceInfo &info)
bool isOpen() const
Definition: SplitFile.cpp:233