ICP  1
NeXusEventWriter.cpp
Go to the documentation of this file.
1 #include "stdafx.h"
2 #include "isiscrpt.h"
3 #include "isisraw.h"
4 #include "icputils.h"
5 #include "selogger.h"
6 #include "NeXusEventWriter.h"
7 
8 static void nxErrorFunc(void* arg, char* text)
9 {
10  DAEstatus* pStatus = (DAEstatus*)arg;
11  pStatus->addVa(FAC_CRPT, SEV_ERROR, ERRTYPE_OUTOFMEM, "NEXUS ERROR: %s", text);
12 }
13 
14 
15 NeXusEventWriter::NeXusEventWriter() : m_nxcallback(NULL), m_file(NULL)
16 {
17  setLoggerName("NeXusEventWriter");
18 }
19 
20 void NeXusEventWriter::start(const ISISCRPT_STRUCT* crpt, IXNeXusFile* file, bool create_empty)
21 {
22  stop(true);
23  m_file = file;
24  start(crpt, create_empty);
25 }
26 
27 void NeXusEventWriter::start(int run_number, bool clear_counters)
28 {
29  throw std::runtime_error("not implemented");
30 }
31 
// Set up and start event processing for the run described by crpt.
//   crpt         - run description; crpt->run_number selects the file input source
//   create_empty - guards an optional step (see the note on that branch below)
// Sequence: reset the event store m_es, add a file input source, create the
// NeXusEventCallback for the enabled event source ids, create the "framelog"
// group in m_file, then start the event store.
// NOTE(review): the line numbering of this listing skips 35, 43 and 46-48, so some
// statements appear to be missing here - confirm against the original source file.
32 void NeXusEventWriter::start(const ISISCRPT_STRUCT* crpt, bool create_empty)
33 {
34  LOGSTR_INFORMATION("getting ready to process events for nexus file");
36  m_es.clear();
// NOTE(review): `info` is not declared in any line visible in this listing.
37  m_es.addFileInputSource(crpt->run_number, info);
38  std::vector<int> ev_source_ids;
39  m_es.getEnabledEventSourceIDs(ev_source_ids);
// m_nxcallback is owned by this object; it is deleted again in stop()
40  m_nxcallback = new NeXusEventCallback(crpt, m_file, ev_source_ids);
// NOTE(review): this branch has no body in the visible listing.
41  if (create_empty)
42  {
44  }
// third argument is open_group = false
45  m_file->makeNewGroup("framelog", "NXcollection", false);
49  m_es.start();
50  LOGSTR_INFORMATION("processing events for nexus file");
51 }
52 
54 {
55  while (!m_es.endHeaderSeen())
56  {
57  Poco::Thread::sleep(500);
58  }
59  m_es.stop(false);
61  delete m_nxcallback;
62  m_nxcallback = NULL;
63  LOGSTR_INFORMATION("finished events for nexus file");
64 }
65 
66 void NeXusEventWriter::stop(bool close_file)
67 {
68  m_es.stop(true);
69  m_es.clear();
70  if (m_nxcallback != NULL)
71  {
72  delete m_nxcallback;
73  m_nxcallback = NULL;
74  }
75  if ( close_file && (m_file != NULL) )
76  {
77  m_file->close();
78  delete m_file;
79  m_file = NULL;
80  }
81 }
82 
83 ICPEventWriter::ICPEventWriter() : m_dae(NULL), m_crpt(NULL), m_nxcallback(NULL), m_file(NULL), m_status(NULL), m_run_number_digits(0)
84 {
85  setLoggerName("ICPEventWriter");
86 }
87 
89 {
90  stop();
91 }
92 
93 // status needs to be persistent as reference kept in nexus file
94 void ICPEventWriter::setup(const ISISCRPT_STRUCT* crpt, ISISDAE* dae, int run_number_digits, DAEstatus& status)
95 {
96  LOGSTR_INFORMATION("Setup nexus event writer");
97  m_crpt = crpt;
98  m_run_number_digits = run_number_digits;
99  m_dae = dae;
100  m_status = &status;
101  remove(filename().c_str()); // if we got restarted during a run, this will be corrupt and useless
102 }
103 
105 {
106  if (m_crpt != NULL)
107  {
108  return Poco::format("c:\\data\\events%s.tmp", padWithZeros(m_crpt->run_number, m_run_number_digits));
109  }
110  else
111  {
112  return "";
113  }
114 }
115 
116 
118 {
119  if ( !Poco::Util::Application::instance().config().getBool("isisicp.incrementaleventnexus", false) )
120  {
121  return;
122  }
123  LOGSTR_INFORMATION("Starting nexus event writer for " << filename());
124  // start() may get called twice without a stop() if e.g. BEGIN; ABORT; BEGIN
125  stop(true); // deletes file
126 
129 
130  NXsetcache(64*1024*1024);
131 // m_file->enableOptions(IXNeXusFile::WriteDummyOnNULL);
132  m_file->enableCompression(100*NX_COMP_LZW + m_crpt->compression_level);
133 // dstatus.addInfoVa(FAC_DAE, "compression level = %d", crpt->compression_level);
135 // dstatus.addInfoVa(FAC_DAE, "compression block size = %d", crpt->compression_block_size);
136 
137  std::vector<int> ev_source_ids;
138  m_dae->getEnabledEventSourceIDs(ev_source_ids);
139  m_nxcallback = new NeXusEventCallback(m_crpt, m_file, ev_source_ids);
141  m_file->makeNewGroup("framelog", "NXcollection", false);
142  std::list<boost::signals2::connection> cb;
143  m_callbacks.clear();
144  cb = m_dae->addAllEventCallback(boost::bind(&NeXusEventCallback::allEventCallback, m_nxcallback, _1, _2, _3, _4, _5));
145  m_callbacks.insert(m_callbacks.begin(), cb.begin(), cb.end());
147  m_callbacks.insert(m_callbacks.begin(), cb.begin(), cb.end());
149  m_callbacks.insert(m_callbacks.begin(), cb.begin(), cb.end());
150 }
151 
// Stop the incremental nexus event writer and release its resources.
//   delete_file - when true the event file is removed afterwards and the
//                 callback object is destroyed without calling finish() on it.
// Does nothing unless the configuration option "isisicp.incrementaleventnexus"
// is true (it defaults to false when absent).
152 void ICPEventWriter::stop(bool delete_file)
153 {
154  if ( !Poco::Util::Application::instance().config().getBool("isisicp.incrementaleventnexus", false) )
155  {
156  return;
157  }
158  LOGSTR_INFORMATION("Stopping nexus event writer for " << filename());
// NOTE(review): this branch has no body in the visible listing (line 161 is
// skipped by the numbering) - confirm against the original source file.
159  if (m_dae != NULL)
160  {
162  }
163  m_callbacks.clear();
164  if (m_nxcallback != NULL)
165  {
// finish() is only called when the file is going to be kept
166  if (!delete_file)
167  {
168  m_nxcallback->finish();
169  }
170  delete m_nxcallback;
171  m_nxcallback = NULL;
172  }
// close and release the file object before any removal of the file on disk
173  if (m_file != NULL)
174  {
175  m_file->close();
176  delete m_file;
177  m_file = NULL;
178  }
179  if (delete_file)
180  {
181  remove(filename().c_str());
182  }
183 }
184 
185 extern "C" CRITICAL_SECTION nx_critical;
186 
187 void ICPEventWriter::snapshotOld(const std::string& name)
188 {
189  static std::string app_dir = Poco::Util::Application::instance().config().getString("application.dir");
190  static std::string shadow_user = Poco::Util::Application::instance().config().getString("isisicp.shadow.user");
191  static std::string shadow_pass = Poco::Util::Application::instance().config().getString("isisicp.shadow.pass");
192  static std::string shadow_snapvol = Poco::Util::Application::instance().config().getString("isisicp.shadow.snapvolume");
193  static std::string shadow_copydir = Poco::Util::Application::instance().config().getString("isisicp.shadow.copydir");
194  static std::string shadow_prog_name = Poco::Util::Application::instance().config().getString("isisicp.shadow.progname");
195  static std::string shadow_dir = Poco::Util::Application::instance().config().getString("isisicp.shadow.dir");
196  static std::string comspec = getenv("ComSpec");
197  static std::string tmpdir = getenv("TEMP");
198  static std::string script_name = Poco::Path(tmpdir, "isisicp_shadow.cmd").toString(Poco::Path::PATH_WINDOWS);
199  if ( m_file == NULL || !Poco::File(filename()).exists() )
200  {
201  LOGSTR_WARNING("Nothing to shadow copy");
202  return;
203  }
204  if ( !Poco::File(shadow_dir).exists() )
205  {
206  LOGSTR_ERROR("Shadow Directory \"" << shadow_dir << "\" does not exist");
207  return;
208  }
209  // program resides in directory above isisicp e.g. in service/x64 not service/x64/{Debug,Release}
210  Poco::Path shadow_prog_path(app_dir, shadow_prog_name);
211  shadow_prog_path.pushDirectory("..");
212  std::string shadow_prog = shadow_prog_path.toString(Poco::Path::PATH_WINDOWS);
213  DWORD len_comp_name = MAX_COMPUTERNAME_LENGTH + 1;
214  wchar_t comp_name[MAX_COMPUTERNAME_LENGTH + 1];
215  GetComputerNameW(comp_name, &len_comp_name);
216  comp_name[len_comp_name] = '\0';
217  PROCESS_INFORMATION pi = {0};
218  STARTUPINFO si = {0};
219  STARTUPINFOW siw = {0};
220  ZeroMemory( &si, sizeof(si) );
221  ZeroMemory( &siw, sizeof(siw) );
222  ZeroMemory( &pi, sizeof(pi) );
223  si.cb = sizeof(si);
224  //si.dwFlags = STARTF_USESHOWWINDOW;
225  //si.wShowWindow = SW_MINIMIZE;
226  siw.cb = sizeof(siw);
227  //siw.dwFlags = STARTF_USESHOWWINDOW;
228  //siw.wShowWindow = SW_MINIMIZE;
229  BOOL res;
230 // std::string command_line_s = Poco::format("\"%s\" -nw -p -script=\"%s\" \"%s\"", shadow_prog, script_name, shadow_snapvol);
231  std::string command_line_s = Poco::format("\"%s\" /k \"\"%s\" -tracing -nw -p -script=\"%s\" \"%s\"\"", comspec,
232  shadow_prog, script_name, shadow_snapvol);
233  char* command_line = strdup(command_line_s.c_str());
234  EnterCriticalSection(&nx_critical);
235  m_file->flush();
236  if (shadow_user.size() == 0)
237  {
238  res = CreateProcess(NULL, command_line, NULL, NULL, FALSE, CREATE_NEW_CONSOLE, NULL, NULL, &si, &pi);
239  }
240  else
241  {
242  res = CreateProcessWithLogonW(CT2OLE(shadow_user.c_str()), comp_name, CT2OLE(shadow_pass.c_str()),
243  0, NULL, CT2OLE(command_line), 0, NULL, NULL, &siw, &pi);
244  }
245  free(command_line);
246  if (!res)
247  {
248  LeaveCriticalSection(&nx_critical); // once shadow created, we can continue writing
249  LOGSTR_ERROR("Shadow error");
250  return;
251  }
252  WaitForSingleObject( pi.hProcess, INFINITE );
253  CloseHandle(pi.hProcess);
254  CloseHandle(pi.hThread);
255  LeaveCriticalSection(&nx_critical); // once shadow created, we can continue writing
256  ZeroMemory( &si, sizeof(si) );
257  ZeroMemory( &siw, sizeof(siw) );
258  ZeroMemory( &pi, sizeof(pi) );
259  si.cb = sizeof(si);
260  //si.dwFlags = STARTF_USESHOWWINDOW;
261  //si.wShowWindow = SW_MINIMIZE;
262  siw.cb = sizeof(siw);
263  //siw.dwFlags = STARTF_USESHOWWINDOW;
264  //siw.wShowWindow = SW_MINIMIZE;
265  std::fstream f;
266  f.open(script_name, std::ios::app);
267  f << "\"" << shadow_prog <<"\" -tracing -el=%SHADOW_ID_1%,\"" << shadow_dir << "\"" << std::endl;
268  Poco::Path fn(filename());
269  Poco::Path ff(shadow_dir, fn.getFileName());
270  ff.pushDirectory(shadow_copydir);
271  f << "COPY /B /Y \"" << ff.toString() << "\" \"" << name << "\"" << std::endl;
272  f << "\"" << shadow_prog <<"\" -tracing -ds=%SHADOW_ID_1%" << std::endl;
273  f.close();
274  command_line_s = Poco::format("\"%s\" /k \"%s\"", comspec, script_name);
275  command_line = strdup(command_line_s.c_str());
276  if (shadow_user.size() == 0)
277  {
278  res = CreateProcess(NULL, command_line, NULL, NULL, FALSE, CREATE_NEW_CONSOLE, NULL, NULL, &si, &pi);
279  }
280  else
281  {
282  res = CreateProcessWithLogonW(CT2OLE(shadow_user.c_str()), comp_name, CT2OLE(shadow_pass.c_str()),
283  0, NULL, CT2OLE(command_line), 0, NULL, NULL, &siw, &pi);
284  }
285  free(command_line);
286  if (!res)
287  {
288  LOGSTR_ERROR("Shadow error");
289  remove(script_name.c_str());
290  return;
291  }
292  WaitForSingleObject( pi.hProcess, INFINITE );
293  CloseHandle(pi.hProcess);
294  CloseHandle(pi.hThread);
295  remove(script_name.c_str());
296 }
297 
298 void ICPEventWriter::snapshot(const std::string& name)
299 {
300  static std::string app_dir = Poco::Util::Application::instance().config().getString("application.dir");
301  static std::string shadow_user = Poco::Util::Application::instance().config().getString("isisicp.shadow.user");
302  static std::string shadow_pass = Poco::Util::Application::instance().config().getString("isisicp.shadow.pass");
303  static std::string shadow_prog_name = Poco::Util::Application::instance().config().getString("isisicp.shadow.progname");
304  static std::string shadow_dir = Poco::Util::Application::instance().config().getString("isisicp.shadow.dir");
305  static std::string comspec = getenv("ComSpec");
306  static std::string tmpdir = getenv("TEMP");
307  static std::string script_name = Poco::Path(tmpdir, "isisicp_shadow.cmd").toString(Poco::Path::PATH_WINDOWS);
308  if ( m_file == NULL || !Poco::File(filename()).exists() )
309  {
310  LOGSTR_WARNING("Nothing to shadow copy");
311  return;
312  }
313  if ( !Poco::File(shadow_dir).exists() )
314  {
315  LOGSTR_ERROR("Shadow Directory \"" << shadow_dir << "\" does not exist");
316  return;
317  }
318  // program resides in directory above isisicp e.g. in service/x64 not service/x64/{Debug,Release}
319  Poco::Path shadow_prog_path(app_dir, shadow_prog_name);
320  shadow_prog_path.pushDirectory("..");
321  std::string shadow_prog = shadow_prog_path.toString(Poco::Path::PATH_WINDOWS);
322  DWORD len_comp_name = MAX_COMPUTERNAME_LENGTH + 1;
323  wchar_t comp_name[MAX_COMPUTERNAME_LENGTH + 1];
324  GetComputerNameW(comp_name, &len_comp_name);
325  comp_name[len_comp_name] = '\0';
326  PROCESS_INFORMATION pi = {0};
327  STARTUPINFO si = {0};
328  STARTUPINFOW siw = {0};
329  ZeroMemory( &si, sizeof(si) );
330  ZeroMemory( &siw, sizeof(siw) );
331  ZeroMemory( &pi, sizeof(pi) );
332  si.cb = sizeof(si);
333  //si.dwFlags = STARTF_USESHOWWINDOW;
334  //si.wShowWindow = SW_MINIMIZE;
335  siw.cb = sizeof(siw);
336  //siw.dwFlags = STARTF_USESHOWWINDOW;
337  //siw.wShowWindow = SW_MINIMIZE;
338 
339  BOOL res;
340  Poco::Path src_dir(filename());
341  std::string filename_base(src_dir.getFileName());
342  src_dir.setFileName("");
343  Poco::Path dest_filename(shadow_dir, filename_base);
344 
345  std::fstream f;
346  f.open(script_name, std::ios::out | std::ios::trunc);
347  f << "REM \"" << shadow_prog << "\" /full \"" << src_dir.toString() << "\" \"" << shadow_dir << "\" \"" << filename_base << "\"" << std::endl;
348 // f << "COPY /B /Y \"" << ff.toString() << "\" \"" << name << "\"" << std::endl;
349  f << "REM REN \"" << dest_filename.toString() << "\" \"" << name << "\"" << std::endl;
350  f.close();
351  std::string command_line_s = Poco::format("\"%s\" /k \"%s\"", comspec, script_name);
352  char* command_line = strdup(command_line_s.c_str());
353  EnterCriticalSection(&nx_critical);
354  m_file->flush();
355  if (shadow_user.size() == 0)
356  {
357  res = CreateProcess(NULL, command_line, NULL, NULL, FALSE, CREATE_NEW_CONSOLE, NULL, NULL, &si, &pi);
358  }
359  else
360  {
361  res = CreateProcessWithLogonW(CT2OLE(shadow_user.c_str()), comp_name, CT2OLE(shadow_pass.c_str()),
362  0, NULL, CT2OLE(command_line), 0, NULL, NULL, &siw, &pi);
363  }
364  free(command_line);
365  if (!res)
366  {
367  LeaveCriticalSection(&nx_critical); // once shadow created, we can continue writing
368  LOGSTR_ERROR("Shadow error");
369  remove(script_name.c_str());
370  return;
371  }
372  WaitForSingleObject( pi.hProcess, INFINITE );
373  CloseHandle(pi.hProcess);
374  CloseHandle(pi.hThread);
375  LeaveCriticalSection(&nx_critical); // once shadow created, we can continue writing
376  remove(script_name.c_str());
377 }
378 
379 
virtual void wait()
std::string filename()
void allEventCallback(const DAEEventHeader *head, const DetectorEvent32 *det_ev, int n, int event_source_id, const int *mapping)
void noFrameCallbackNull(bool end_present, NoFrameCallbackMode mode, int event_source_id)
#define ERRTYPE_OUTOFMEM
void start(const ISISCRPT_STRUCT *crpt, IXNeXusFile *file, bool create_empty)
static void nxErrorFunc(void *arg, char *text)
DAEstatus * m_status
int addVa(int facility, int severity, int errtype, const char *format,...)
Definition: DAEstatus.cpp:54
virtual void stop(bool close_file)
void stop(bool delete_file=false)
int open(const char *filename, int mode, int type=IXNeXusFile::HDF5)
int makeNewGroup(const char *name, const char *nxclass, bool open_group=true)
int flush()
Definition: nexuswriter.h:29
void getEnabledEventSourceIDs(std::vector< int > &ids)
Definition: isisdae.h:293
IXNeXusFile * m_file
void snapshotOld(const std::string &name)
void clear()
void getEnabledEventSourceIDs(std::vector< int > &ids)
Definition: event_store.cpp:43
void allFrameCallback(const DAEEventHeader *head, int event_source_id)
this will get called for each frame for EACH event source
int compression_block_size
Definition: isiscrpt.h:481
static ISISDAE * dae
Definition: daeset.cpp:7
#define LOGSTR_WARNING(__arg)
Definition: IsisBase.h:92
std::string padWithZeros(int number, int len)
Definition: icputils.h:265
std::list< boost::signals2::connection > addAllEventCallback(const EventCallbackSlotType &slot)
Definition: isisdae.h:269
#define FAC_CRPT
void stop(bool immediate)
#define SEV_ERROR
IXNeXusFile * m_file
std::list< boost::signals2::connection > addAllFrameCallback(const FrameCallbackSlotType &slot)
void setSpectraChunkSize(int nsp)
Definition: nexuswriter.h:164
void clearCallbacks(const std::list< boost::signals2::connection > &callback_list)
Definition: isisdae.h:285
void snapshot(const std::string &name)
#define LOGSTR_INFORMATION(__arg)
Definition: IsisBase.h:78
bool endHeaderSeen()
#define LOGSTR_ERROR(__arg)
Definition: IsisBase.h:99
std::list< boost::signals2::connection > addAllFrameCallback(const FrameCallbackSlotType &slot)
Definition: isisdae.h:277
CRITICAL_SECTION nx_critical
void setup(const ISISCRPT_STRUCT *crpt, ISISDAE *dae, int run_number_digits, DAEstatus &status)
NeXusEventCallback * m_nxcallback
std::list< boost::signals2::connection > addNoFrameCallback(const NoFrameCallbackSlotType &slot)
void setLoggerName(const std::string &logger_name)
Definition: IsisBase.h:17
const ISISCRPT_STRUCT * m_crpt
void enableCompression(int comp_level=NX_COMP_LZW)
Definition: nexuswriter.h:165
NeXusEventCallback * m_nxcallback
void start()
std::list< boost::signals2::connection > addNoFrameCallback(const NoFrameCallbackSlotType &slot)
Definition: isisdae.h:281
void addFileInputSource(int run_number, FileEventSourceInfo &info)
std::list< boost::signals2::connection > addAllEventCallback(const EventCallbackSlotType &slot)
void noFrameCallbackEnd(bool end_present, NoFrameCallbackMode mode, int event_source_id)
std::list< boost::signals2::connection > m_callbacks
int compression_level
Definition: isiscrpt.h:480