ICP  1
epicsdb.cpp
Go to the documentation of this file.
1 #include "stdafx.h"
2 #include "Poco/SingletonHolder.h"
3 #include "Poco/Data/Common.h"
4 #include "Poco/Data/SessionPool.h"
5 #include "Poco/Data/MySQl/Connector.h"
6 #include "Poco/Data/RecordSet.h"
7 
8 #include "selogger.h"
9 #include "epicsdb.h"
10 #include "icputils.h"
11 
12 class EpicsSessionPool : public Poco::Data::SessionPool
13 {
14 public:
15  EpicsSessionPool() : Poco::Data::SessionPool("MySQL", "host=localhost;port=3306;db=archive;user=report;password=$report;compress=true;auto-reconnect=true", 3, 32, 0)
16  {
17  Poco::Data::MySQL::Connector::registerConnector();
18  }
19 
20  void customizeSession(Poco::Data::Session& session)
21  {
22 // session.setProperty("maxRetryAttempts", 1000000);
23 // if (strlen(isisdb_per_session_sql) > 0)
24 // {
25 // Poco::Data::Statement stmt(session);
26 // stmt << isisdb_per_session_sql;
27 // stmt.execute();
28 // poco_assert(stmt.done());
29 // }
30  }
31 };
32 
33 static Poco::SingletonHolder<EpicsSessionPool> epics_session_pool;
34 
35 //select eng_id from smpl_engine where name='inst_engine' 'block_engine'
36 //select grp_id from chan_grp where eng_id= and name='BLOCKS' 'INST'
37 //select channel_id,name,descr from channel where grp_id=
38 
39 //select grp_id from chan_grp inner join smpl_eng on chan_grp.eng_id=smpl_eng.eng_id and smpl_eng.name='block_engine' and chan_grp.name='BLOCKS';
40 
41 
42 unsigned __stdcall run_epics_db(void* arg)
43 {
44  using namespace Poco::Data;
45  std::vector<std::string> channel_name, smpl_time;
46  std::vector<float> float_val;
47  std::vector<unsigned> nanosecs;
48  unsigned nrows;
49  epics_thread_data_t* icp_data = (epics_thread_data_t*)arg;
50  bool in_retry = false;
51 // strcpy(icp_data->lastread_iso, "2014-06-09T10:00:00");
52  while(true)
53  {
54  try
55  {
56  Session epics_session(epics_session_pool.get()->get());
57  in_retry = false;
58  while( epics_session.isConnected() )
59  {
60  smpl_time.clear();
61  channel_name.clear();
62  float_val.clear();
63  nanosecs.clear();
64  try
65  {
66  Statement epics_stmt1(epics_session), epics_stmt2(epics_session), epics_stmt3(epics_session);
67  int grp_id; // group id for where SECI like block PVs will go - we need to check this each time as it changhes when new blocks are loaded
68  epics_stmt1 << "SELECT grp_id FROM chan_grp INNER JOIN smpl_eng ON chan_grp.eng_id=smpl_eng.eng_id AND smpl_eng.name='block_engine' AND chan_grp.name='BLOCKS'", into(grp_id);
69  epics_stmt1.execute();
70  // poco 1.4 doesn't support mysql TIMESTAMP field, hence DATE_FORMAT operator to convert to string
71  // when EPICS archiver is restarted, we can get a stream of NULL values followed bythe real values, hence "is not null"
72  epics_stmt2 << "SELECT channel.name, DATE_FORMAT(smpl_time, '%Y-%m-%dT%H:%i:%s'), nanosecs, float_val FROM channel INNER JOIN sample ON (channel.channel_id=sample.channel_id) " <<
73  " AND (float_val IS NOT NULL) AND (grp_id=" << grp_id << ") AND (smpl_time = '" << icp_data->lastread_iso << "') AND (nanosecs > " << icp_data->lastread_nano << ") ORDER BY smpl_time, nanosecs", into(channel_name), into(smpl_time), into(nanosecs), into(float_val), limit(500);
74  while( !epics_stmt2.done() )
75  {
76  nrows = epics_stmt2.execute();
77  Poco::Thread::sleep(1000);
78  }
79  epics_stmt3 << "SELECT channel.name, DATE_FORMAT(smpl_time, '%Y-%m-%dT%H:%i:%s'), nanosecs, float_val FROM channel INNER JOIN sample ON (channel.channel_id=sample.channel_id) " <<
80  " AND (float_val IS NOT NULL) AND (grp_id=" << grp_id << ") AND (smpl_time > '" << icp_data->lastread_iso << "') ORDER BY smpl_time, nanosecs", into(channel_name), into(smpl_time), into(nanosecs), into(float_val), limit(500);
81  while( !epics_stmt3.done() )
82  {
83  nrows = epics_stmt3.execute();
84  Poco::Thread::sleep(1000);
85  }
86  if (smpl_time.size() > 0)
87  {
88  strcpy(icp_data->lastread_iso, smpl_time.back().c_str());
89  icp_data->lastread_nano = nanosecs.back();
90  // erase PV prefix - it should be e.g. IN:LARMOR:CS:SB prefix
91  for(int i=0; i<channel_name.size(); ++i)
92  {
93  int n = channel_name[i].find_last_of(':');
94  if (n != std::string::npos)
95  {
96  channel_name[i].erase(0, n+1);
97  }
98  }
99  se_log_fvalues(-1, "EPICS", smpl_time, channel_name, float_val);
100  std::string log_file = std::string("c:\\data\\") + icp_data->inst_name + padWithZeros(*(icp_data->run_number), 8) + ".log";
101  std::fstream f;
102  f.open(log_file, std::ios::app);
103  if (f.good())
104  {
105  for(int i=0; i<smpl_time.size(); ++i)
106  {
107  f << smpl_time[i] << "\t" << channel_name[i] << "\t" << float_val[i] << std::endl;
108  }
109  f.close();
110  }
111  }
112  }
113  catch(const std::exception& ex)
114  {
115  std::cerr << "error executing statement " << ex.what() << std::endl;
116  }
117  Poco::Thread::sleep(5000);
118  }
119  std::cerr << "EPICS MySQL session has disconnected " << std::endl;
120  }
121  catch(const std::exception& ex)
122  {
123  if (!in_retry)
124  {
125  in_retry = true;
126  std::cerr << "Error creating EPICS MySQL session: " << ex.what() << ", will attempt to retry every 30 seconds" << std::endl;
127  }
128  }
129  Poco::Thread::sleep(30000);
130  }
131 }
132 
133 #if 0
134 select eng_id from smpl_engine where name='inst_engine'
135 select grp_id from chan_grp where eng_id= and name=
136 
137 select channel_id,name,descr from channel where grp_id=
138 smpl_mode
139 
140 select smpl_time,num_val,float_val_str_val,str_val ,array_val status_id= severity_id= datatype where channel_id=
141 
142 select unit from num_metadata where channel_id=
143 
144 select enum_val from enum_metadata where channel_id= and enum_nbr=
145 
146 # Dump all values for all channels
147 SELECT channel.name, smpl_time, severity.name, status.name, float_val
148  FROM channel, severity, status, sample
149  WHERE channel.channel_id = sample.channel_id AND
150  severity.severity_id = sample.severity_id AND
151  status.status_id = sample.status_id
152  ORDER BY channel.name, smpl_time
153  LIMIT 50;
154 
155 # Same with INNER JOIN
156 SELECT channel.name AS channel,
157  smpl_time,
158  severity.name AS severity,
159  status.name AS status,
160  float_val
161  FROM sample INNER JOIN channel INNER JOIN severity INNER JOIN status
162  ON channel.channel_id = sample.channel_id AND
163  severity.severity_id = sample.severity_id AND
164  status.status_id = sample.status_id
165  ORDER BY smpl_time
166  LIMIT 50;
167 #endif
std::string padWithZeros(int number, int len)
Definition: icputils.h:265
SELOGGER_API int __stdcall se_log_fvalues(long run_number, const char *source, const std::vector< std::string > &iso_times, const std::vector< std::string > &block_names, const std::vector< float > &block_values)
Definition: selogger.cpp:578
static Poco::SingletonHolder< EpicsSessionPool > epics_session_pool
Definition: epicsdb.cpp:33
void customizeSession(Poco::Data::Session &session)
Definition: epicsdb.cpp:20
unsigned __stdcall run_epics_db(void *arg)
Definition: epicsdb.cpp:42