ICP  1
TCPEventStreamer.cpp
Go to the documentation of this file.
1 #include "stdafx.h"
2 #include "CRPTProxy.h"
3 #include "event_store.h"
4 #include "NeXusEventCallback.h"
5 #include "TCPEventStreamer.h"
6 
7  TCPEventStreamConnection::TCPEventStreamConnection(const Poco::Net::StreamSocket& s, EventStore& es, const CRPTProxy& crpt) : TCPServerConnection(s), m_es(es), m_crpt(crpt), m_bytes_backlog(0), m_discarded_events(0), m_discarding_events(false)
8  {
9  setLoggerName("TCPEventStreamConnection");
10  }
11 
13  {
14  if (m_bytes_backlog > 0)
15  {
16  LOGSTR_INFORMATION("Bytes backlog unsent " << m_bytes_backlog);
17  }
18  if (m_buffers.size() > 0)
19  {
20  LOGSTR_INFORMATION("Buffers backlog unsent " << m_buffers.size());
21  }
22  BOOST_FOREACH(TCPStreamEventDataNeutron* d, m_buffers)
23  {
24  delete d;
25  }
26  m_buffers.clear();
27  }
28 
30  {
31  const Poco::Timespan timeout(5,0);
32  Poco::Net::StreamSocket& sock = this->socket();
33  LOGSTR_INFORMATION("Received request from " + sock.peerAddress().toString());
34  TCPStreamEventDataSetup setup_packet;
35  setup_packet.head_setup.start_time = m_crpt->start_time;
36  setup_packet.head_setup.run_number = m_crpt->run_number;
37  int sent = sock.sendBytes(&setup_packet, sizeof(setup_packet));
38  std::list<boost::signals2::connection> connect_list = m_es.addGoodEventCallback(boost::bind(&TCPEventStreamConnection::allEventCallback, this, _1, _2, _3, _4, _5));
39  try
40  {
41  while(true)
42  {
43  if ( (m_buffers.size() > 0) && sock.poll(timeout, Poco::Net::Socket::SELECT_WRITE) )
44  {
45  std::list<TCPStreamEventDataNeutron*>::iterator it;
46  {
47  Poco::RWLock::ScopedReadLock _lock(m_lock);
48  it = m_buffers.begin();
49  }
50  sent = sock.sendBytes(&((*it)->head), sizeof((*it)->head));
51  sent = sock.sendBytes(&((*it)->head_n), sizeof((*it)->head_n));
52  sent = sock.sendBytes(&((*it)->data[0]), (*it)->data.size() * sizeof((*it)->data[0]));
53  {
54  Poco::RWLock::ScopedWriteLock _lock(m_lock);
55 // m_bytes_backlog -= it->numBytes();
56  delete *it;
57  m_buffers.erase(it);
58  }
59  }
60  else
61  {
62  Poco::Thread::sleep(2000);
63  }
64  }
65  }
66  catch(const std::exception& ex)
67  {
68  LOGSTR_WARNING("Terminating request from " + sock.peerAddress().toString() + ": " + ex.what());
69  }
70  if (m_buffers.size() > 0)
71  {
72  LOGSTR_INFORMATION("Buffers backlog unsent " << m_buffers.size());
73  }
74  LOGSTR_INFORMATION("Events discarded during session = " << m_discarded_events);
75  m_es.clearCallbacks(connect_list);
76  Poco::RWLock::ScopedWriteLock _lock(m_lock);
77  BOOST_FOREACH(TCPStreamEventDataNeutron* d, m_buffers)
78  {
79  delete d;
80  }
81  m_buffers.clear();
82  }
83 
84 #define PPP_TO_UAMPH 1.738E-6
85 
86  void TCPEventStreamConnection::allEventCallback(const DAEEventHeader* head, const DetectorEvent32* det_ev, int n, int event_source_id, const int* mapping)
87  {
88  if (n == 0)
89  {
90  return;
91  }
92  if ( m_buffers.size() > 50 )
93  {
94 #ifdef _WIN64
95  InterlockedExchangeAdd64(&m_discarded_events, n);
96 #endif /* _WIN64 */
97  if ( !m_discarding_events )
98  {
99  m_discarding_events = true;
100  LOGSTR_WARNING("Starting to discard new events for live listener");
101  }
102  return;
103  }
104  if ( m_discarding_events )
105  {
106  m_discarding_events = false;
107  LOGSTR_NOTICE("Resumed sending events for live listener, total discarded so far = " << m_discarded_events);
108  }
109  float time_offset;
110  DAEEventList::DAETimeToOffset(head->time, m_crpt->start_time, time_offset);
113  header_n.frame_number = head->frame_number;
114  header_n.protons = head->protons * PPP_TO_UAMPH;
115  header_n.frame_time_zero = time_offset;
116  header_n.nevents = n;
118  t->data.resize(n);
119  int card_index = m_crpt->cardIndexFromPos(event_source_id);
120  int spec, tr;
121  for(int i=0; i<n; ++i)
122  {
123  spec = m_crpt->dae1SpecForCardIndex(card_index, det_ev[i].spectrum);
124  t->data[i].spectrum = spec;
125  tr = m_crpt->spectrumDAETR(spec);
126  t->data[i].time_of_flight = NeXusEventCallback::getTimeOffset(m_crpt.CRPT(), det_ev, event_source_id, tr, i);
127  }
128  t->head = header;
129  t->head_n = header_n;
130  if ( t->isValid() )
131  {
132  Poco::RWLock::ScopedWriteLock _lock(m_lock);
133  m_buffers.push_back(t);
134  }
135  else
136  {
137  LOGSTR_WARNING("Invalid event packet");
138  }
139  }
140 
141 
143 
144 
145  TCPEventStreamer::TCPEventStreamer(EventStore& es, const CRPTProxy& crpt) : m_es(es), m_crpt(crpt)
146  {
147  setLoggerName("TCPEventStreamer");
148  LOGSTR_INFORMATION("Creating TCPEventStreamer");
149  unsigned short port = 10000;
150 
151  // set-up a server socket
152  m_svs = new Poco::Net::ServerSocket(port);
153  // set-up a TCPServer instance
154  m_srv = new Poco::Net::TCPServer(new TCPEventStreamConnectionFactory(m_es, m_crpt), *m_svs);
155  // start the TCPServer
156  m_srv->start();
157  }
158 
160  {
161  LOGSTR_INFORMATION("Terminating TCPEventStreamer");
162  m_srv->stop();
163  delete m_srv;
164  delete m_svs;
165  }
void allEventCallback(const DAEEventHeader *head, const DetectorEvent32 *det_ev, int n, int event_source_id, const int *mapping)
fixed header marker for DAEEventHeader
Definition: dae_events.h:44
time_t start_time
Definition: isiscrpt.h:254
float frame_time_zero
time offset from run_start of this frame, in seconds
Poco::Net::TCPServer * m_srv
#define LOGSTR_NOTICE(__arg)
Definition: IsisBase.h:85
std::list< TCPStreamEventDataNeutron * > m_buffers
#define LOGSTR_WARNING(__arg)
Definition: IsisBase.h:92
int spectrumDAETR(int spec) const
Definition: isiscrpt.h:799
unsigned protons
Definition: dae_events.h:63
const CRPTProxy & m_crpt
int cardIndexFromPos(int pos) const
Definition: isiscrpt.h:727
void clearCallbacks(const std::list< boost::signals2::connection > &callback_list)
Definition: event_store.h:91
uint32_t nevents
number of TCPStreamEvent() structures in this packet
this structure is part of a sequence of neutron events, which are all from the same ISIS frame ...
std::list< boost::signals2::connection > addGoodEventCallback(const EventCallbackSlotType &slot)
#define LOGSTR_INFORMATION(__arg)
Definition: IsisBase.h:78
struct DAEEventHeader::DAETime time
TCPStreamEventHeaderNeutron head_n
details of ISIS frame data was collected in and the number of neutron events in this packet ...
static void DAETimeToOffset(const DAEEventHeader::DAETime &daetime, time_t base, T &offset)
Definition: dae_events.cpp:226
A factory for TimeServerConnection.
static float getTimeOffset(const ISISCRPT_STRUCT *crpt, const DetectorEvent32 *det_ev, int event_source_id, int tr, int i)
std::vector< TCPStreamEventNeutron > data
list of neutron events
time_t start_time
run start time from ISISCRPT_STRUCT
int dae1SpecForCardIndex(int index, int dae2_spec) const
Definition: isiscrpt.h:701
uint32_t frame_number
Definition: dae_events.h:49
ISISCRPT_STRUCT * CRPT()
Definition: CRPTProxy.h:348
EventStore & m_es
void setLoggerName(const std::string &logger_name)
Definition: IsisBase.h:17
TCPEventStreamConnection(const Poco::Net::StreamSocket &s, EventStore &es, const CRPTProxy &crpt)
TCPStreamEventHeaderSetup head_setup
const CRPTProxy & m_crpt
static uint32_t MAX_BYTES_BACKLOG
layout of initial data packet send on initial connection and on a state change e.g. run number changes
Poco::Net::ServerSocket * m_svs
TCPEventStreamer(EventStore &es, const CRPTProxy &crpt)
#define PPP_TO_UAMPH
float protons
proton charge (uAh) for this frame
TCPStreamEventHeader head
uint32_t frame_number
ISIS frame number, 0 being first frame of run.