CustusX  2023.01.05-dev+develop.0da12
An IGT application
cxIGTLinkClientStreamer.cpp
Go to the documentation of this file.
1 /*=========================================================================
2 This file is part of CustusX, an Image Guided Therapy Application.
3 
4 Copyright (c) SINTEF Department of Medical Technology.
5 All rights reserved.
6 
7 CustusX is released under a BSD 3-Clause license.
8 
9 See License.txt (https://github.com/SINTEFMedtek/CustusX/blob/master/License.txt) for details.
10 =========================================================================*/
12 
13 #include <QTcpSocket>
14 #include "igtlOSUtil.h"
15 #include "igtlMessageHeader.h"
16 #include "igtlTransformMessage.h"
17 #include "igtlPositionMessage.h"
18 #include "igtlImageMessage.h"
19 #include "igtlClientSocket.h"
20 #include "igtlStatusMessage.h"
21 
22 #include "cxTypeConversions.h"
23 #include "cxLogger.h"
24 #include "cxIGTLinkConversion.h"
27 #include "cxCyclicActionLogger.h"
28 #include "cxUtilHelpers.h"
29 #include "cxTime.h"
30 #include "cxSender.h"
31 #include "vtkImageData.h"
32 
33 namespace cx
34 {
35 
// Tail of a constructor definition whose first line is absent from this listing:
// the member-initializer list followed by an empty body.
// Initial state: no message header pending, empty host address, port 0.
37  mHeadingReceived(false),
38  mAddress(""),
39  mPort(0)
40 {
41 }
42 
// Empty body of a definition whose signature line is absent from this listing.
// NOTE(review): presumably the destructor — confirm against the original file.
44 {
45 
46 }
47 
48 void IGTLinkClientStreamer::setAddress(QString address, int port)
49 {
50  mAddress = address;
51  mPort = port;
52 }
53 
54 
// Body of a definition whose signature line is absent from this listing.
// NOTE(review): presumably IGTLinkClientStreamer::startStreaming(sender), judging by
// the commented-out log text below — confirm against the original file.
//
// Stores the sender, creates a fresh QTcpSocket wired to this object's slots, and then
// blocks in multipleTryConnectToHost() until a connection succeeds or every attempt fails.
// On failure the socket is released again and no header buffer is created.
56 {
57  mSender = sender;
58 // this->createSendTimer();
59 // mTestTimer = new QTimer(this);
60 // connect(mTestTimer, SIGNAL(timeout()), this, SLOT(myStreamSlot()));
61 // std::cout << "IGTLinkClientStreamer::startStreaming " << std::endl;
62 
63  // Establish Connection
// All socket signals use Qt::DirectConnection, i.e. the slots are invoked
// immediately when the signal is emitted rather than being queued.
64  mSocket.reset(new QTcpSocket());
65  connect(mSocket.get(), SIGNAL(readyRead()), this, SLOT(readyReadSlot()), Qt::DirectConnection);
66  connect(mSocket.get(), SIGNAL(hostFound()), this, SLOT(hostFoundSlot()), Qt::DirectConnection);
67  connect(mSocket.get(), SIGNAL(connected()), this, SLOT(connectedSlot()), Qt::DirectConnection);
68  connect(mSocket.get(), SIGNAL(disconnected()), this, SLOT(disconnectedSlot()), Qt::DirectConnection);
69  connect(mSocket.get(), SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(errorSlot(QAbstractSocket::SocketError)),
70  Qt::DirectConnection);
71 
72  if (!this->multipleTryConnectToHost())
73  {
74  reportError("IGTLinkClientStreamer: Failed to start streaming");
// Release the unconnected socket so that mSocket is empty again.
75  mSocket.reset();
76  return;
77  }
78 
79  // Create a message buffer to receive header
80  mHeaderMsg = igtl::MessageHeader::New();
81 }
82 
83 bool IGTLinkClientStreamer::multipleTryConnectToHost()
84 {
85  // hold here until all attempts are finished
86  int numberOfConnectionAttempts = 5;
87  int baseSleep = 300;
88  for (int i=0; i<numberOfConnectionAttempts; ++i)
89  {
90  if (i>0)
91  report(QString("[%2] Attempt %1 to connect to host").arg(i+1).arg(this->hostDescription()));
92  if (this->tryConnectToHost())
93  return true;
94  sleep_ms(baseSleep*(i+1));
95  }
96  reportError(QString("[%1] Timeout connecting to host").arg(this->hostDescription()));
97  return false;
98 }
99 
100 bool IGTLinkClientStreamer::tryConnectToHost()
101 {
102  mSocket->connectToHost(mAddress, mPort);
103 
104  int timeout = 5000;
105  if (!mSocket->waitForConnected(timeout))
106  {
107  mSocket->disconnectFromHost();
108  return false;
109  }
110  return true;
111 }
112 
// Body of a definition whose signature line is absent from this listing.
// NOTE(review): presumably the stop-streaming counterpart of the start routine — confirm.
//
// If a socket exists it is asked to disconnect and is then released; the sender
// is released in either case.
114 {
115  if (mSocket)
116  {
117  mSocket->disconnectFromHost();
118  mSocket.reset();
119  }
120  mSender.reset();
121 }
122 
// Body of a definition whose signature line (and therefore return type) is absent
// from this listing. NOTE(review): presumably an "is streaming" query — confirm.
//
// Yields true only when a socket object exists and QTcpSocket::isValid() reports true.
124 {
125  return (mSocket && mSocket->isValid());
126 }
127 
128 QString IGTLinkClientStreamer::hostDescription() const
129 {
130  return mAddress + ":" + qstring_cast(mPort);
131 }
132 
133 void IGTLinkClientStreamer::hostFoundSlot()
134 {
135  report(QString("[%1] Found host").arg(this->hostDescription()));
136 // report("Host found: " + this->hostDescription());
137 }
138 void IGTLinkClientStreamer::connectedSlot()
139 {
140  reportSuccess(QString("[%1] Connected to host").arg(this->hostDescription()));
141 }
142 void IGTLinkClientStreamer::disconnectedSlot()
143 {
144  report(QString("[%1] Disconnected from host").arg(this->hostDescription()));
145 // report("Disconnected from host " + this->hostDescription());
146 }
147 void IGTLinkClientStreamer::errorSlot(QAbstractSocket::SocketError socketError)
148 {
149  report(QString("[%1] Socket error [code=%2]: %3")
150  .arg(this->hostDescription())
151  .arg(QString::number(socketError))
152  .arg(mSocket->errorString()));
153 }
154 
155 void IGTLinkClientStreamer::readyReadSlot()
156 {
157  // read messages until one fails
158  while (this->readOneMessage());
159 }
160 
165 bool IGTLinkClientStreamer::readOneMessage()
166 {
167 
168 // std::cout << "tick " << std::endl;
169 
170  if (!mHeadingReceived)
171  {
172 // std::cout << "client::tick: received: " << mSocket->bytesAvailable() << ", head needed: " << mHeaderMsg->GetPackSize() << std::endl;
173  // Initialize receive buffer
174  mHeaderMsg->InitPack();
175 
176  // ignore if not enough data (yet)
177  if (mSocket->bytesAvailable() < mHeaderMsg->GetPackSize())
178  {
179  //std::cout << "Incomplete heading received, ignoring. " << std::endl;
180  //std::cout << "available: " << mSocket->bytesAvailable() << ", needed " << mHeaderMsg->GetPackSize() << std::endl;
181  return false;
182  }
183 
184  // after peek: read to increase pos
185  mSocket->read(reinterpret_cast<char*>(mHeaderMsg->GetPackPointer()), mHeaderMsg->GetPackSize());
186  mHeadingReceived = true;
187 
188  // Deserialize the header
189  mHeaderMsg->Unpack();
190  }
191 
192  if (mHeadingReceived)
193  {
194 // std::cout << "client::tick: received: " << mSocket->bytesAvailable() << ", body needed: " << mHeaderMsg->GetBodySizeToRead() << std::endl;
195  bool success = false;
196  // Check data type and receive data body
197 // if (QString(mHeaderMsg->GetDeviceType()) == "TRANSFORM")
198 // {
199 // ReceiveTransform(mSocket, mHeaderMsg);
200 // }
201 // else if (QString(mHeaderMsg->GetDeviceType() == "POSITION")
202 // {
203 // ReceivePosition(mSocket, mHeaderMsg);
204 // }
205  if (QString(mHeaderMsg->GetDeviceType()) == "IMAGE")
206  {
207  success = this->ReceiveImage(mSocket.get(), mHeaderMsg);
208  }
209  else if (QString(mHeaderMsg->GetDeviceType()) == "CX_US_ST")
210  {
211  success = this->ReceiveSonixStatus(mSocket.get(), mHeaderMsg);
212  }
213 // else if (QString(mHeaderMsg->GetDeviceType() == "STATUS")
214 // {
215 // ReceiveStatus(mSocket, mHeaderMsg);
216 // }
217  else
218  {
219  std::cerr << "Receiving : " << mHeaderMsg->GetDeviceType() << std::endl;
220  mSocket->read(mHeaderMsg->GetBodySizeToRead());
221  }
222 
223  if (success)
224  mHeadingReceived = false; // restart
225  else
226  return false;
227  }
228 // std::cout << " tock " << std::endl;
229  return true;
230 }
231 
// Read the body of a "CX_US_ST" status message whose header was already received.
//
// socket: socket to read the body bytes from.
// header: previously unpacked header; attached to the new message before its
//         pack buffer is allocated.
//
// Returns false when the body has not fully arrived yet (nothing is consumed).
// Returns true once the body has been read from the socket: the message is queued
// via addToQueue() when Unpack() reports UNPACK_BODY or UNPACK_UNDEF, and is
// dropped with a console note otherwise.
//
// NOTE(review): the line declaring `msg` is absent from this listing; the
// assignment below relies on it.
232 bool IGTLinkClientStreamer::ReceiveSonixStatus(QTcpSocket* socket, igtl::MessageHeader::Pointer& header)
233 {
235  msg = IGTLinkUSStatusMessage::New();
236  msg->SetMessageHeader(header);
237  msg->AllocatePack();
238 
// ignore if not enough data (yet)
239  if (socket->bytesAvailable() < msg->GetPackBodySize())
240  {
241  //std::cout << "Incomplete body received, ignoring. " << std::endl;
242  return false;
243  }
244  socket->read(reinterpret_cast<char*>(msg->GetPackBodyPointer()), msg->GetPackBodySize());
245  // Deserialize the status message body
246  // If you want to do a CRC check, call Unpack(1).
247  // If you want to skip CRC check, call Unpack() without argument.
248  int c = msg->Unpack();
249  if (c & (igtl::MessageHeader::UNPACK_BODY | igtl::MessageHeader::UNPACK_UNDEF)) // if CRC check is OK or skipped
250  {
251  this->addToQueue(msg);
252 
253  return true;
254  }
255 
// The body bytes were consumed from the socket, so true is returned here as well.
256  std::cout << "body crc failed!" << std::endl;
257  return true;
258 }
259 
260 namespace
261 {
262 QDateTime my_decode_timestamp(igtl::MessageBase* msg)
263 {
264  // get timestamp from igtl second-format:
265  igtl::TimeStamp::Pointer timestamp = igtl::TimeStamp::New();
266  msg->GetTimeStamp(timestamp);
267  double timestampMS = timestamp->GetTimeStamp() * 1000;
268  return QDateTime::fromMSecsSinceEpoch(timestampMS);
269 }
270 
271 void write_time_info(igtl::ImageMessage::Pointer imgMsg)
272 {
273  int kb = imgMsg->GetPackSize()/1024;
274 // CX_LOG_CHANNEL_DEBUG("igtl_rec_test") << "unpacked: , " << kb << " kByte, name=" << imgMsg->GetDeviceName();
275  QDateTime org_ts = my_decode_timestamp(imgMsg.GetPointer());
276  QDateTime now_ts = QDateTime::currentDateTime();
277  QString format = timestampMilliSecondsFormatNice();
278  CX_LOG_CHANNEL_DEBUG("igtl_rec_test") << "received " << kb << "kByte"
279  << ", time=(" << org_ts.toString(format) << "->" << now_ts.toString(format) << ")"
280  << ", lag=" << org_ts.msecsTo(now_ts) << "ms";
281 }
282 }
283 
284 bool IGTLinkClientStreamer::ReceiveImage(QTcpSocket* socket, igtl::MessageHeader::Pointer& header)
285 {
286  // Create a message buffer to receive transform data
287  igtl::ImageMessage::Pointer imgMsg = igtl::ImageMessage::New();
288  imgMsg->SetMessageHeader(header);
289  imgMsg->AllocatePack();
290 
291  // Receive transform data from the socket
292  // ignore if not enough data (yet)
293  if (socket->bytesAvailable() < imgMsg->GetPackBodySize())
294  {
295  //std::cout << "Incomplete body received, ignoring. " << std::endl;
296  return false;
297  }
298 
299  socket->read(reinterpret_cast<char*>(imgMsg->GetPackBodyPointer()), imgMsg->GetPackBodySize());
300  // Deserialize the transform data
301  // If you want to do a CRC check, call Unpack(1).
302  // If you want to skip CRC check, call Unpack() without argument.
303  int c = imgMsg->Unpack();
304 
305 // write_time_info(imgMsg);
306 
307  if (c & (igtl::MessageHeader::UNPACK_BODY | igtl::MessageHeader::UNPACK_UNDEF)) // if CRC check is OK or skipped
308  {
309  this->addToQueue(imgMsg);
310  return true;
311  }
312 
313  std::cout << "body crc failed!" << std::endl;
314  return true;
315 }
316 
317 void IGTLinkClientStreamer::addToQueue(IGTLinkUSStatusMessage::Pointer msg)
318 {
319  // set temporary, then assume the image adder will pass this message on.
320  mUnsentUSStatusMessage = msg;
321 }
322 
323 void IGTLinkClientStreamer::addToQueue(igtl::ImageMessage::Pointer msg)
324 {
325  IGTLinkConversion converter;
326  IGTLinkConversionImage imageconverter;
327  IGTLinkConversionSonixCXLegacy cxconverter;
328 
329  PackagePtr package(new Package());
330 
331  if (cxconverter.guessIsSonixLegacyFormat(msg->GetDeviceName()))
332  {
333  package->mImage = cxconverter.decode(msg);
334  }
335  else
336  {
337  package->mImage = imageconverter.decode(msg);
338  }
339 
340  // if us status not sent, do it here
341  if (mUnsentUSStatusMessage)
342  {
343  package->mProbe = converter.decode(mUnsentUSStatusMessage, msg, ProbeDefinitionPtr());
344 
345  if (cxconverter.guessIsSonixLegacyFormat(mUnsentUSStatusMessage->GetDeviceName()))
346  {
347  package->mProbe = cxconverter.decode(package->mProbe);
348  }
349 
350  //Commenting out this line means that the last US sector information is always sent together with the image
351 // mUnsentUSStatusMessage = IGTLinkUSStatusMessage::Pointer();
352  }
353 
354  //Should only be needed if time stamp is set on another computer that is
355  //not synched with the one running this code: e.g. The Ultrasonix scanner
356  mStreamSynchronizer.syncToCurrentTime(package->mImage);
357 
358  mSender->send(package);
359 }
360 
361 
362 } // namespace cx
363 
364 
QString qstring_cast(const T &val)
void reportError(QString msg)
Definition: cxLogger.cpp:71
#define CX_LOG_CHANNEL_DEBUG(channel)
Definition: cxLogger.h:107
void reportSuccess(QString msg)
Definition: cxLogger.cpp:72
boost::shared_ptr< struct Package > PackagePtr
void report(QString msg)
Definition: cxLogger.cpp:69
boost::shared_ptr< Sender > SenderPtr
Definition: cxSender.h:64
QString timestampMilliSecondsFormatNice()
Definition: cxTime.cpp:30
void sleep_ms(int ms)
boost::shared_ptr< class ProbeDefinition > ProbeDefinitionPtr
Namespace for all CustusX production code.