// Builds a std::string from the caller's `message` and a Win32 error `code`.
// NOTE(review): several original lines of this function are not visible in this
// view (the declaration of errMsg, the remaining FormatMessage arguments and the
// return statement), so only the visible steps are described.
12 static std::string
win32_message(
const std::string& message, DWORD code)
// Start from a copy of the caller-supplied text.
14 std::string s(message);
// Ask the system message table for the text; FormatMessage allocates the buffer
// itself (FORMAT_MESSAGE_ALLOCATE_BUFFER) and insert sequences are not expanded.
17 FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER
18 | FORMAT_MESSAGE_FROM_SYSTEM
19 | FORMAT_MESSAGE_IGNORE_INSERTS,
22 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
// A buffer allocated by FormatMessage has to be released with LocalFree.
27 LocalFree((LPVOID)errMsg);
35 #include "Poco/String.h"
36 #include "Poco/Environment.h"
37 #include "Poco/Format.h"
// Fragment: synchronous RDMA read from `remote_address` into `buffer` on this channel.
// NOTE(review): the enclosing signature and the body of the failure branch are not
// visible in this view.
// Timeout handed to the transfer call; presumably milliseconds (5 s) - TODO confirm units.
56 int timeout = 5 * 1000;
// Log channel name, remote address (hex) and count before starting the transfer.
57 LOGSTR_INFORMATION(
"RDMARead channel \"" <<
m_chan_name <<
"\" address " << std::hex <<
"0x" << remote_address << std::dec <<
" count " << len);
// qxtrmTransferSync is passed as the transfer mode, together with the timeout above.
58 qxtrmStatus status =
m_qx->RDMARead(
m_chan_id, static_cast<char*>(buffer), len, remote_address, qxtrmTransferSync, timeout);
// Any status other than qxtrmStatusOK enters the error branch (body not visible).
59 if (qxtrmStatusOK != status)
// Fragment: synchronous RDMA write of `buffer` to `remote_address` on this channel.
// NOTE(review): the enclosing signature and the body of the failure branch are not
// visible in this view.
// Timeout handed to the transfer call; presumably milliseconds (5 s) - TODO confirm units.
67 int timeout = 5 * 1000;
// Log channel name, remote address (hex) and count before starting the transfer.
68 LOGSTR_INFORMATION(
"RDMAWrite channel \"" <<
m_chan_name <<
"\" address " << std::hex <<
"0x" << remote_address << std::dec <<
" count " << len);
// Constness of `buffer` is cast away for the call; presumably the API takes a
// non-const char* even for writes - TODO confirm it does not modify the data.
69 qxtrmStatus status =
m_qx->RDMAWrite(
m_chan_id, static_cast<char*>(const_cast<void*>(buffer)), len, remote_address, qxtrmTransferSync, timeout);
// Any status other than qxtrmStatusOK enters the error branch (body not visible).
70 if (qxtrmStatusOK != status)
// Fragment: fetches the last-transfer status of this channel into chan_stat.
// NOTE(review): the enclosing signature and both branch bodies are not visible here.
79 qxtrmChannelStatus chan_stat;
// m_qx is cast to a non-const reference before the call; presumably the enclosing
// method is const while GetTransferStats is not - TODO confirm.
80 qxtrmStatus status = (
const_cast<Poco::SharedPtr<Quixtream>&
>(
m_qx))->GetTransferStats(
m_chan_id, qxtrmTransStatsCmdLastStatus, &chan_stat);
// chan_stat is only used when the query itself succeeded.
81 if (qxtrmStatusOK == status)
// Fragment: queries one transfer statistic (`cmd`) for channel `chan` into `n` and,
// on success, writes it to `os` as "<mess>: <n>\n".
// NOTE(review): the declaration of `n` and the enclosing signature are not visible here.
// m_qx is cast to a non-const reference; presumably because the enclosing method
// is const - TODO confirm.
95 qxtrmStatus status = (
const_cast<Poco::SharedPtr<Quixtream>&
>(
m_qx))->GetTransferStats(chan, cmd, &n);
// Nothing is written from this statement when the query fails.
96 if (qxtrmStatusOK == status)
98 os << mess <<
": " << n <<
"\n";
// Fragment: writes a header naming this channel (name and id) to `os`, then emits
// six statistics via channelStatsHelper: bytes transferred, data frame errors and
// acknowledgement frame errors, each for the last transfer and cumulatively.
// NOTE(review): the enclosing signature and return statement are not visible here.
108 os <<
"--- Transfer stats for channel \"" <<
m_chan_name <<
"\" (id=" <<
m_chan_id <<
") ---\n";
109 channelStatsHelper(m_chan_id, qxtrmTransStatsCmdLastBytesTrans,
"Bytes transferred for last completed transfer", os);
110 channelStatsHelper(m_chan_id, qxtrmTransStatsCmdTotalBytesTrans,
"Bytes transferred for all cumulative transfers", os);
111 channelStatsHelper(m_chan_id, qxtrmTransStatsCmdLastDataErrors,
"Data frame errors for last completed transfer", os);
112 channelStatsHelper(m_chan_id, qxtrmTransStatsCmdTotalDataErrors,
"Data frame errors for all cumulative transfers", os);
113 channelStatsHelper(m_chan_id, qxtrmTransStatsCmdLastAckErrors,
"Acknowledgement frame errors for last completed transfer", os);
114 channelStatsHelper(m_chan_id, qxtrmTransStatsCmdTotalAckErrors,
"Acknowledgement frame errors for all cumulative transfers", os);
// Fragment: synchronous block send of `len` bytes from `buffer` on this channel.
// NOTE(review): the enclosing signature and the failure branch body are not visible here.
// Timeout handed to BlockSend; presumably milliseconds (5 s) - TODO confirm units.
128 int timeout = 5 * 1000;
// Constness of `buffer` is cast away because the call is given a char*.
129 qxtrmStatus status =
m_qx->BlockSend(
m_chan_id, static_cast<char*>(const_cast<void*>(buffer)), len, qxtrmTransferSync, timeout);
// Any status other than qxtrmStatusOK enters the error branch (body not visible).
130 if (status != qxtrmStatusOK)
// Fragment: synchronous block receive of `len` bytes into `buffer` on this channel.
// NOTE(review): the enclosing signature and the failure branch body are not visible here.
// Timeout handed to BlockRecv; presumably milliseconds (5 s) - TODO confirm units.
139 int timeout = 5 * 1000;
140 qxtrmStatus status =
m_qx->BlockRecv(
m_chan_id, static_cast<char*>(buffer), len, qxtrmTransferSync, timeout);
// Any status other than qxtrmStatusOK enters the error branch (body not visible).
141 if (status != qxtrmStatusOK)
// Fragment: stream send. StreamSend is given the address of pDataBuffer, and `len`
// bytes of the caller's `buffer` are then copied to wherever pDataBuffer points.
// NOTE(review): the enclosing signature, both failure branch bodies and the call
// whose result is tested at listing line 160 are not visible here.
// Timeout handed to StreamSend; presumably milliseconds (5 s) - TODO confirm units.
150 int timeout = 5 * 1000;
// Starts as NULL; expected to be set by StreamSend through the pointer argument.
151 char* pDataBuffer = NULL;
153 qxtrmStatus status =
m_qx->StreamSend(
m_chan_id, &pDataBuffer, timeout);
154 if (status != qxtrmStatusOK)
// NOTE(review): no check is visible that `len` fits the destination buffer or that
// pDataBuffer is non-NULL - confirm against the non-visible lines / API contract.
158 memcpy(pDataBuffer, buffer, len);
160 if (status != qxtrmStatusOK)
// Fragment: stream receive using asynchronous StreamRecv calls and tracking how
// much data has been reported as available.
// NOTE(review): the enclosing signature, the loop construct, the declarations of
// `status` and `pData`, and all branch bodies are not visible in this view.
169 int timeout = QXTRM_WAIT_FOREVER;
// Amount reported by the latest call vs. the amount seen on the previous one.
171 unsigned int newDataAvail = 0;
172 unsigned int oldDataAvail = 0;
// Set by StreamRecv through the pointer argument below.
173 bool endOfSeg =
false;
178 status =
m_qx->StreamRecv(
m_chan_id, &pData, &newDataAvail, &endOfSeg, qxtrmTransferAsync, timeout);
// qxtrmStatusNoStreamDataWaiting is tolerated here alongside qxtrmStatusOK.
179 if (status != qxtrmStatusOK && status != qxtrmStatusNoStreamDataWaiting)
// Reacts only when more data is available than on the previous call.
183 if (newDataAvail > oldDataAvail)
186 oldDataAvail = newDataAvail;
// Progress message goes to stderr rather than the logger.
190 std::cerr <<
"Qxtrm_channel::streamReceive(StreamRecv) waiting for more data, sofar " << oldDataAvail << std::endl;
195 if (status != qxtrmStatusOK)
200 if (status != qxtrmStatusOK)
// Fragment: stream receive using the callback form of StreamRecv in synchronous mode.
// NOTE(review): the enclosing signature, the declarations of `status`, `pCallback`
// and `interval`, the release call and the branch bodies are not visible here.
// Timeout handed to StreamRecv; presumably milliseconds (5 s) - TODO confirm units.
212 int timeout = 5 * 1000;
// Keep calling StreamRecv for as long as it reports qxtrmStatusTransferTimeout.
215 while( (status =
m_qx->StreamRecv(
m_chan_id, pCallback, interval, qxtrmTransferSync, timeout)) == qxtrmStatusTransferTimeout )
217 std::cerr <<
"Qxtrm_channel::streamReceive(StreamRecv) timeout, retrying" << std::endl;
// Final status after the retry loop ended.
219 if (status != qxtrmStatusOK)
223 std::cerr <<
"releasing stream" << std::endl;
225 if (status != qxtrmStatusOK)
// Fragment: stream-data callback that prints newly arrived bytes to `outstream`
// as hexadecimal words and reports progress on stderr.
// NOTE(review): the enclosing signature, the declarations of pNewData, newData,
// word, word_size and outstream, the update of oldData and all branch bodies are
// not visible in this view.
// NOTE(review): oldData is a function-local static, so it persists across calls
// and is shared by every caller of this function.
234 static int oldData = 0;
// Only the bytes beyond those already seen (oldData) are treated as new.
243 pNewData = pDataBuffer + oldData;
244 newData = callbackSize - oldData;
// Round the new byte count down to a whole number of words.
247 newData = word_size * (newData / word_size);
249 std::cerr <<
"outputStreamDataCallback: callbackSize=" << callbackSize <<
" oldData=" << oldData <<
" newData=" << newData <<
" segmentEnd=" << segmentEnd << std::endl;
// Assemble bytes into `word` one at a time; a word is complete every word_size bytes.
251 for(
int i = 0; i < newData; ++i)
253 word.c[i%word_size] = pNewData[i];
255 if ( (i+1) % word_size == 0 )
// Print the word zero-padded to 2*word_size hex digits; a newline follows when
// oldData is a multiple of word_size * m_outstream_columns, otherwise a space.
257 fprintf(outstream,
"0x%0*x%s", 2*word_size, word.u, ((oldData % (word_size *
m_outstream_columns) == 0) ?
"\n" :
" "));
262 std::cerr <<
"outputStreamDataCallback: finished segment total size=" << callbackSize << std::endl;
// Consistency check between the bytes accounted for and the reported size.
263 if (oldData != callbackSize)
265 std::cerr <<
"outputStreamDataCallback: ERROR: callbackSize=" << callbackSize <<
" oldData=" << oldData << std::endl;
268 fprintf(outstream,
"\n");
// Fragment: registers `len` bytes at `local_address` as RDMA memory under `rdmaAddr`
// for this channel.
// NOTE(review): the enclosing signature and the failure branch body are not visible here.
275 qxtrmStatus status =
m_qx->RDMARegisterMemory(
m_chan_id, static_cast<char*>(local_address), len, rdmaAddr);
// Any status other than qxtrmStatusOK enters the error branch (body not visible).
276 if (qxtrmStatusOK != status)
// Fragment: unregisters the RDMA memory identified by `rdmaAddr` on this channel.
// NOTE(review): the enclosing signature and the failure branch body are not visible here.
284 qxtrmStatus status =
m_qx->RDMAUnregisterMemory(
m_chan_id, rdmaAddr);
// Any status other than qxtrmStatusOK enters the error branch (body not visible).
285 if (qxtrmStatusOK != status)
// Fragment: walks every entry of m_chans and asks each mapped channel to write its
// status to `os`.
// NOTE(review): the enclosing signature and return statement are not visible here.
293 for(chan_map_t::const_iterator it =
m_chans.begin(); it !=
m_chans.end(); ++it)
295 it->second->channelStatus(os);
// Fragment: walks every entry of m_chans and asks each mapped channel to write its
// transfer statistics to `os`.
// NOTE(review): the enclosing signature and return statement are not visible here.
302 for(chan_map_t::const_iterator it =
m_chans.begin(); it !=
m_chans.end(); ++it)
304 it->second->channelTransferStats(os);
// Fragment: looks for a local adapter IP address that starts with `ip_prefix` and
// reports that address and the adapter's MAC through `ip_addr` / `mac_addr`.
// NOTE(review): the enclosing signature, the declarations of outBufLen, dwRetVal
// and mac_buffer, the buffer release and all branch bodies are not visible here.
// Both outputs start empty, so they stay empty when nothing matches.
314 ip_addr = mac_addr =
"";
// Initial buffer sized for a single adapter entry.
315 IP_ADAPTER_INFO* pAdapterInfos = (IP_ADAPTER_INFO*) malloc(
sizeof(IP_ADAPTER_INFO));
// At most five attempts, continuing only while the previous result was
// ERROR_BUFFER_OVERFLOW or NO_ERROR.
318 for(
int i = 0; i < 5 && (dwRetVal == ERROR_BUFFER_OVERFLOW || dwRetVal == NO_ERROR); ++i )
320 dwRetVal = GetAdaptersInfo(pAdapterInfos, &outBufLen);
321 if( dwRetVal == NO_ERROR )
// Buffer too small: allocate again using the size written to outBufLen.
// NOTE(review): the release of the previous buffer is not visible - confirm it exists.
325 else if( dwRetVal == ERROR_BUFFER_OVERFLOW )
328 pAdapterInfos = (IP_ADAPTER_INFO*) malloc(outBufLen);
336 if( dwRetVal == NO_ERROR )
// Outer loop follows the adapter list; inner loop follows each adapter's address list.
338 IP_ADAPTER_INFO* pAdapterInfo = pAdapterInfos;
339 while( pAdapterInfo )
341 IP_ADDR_STRING* pIpAddress = &(pAdapterInfo->IpAddressList);
342 while( pIpAddress != 0 )
// Prefix comparison over exactly ip_prefix.size() characters.
344 if ( !strncmp(pIpAddress->IpAddress.String, ip_prefix.c_str(), ip_prefix.size()) )
346 ip_addr = pIpAddress->IpAddress.String;
// MAC is formatted from the first six bytes of the adapter address.
// NOTE(review): sprintf is unbounded; mac_buffer's size is not visible here.
347 sprintf(mac_buffer,
"%02X-%02X-%02X-%02X-%02X-%02X", pAdapterInfo->Address[0], pAdapterInfo->Address[1], pAdapterInfo->Address[2],
348 pAdapterInfo->Address[3], pAdapterInfo->Address[4], pAdapterInfo->Address[5]);
349 mac_addr = mac_buffer;
351 pIpAddress = pIpAddress->Next;
353 pAdapterInfo = pAdapterInfo->Next;
// Fragment: reads the configuration template from `config_file` into `config_data`
// and substitutes its ##...## placeholders with host and card network settings.
// NOTE(review): the enclosing signature, the calls that fill ip_addr / mac_addr,
// the conditions selecting the card address, and the socket error handling and
// cleanup are not visible in this view.
361 std::stringstream config_stream;
// First address prefix tried for the host interface.
362 std::string ip_prefix =
"192.168.1.";
363 std::string card_ip, card_mac;
// Read the whole file into memory through the stream buffer.
364 std::fstream fs(config_file, std::ios::in);
365 config_stream << fs.rdbuf();
367 config_data = config_stream.str();
368 std::string ip_addr, mac_addr;
// Fall back to further prefixes while no host address has been found.
370 if ( ip_addr.size() == 0 )
372 ip_prefix =
"192.168.0.";
375 if ( ip_addr.size() == 0 )
377 ip_prefix =
"130.246.49.";
// Card address is the chosen prefix plus a host number, paired with a fixed MAC.
// Presumably selected by ethernet_id (see the error text below) - TODO confirm.
383 card_ip = Poco::format(
"%s%d", ip_prefix, 1);
384 card_mac =
"00-0A-35-00-00-00";
388 card_ip = Poco::format(
"%s%d", ip_prefix, 2);
389 card_mac =
"00-0A-35-00-00-02";
393 card_ip = Poco::format(
"%s%d", ip_prefix, 3);
394 card_mac =
"00-0A-35-00-00-03";
398 throw(std::runtime_error(
"invalid card ethernet id"));
// Create a UDP socket and bind it to the host address with port 0.
401 struct sockaddr_in addr;
402 SOCKET sfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
407 memset(&addr, 0,
sizeof(
struct sockaddr_in));
408 addr.sin_family = AF_INET;
409 addr.sin_port = htons(0);
410 addr.sin_addr.S_un.S_addr = inet_addr(ip_addr.c_str());
411 if (::bind(sfd, (
struct sockaddr*)&addr,
sizeof(
struct sockaddr_in)) < 0)
// Read back the address the socket was actually bound to.
415 int namelen =
sizeof(
struct sockaddr_in);
416 if (getsockname(sfd, (
struct sockaddr*)&addr, &namelen) < 0)
420 char port_string[10];
// The host port base is derived from ethernet_id, not from the bound socket.
423 sprintf(port_string,
"0x%x", 0xfe02 + 8 * ethernet_id);
// The chosen settings are reported both on stderr and through the logger.
424 std::cerr <<
"Using HOST IP " << ip_addr <<
" Mac " << mac_addr <<
" Port " << port_string << std::endl;
425 LOGSTR_INFORMATION(
"Using HOST IP " << ip_addr <<
" Mac " << mac_addr <<
" Port " << port_string);
// Placeholder substitution in the configuration text.
427 Poco::replaceInPlace(config_data,
"##HOST_IP##", ip_addr.c_str());
428 Poco::replaceInPlace(config_data,
"##HOST_MAC##", mac_addr.c_str());
429 Poco::replaceInPlace(config_data,
"##HOST_PORT_BASE##", port_string);
430 Poco::replaceInPlace(config_data,
"##PACKET_SIZE##",
"1452");
431 Poco::replaceInPlace(config_data,
"##DAE_CARD##",
"DAE_CARD_FPGA");
432 Poco::replaceInPlace(config_data,
"##DAE_CARD_IP##", card_ip.c_str());
433 Poco::replaceInPlace(config_data,
"##DAE_CARD_MAC##", card_mac.c_str());
434 Poco::replaceInPlace(config_data,
"##DAE_CARD_PORT_BASE##",
"0xfe22");
435 Poco::replaceInPlace(config_data,
"##ACK_TIMEOUT##",
"1000");
436 Poco::replaceInPlace(config_data,
"##DATA_TIMEOUT##",
"1000");
437 Poco::replaceInPlace(config_data,
"##ACK_MODE##",
"on");
// The remote-config placeholder becomes either a RemoteConfig element or nothing.
438 if (do_remote_config)
440 Poco::replaceInPlace(config_data,
"##REMOTE_CONFIG##",
"<RemoteConfig remoteinterface=\"Eth0\" remoteprocess=\"process1\" remotenode=\"host\"/>");
444 Poco::replaceInPlace(config_data,
"##REMOTE_CONFIG##",
"");
// NOTE(review): ##ACK_MODE## was already replaced at listing line 437, so this
// second replacement finds nothing to change.
446 Poco::replaceInPlace(config_data,
"##ACK_MODE##",
"on");
// Fragment: maps a qxtrmStatus value to a human-readable description in `message`.
// Each case assigns one fixed text; the final assignment formats the numeric code
// of a value that has no dedicated text.
// NOTE(review): the enclosing signature, the switch header, the first case label,
// the break statements and the default label are not visible in this view.
466 message =
"The command completed successfully with no errors.";
469 case qxtrmStatusInvalidChannelHandle:
470 message =
"The supplied channel handle is not valid. ";
473 case qxtrmStatusInvalidTransferForChannel:
474 message =
"The requested transfer type is not supported by this channel instance. ";
477 case qxtrmStatusChannelTerminated:
478 message =
"The channels transfer thread is not running. ";
481 case qxtrmStatusTransferInProgress:
482 message =
"The channel instance already has a transfer in progress, or has an unsynced completed transfer. ";
485 case qxtrmStatusInvalidTransferArguments:
486 message =
"The transfer arguments are invalid, i.e. the sizes may not be aligned, etc. ";
489 case qxtrmStatusTransferTimeout:
490 message =
"The timeout period on this channel expired before the transfer commenced. ";
493 case qxtrmStatusTransferErrors:
494 message =
"The last transfer on this channel completed with errors. Use GetTransferStats() to see the nature of the errors. ";
497 case qxtrmStatusTransferNotInProgress:
498 message =
"The channel instance does not have an outstanding transfer which require syncing. ";
501 case qxtrmStatusNoStreamDataWaiting:
502 message =
"There is no streaming data waiting to be received on the stream channel. Stream data must be available for an async call. ";
505 case qxtrmStatusNoStreamBuffersFree:
506 message =
"No free Stream buffers available to write into. ";
509 case qxtrmStatusStreamNotInitialised:
510 message =
"The Stream send has not been initialised yet. ";
513 case qxtrmStatusRDMARegionOverlap:
514 message =
"The specified RDMA region overlaps with an existing region. ";
517 case qxtrmStatusTooManyRDMARegions:
518 message =
"Too many RDMA memory regions already specified. ";
521 case qxtrmStatusError:
522 message =
"General error. ";
// Fallback text carrying the raw numeric status.
526 message = Poco::format(
"Unknown qxtrmStatus error code %d. ", qxtrm_status);
// Fragment: asks `qx` for the last-transfer status of channel `chan`.
// Presumably part of qxtrmStatusMessage, which accepts an optional qx and chan -
// TODO confirm. The guard on `qx` and the branch body are not visible here.
538 qxtrmChannelStatus chan_stat;
539 qxtrmStatus status = qx->GetTransferStats(chan, qxtrmTransStatsCmdLastStatus, &chan_stat);
// chan_stat is only used when the query itself succeeded.
540 if (qxtrmStatusOK == status)
// Fragment: maps a qxtrmChannelStatus value to a human-readable description in
// `message`. Each case assigns one fixed text; the final assignment formats the
// numeric code of a value that has no dedicated text.
// NOTE(review): the enclosing signature, the break statements, the default label
// and the return statement are not visible in this view.
// NOTE(review): several texts contain typos ("seqnece", "where" for "were"); they
// are runtime strings and are left untouched here.
554 switch(qxtrm_channel_status)
556 case qxtrmChanStatusOK:
557 message =
"Transfer completed OK. ";
560 case qxtrmChanStatusInitialPSNMismatch:
561 message =
"Transfer completed OK, but the initial packet sequence number was not what we expected. This may indicate a sync loss or a whole previous packet loss. ";
564 case qxtrmChanStatusTooMuchData:
565 message =
"Transfer completed OK. Too much data received compared to the amount expected. ";
568 case qxtrmChanStatusTooLittleData:
569 message =
"Transfer completed OK. Too little data received compared to the amount expected. ";
572 case qxtrmChanStatusLostDataPackets:
573 message =
"Transfer failed. Packet seqnece numbers within the message where inconsistent. One or more packets where not received. ";
576 case qxtrmChanStatusLostACKPackets:
577 message =
"Transfer failed. Packet seqnece numbers within the acknowledgment packets where inconsistent. One or more ACK packets where not received. ";
580 case qxtrmChanStatusACKRecvTimeout:
581 message =
"Transfer failed. Acknowledgment packet not received before timeout. ";
584 case qxtrmChanStatusDataRecvTimeout:
585 message =
"Transfer failed. Data packet not received before timeout. ";
588 case qxtrmChanStatusInvalidRDMAAddr:
589 message =
"Transfer failed. The specified RDMA region was invalid. ";
592 case qxtrmChanStatusTransportFailure:
593 message =
"Transfer failed. Send or receive command failed. ";
// Fallback text carrying the raw numeric channel status.
597 message = Poco::format(
"Unknown qxtrmChannelStatus error code %d. ", qxtrm_channel_status);
// Constructs the driver: logs the request, builds the configuration text with
// loadConfigData and creates the Quixtream instance held in m_qx.
//   config_file      - configuration template handed to loadConfigData
//   node_name        - node name passed to Quixtream
//   ethernet_id      - forwarded to loadConfigData
//   process_number   - used to form the process name "process<N>"
//   do_remote_config - forwarded to loadConfigData
// NOTE(review): the braces and any further body lines are not visible in this view.
603 Qxtrm_driver::Qxtrm_driver(
const std::string& config_file,
const std::string& node_name,
int ethernet_id,
int process_number,
bool do_remote_config) : ISIS::Base(
"Qxtrm_driver"), m_qx(NULL)
606 LOGSTR_INFORMATION(
"Creating Qxtrm driver from \"" << config_file <<
"\" node \"" << node_name <<
"\" process \"process" << process_number <<
"\"");
// Timeout handed to the Quixtream constructor; presumably milliseconds (30 s) -
// TODO confirm units.
607 int timeout = 30 * 1000;
608 std::string config_data;
609 std::string process_name = Poco::format(
"process%d", process_number);
610 loadConfigData(config_file, ethernet_id, do_remote_config, config_data);
// Constness is cast away on all three strings because the constructor is given
// char*; presumably it does not modify them - TODO confirm.
// NOTE(review): the raw `new` result is assigned to m_qx, which the declaration
// list shows as a Poco::SharedPtr<Quixtream>.
611 m_qx =
new Quixtream(const_cast<char*>(node_name.c_str()), const_cast<char*>(process_name.c_str()),
612 const_cast<char*>(config_data.c_str()),
false, timeout);
Poco::SharedPtr< Qxtrm_channel > createChannel(const std::string &chan_name)
WinsockException(const std::string &message, DWORD code)
void RDMAUnregisterMemory(unsigned int rdmaAddr)
static FILE * m_outstream
void RDMARegisterMemory(void *local_address, unsigned int len, unsigned int rdmaAddr)
static std::string qxtrmChannelStatusMessage(int qxtrm_channel_status)
Poco::SharedPtr< Quixtream > m_qx
std::string channelStatus() const
static const int INVALID_QXTRM_CHANNEL
void blockSend(const void *buffer, unsigned int len)
std::ostream & channelTransferStats(std::ostream &os) const
static void getNetworkDetails(const std::string &ip_prefix, std::string &ip_addr, std::string &mac_addr)
void RDMAWrite(uint32_t value, unsigned int remote_address)
static std::string win32_message(const std::string &message, DWORD code)
std::ostream & channelTransferStats(std::ostream &os) const
void streamSend(const void *buffer, unsigned int len)
void channelStatsHelper(int chan, qxtrmTransStatsCmd cmd, const char *mess, std::ostream &oss) const
std::ostream & channelStatus(std::ostream &os) const
void loadConfigData(const std::string &config_file, int ethernet_id, bool do_remote_config, std::string &config_data)
#define LOGSTR_INFORMATION(__arg)
Poco::SharedPtr< Quixtream > m_qx
void RDMARead(uint32_t &value, unsigned int remote_address)
void streamReceive()
Receives one segment.
Qxtrm_driver(const std::string &config_file, const std::string &node_name, int ethernet_id, int process_number, bool do_remote_config)
static std::string qxtrmStatusMessage(int qxtrm_status, Quixtream *qx=NULL, int chan=INVALID_QXTRM_CHANNEL)
static int m_outstream_word_size
void setLoggerName(const std::string &logger_name)
WinsockException(const std::string &message)
Exception class recording an error raised by a Quixtream API function.
static void outputStreamDataCallback(char *pDataBuffer, unsigned int callbackSize, bool segmentEnd)
static int m_outstream_columns
void blockReceive(void *buffer, unsigned int len)
Qxtrm_channel(Poco::SharedPtr< Quixtream > qx, const std::string &chan_name)