NexusCPP  3.5.0
nxbuffer.hpp
Go to the documentation of this file.
1 //*****************************************************************************
17 //*****************************************************************************
18 
19 //==============================================================================
20 // DataStreamer
21 //==============================================================================
22 //------------------------------------------------------------------------------
23 // DataStreamer::PushData
24 //------------------------------------------------------------------------------
25 template <class TYPE> void DataStreamer::PushData(const std::string &sItemName,
26  const TYPE *pData, unsigned int nCount)
27 {
28  NX_SCOPE_DBG("BufferedData::PushData");
29 
30  if( 0 == nCount )
31  throw yat::Exception("ERROR", "Can't write 0 item!!!", "DataStreamer::PushData");
32 
33  // Thread safety
34  yat::AutoMutex<> _lock(m_mtxLock);
35 
36  DataItemInfo& ItemInfo = GetDataItemInfo(sItemName);
37  NX_DBG("Item: " << sItemName << " : " << nCount);
38 
39  bool bContinue = true;
40  TYPE *pToPush = (TYPE *)pData;
41  std::size_t nToPush = nCount;
42 
43  while( bContinue )
44  {
45  if( !PrivIsBufferOpen() )
46  {
47  PrivOpenNewbuffer();
48  }
49 
50  // Get data c-type at first push for this data item
51  if( NX_NONE == ItemInfo.ptrDatasetWriter->DataType() )
52  ItemInfo.ptrDatasetWriter->SetDataType<TYPE>();
53 
54  std::size_t nPush = 0;
55 
56  if( ItemInfo.mbPendingData.is_empty() && nToPush > 0 )
57  {
58  NX_DBG("No pending data");
59  std::size_t uiCurrentBufferPushedData = ItemInfo.uiPushCount - (m_nBufferCount * m_cfg.buffer_size);
60  if( uiCurrentBufferPushedData + nToPush <= m_cfg.buffer_size )
61  {
62  nPush = nToPush;
63  }
64  else
65  {
66  nPush = m_cfg.buffer_size - uiCurrentBufferPushedData;
67  }
68 
69  if( nCount > 1 )
70  {
71  NX_DBG("Push " << nPush << " / " << nToPush);
72  }
73 
74  if( nPush == nCount && ItemInfo.ptrDatasetWriter->DataItemShape().size() >= 1 )
75  { // Give the data's ownership to the Datasetwriter object, no copy is made for efficiency
76  PrivPushDataItems(ItemInfo, pToPush, nPush, true);
77  }
78  else
79  { // Push as much data as possible, data may be copied
80  PrivPushDataItems(ItemInfo, pToPush, nPush, false);
81  }
82  }
83 
84  if( nPush < nToPush && PrivIsBufferOpen() )
85  {
86  // All data was not pushed and current buffer still open, store the rest
87  std::size_t nPendingData = nToPush - nPush;
88  NX_DBG("Add pending data: " << nPendingData << " items.");
89  char *pToStore = (char *)pData + (nPush * ItemInfo.ptrDatasetWriter->DataItemSize());
90  ItemInfo.mbPendingData.put_bloc(pToStore, nPendingData * ItemInfo.ptrDatasetWriter->DataItemSize());
91  ItemInfo.nPendingData += nPendingData;
92  // Data not pushed is now in the pending data buffer
93  nToPush = 0;
94  }
95  else if( ItemInfo.mbPendingData.is_empty() )
96  { // All data is pushed => exits the 'while' loop
97  bContinue = false;
98  }
99 
100  // Check if current buffer file is already completed, close it if it's completed (for all
101  // DataItem objects)
102  BufferingControl();
103 
104  if( false == ItemInfo.mbPendingData.is_empty() && PrivIsBufferOpen() )
105  { // There is pending data but buffer cannot be closed because other DataItem objects
106  // have not already push all necessary data
107  NX_DBG("There is pending data but buffer cannot be closed");
108  bContinue = false;
109  }
110  }
111 }
112 
113 //------------------------------------------------------------------------------
114 // DataStreamer::PushAxisData
115 //------------------------------------------------------------------------------
116 template <class TYPE> void DataStreamer::PushAxisData(const std::string &sName, TYPE TValue)
117 {
118  DataStreamer::PushData(sName, &TValue);
119 }
#define NX_SCOPE_DBG(s)
Definition: nxdebug.h:77
#define NX_DBG(s)
Definition: nxdebug.h:76
Definition: nxfile.h:139