NexusCPP  3.5.0
nxbuffer.h
Go to the documentation of this file.
1 //*****************************************************************************
17 //*****************************************************************************
18 
19 #ifndef __NX_BUFFER_H__
20 #define __NX_BUFFER_H__
21 
22 // ============================================================================
36 // ============================================================================
37 
38 
39 // including standard files
40 #ifndef __STRING_INCLUDED__
41  #include <string>
42  #define __STRING_INCLUDED__
43 #endif
44 
45 #ifndef __VECTOR_INCLUDED__
46  #include <vector>
47  #define __VECTOR_INCLUDED__
48 #endif
49 
50 // YAT
51 #include <yat/utils/String.h>
52 #include <yat/time/Time.h>
53 #include <yat/threading/Task.h>
54 #include <yat/threading/Mutex.h>
55 #include <yat/utils/Callback.h>
56 
57 // NexusCPP
58 #include <nexuscpp/nxwriter.h>
59 
60 namespace nxcpp
61 {
62 
63 typedef std::vector<std::size_t> shape_t;
64 /*
65 #define SHAPE_1D \
66  shape_t
67 
68 // shape datatype
69 class Shape
70 {
71 public:
73  Shape(const std::vector<std::size_t>& dims) : dims_(dims) {}
74 
76  Shape(const Shape& other) : dims_(other.dims_) {}
77 
79  Shape() {}
80 
82  Shape(std::size_t dim);
83 
85  Shape(std::size_t dimX, std::size_t dimY);
86 
88  Shape(std::size_t dimX, std::size_t dimY, std::size_t dimZ);
89 
91  Shape& append(std::size_t dim_size);
92 
94  void append(const Shape& shape);
95 
97  const std::vector<std::size_t>& dimensions() const { return dims_; }
98 
100  std::size_Shape& operator[](std::size_t dim) const;
101 
103  Shape& operator[](std::size_t dim);
104 
105  Shape& operator+(std::size_t dim_size);
106 
107 private:
108  std::vector<std::size_t> dims_;
109 };
110 */
111 
112 //=============================================================================
117 //=============================================================================
119 {
120 public:
122 
123  OverwriteError(const char *pcszDesc, const char *pcszOrigin): // forwards description and origin unchanged to the NexusException base
124  NexusException(pcszDesc, pcszOrigin) {}
125 
126  OverwriteError(const std::string& desc, const char *pcszOrigin): // std::string overload: passes desc.c_str() to the NexusException base
127  NexusException(desc.c_str(), pcszOrigin) {}
128 };
129 
130 //==============================================================================
134 {
136  NX_AXIS = 2,
138  NX_DATA = 4
139 };
140 
141 #define NX_ATTR_NODE_NAME "buf_name"
142 #define NX_ATTR_DATA_SIZE "buf_size"
143 #define NX_ATTR_SLAB_START "buf_start"
144 #define NX_ATTR_SLAB_END "buf_end"
145 #define NX_ATTR_SLAB_STOP "buf_stop"
146 #define NX_ATTR_ACQ_BATCH "buf_batch"
147 
148 // type of buffered data
149 // '0' for one-shot data (scalar, spectrum or image), '1' for data subset in a 1D acquisition, ...
150 #define NX_ATTR_DATA_DIM "buf_dim"
151 
152 //==============================================================================
154 //==============================================================================
156 {
158  std::string item_name;
159 
161  yat::uint32 write_count;
162 
163 };
164 
166 YAT_DEFINE_CALLBACK(WriteNotificationCallback, WriteNotification);
167 
168 //==============================================================================
172 //==============================================================================
175  public IExceptionHandler,
176  public IMessageHandler
177 {
178 public:
179 
181  typedef enum MemoryMode
182  {
186  COPY = 0,
187 
191  NO_COPY
192  } MemoryMode;
193 
195  typedef struct Statistics
196  {
197  Statistics();
198  yat::uint64 ui64WrittenBytes;
199  yat::uint64 ui64PendingBytes;
200  yat::uint64 ui64MaxPendingBytes;
201  yat::uint64 ui64TotalBytes;
202  yat::uint16 ui16ActiveWriters;
207  } Statistics;
208 
210  typedef struct NEXUSCPP_DECL Config
211  {
213  yat::String buffer_name;
214 
216  std::size_t acquisition_size;
217 
219  std::size_t buffer_size;
220 
222  yat::String target_path;
223 
226  yat::String data_source;
227 
229  WriteNotificationCallback write_notif_cb;
230 
233 
236 
239 
242 
244  yat::String device_name;
245 
248 
251 
252  // ----------------
253  // error management
254  // ----------------
255 
258 
260  std::size_t max_attempts;
261 
263  std::size_t retry_delay;
264 
267  {
268  acquisition_size = 0;
269  buffer_size = 0;
270  compress_filter = CompressNone;
271  min_bytes_for_compression = 1024;
272  write_mode = NexusFileWriter::SYNCHRONOUS;
273  message_handler_p = 0;
274  exception_handler_p = 0;
275  max_attempts = 10;
276  retry_delay = 250;
277  throw_if_file_exists = false;
278  }
279  } Config;
280 
284  DataStreamer(const Config& cfg);
285 
287  virtual ~DataStreamer();
288 
289  // Initialization
290  //## Should be a private method...
291  void Initialize(const Config& cfg);
292 
296  static void ResetBufferIndex();
297 
301  void SetWriteNotificationCallback(const WriteNotificationCallback& cb);
302 
304 
313  void AddDataItem(const std::string &sItemName, const std::vector<int>& viDimSize,
314  bool bDataSignal=true) ;
315 
320  void AddDataItem0D(const std::string &sItemName, bool bDataSignal=true) ;
321 
327  void AddDataItem1D(const std::string &sItemName, int iSizeDim, bool bDataSignal=true);
328 
335  void AddDataItem2D(const std::string &sItemName, int iSizeDim1, int iSizeDim2,
336  bool bDataSignal=true) ;
337 
346  void AddDataAxis(const std::string &sItemName, int iDimension, int iOrder) ;
347 
352  void SetDataItemMemoryMode(const std::string &sItemName, MemoryMode mode);
353 
355 
357  template <class TYPE> void PushData(const std::string &sItemName, const TYPE *tData,
363  unsigned int nCount=1);
364 
369  template <class TYPE> void PushAxisData(const std::string &sName, TYPE TValue);
370 
378  void PushIntegerAttribute(const std::string &sItemName, const std::string &sName, long lValue);
379 
386  void PushFloatAttribute(const std::string &sItemName, const std::string &sName, double dValue);
387 
394  void PushStringAttribute(const std::string &sItemName, const std::string &sName,
395  const std::string &strValue) ;
396 
398 
402  void SetExceptionHandler(IExceptionHandler *pHandler);
403 
407  void SetMessageHandler(IMessageHandler *pHandler);
408 
411  /// \param strDevice Device name
412  void SetDeviceName(const std::string &strDevice) { m_cfg.device_name = strDevice; }
413 
415  void Reset();
416 
418  void Finalize();
419 
423  void Abort(bool bSynchronize=false);
424 
426  void Stop();
427 
429  bool IsDone();
430 
432  void Synchronize();
433 
435  //## deprecated methods
436  void SetWriteMode(const NexusFileWriter::WriteMode &mode) { m_cfg.write_mode = mode; }
437  void SetTargetFolder(const std::string &sPath) { m_cfg.target_path = sPath; }
439 
441  NexusFileWriter::WriteMode WriteMode() const { return m_cfg.write_mode; }
442  const std::string &GetTargetFolder() const { return m_cfg.target_path; }
443  int GetNbPushInFile() const { return static_cast<int>(m_cfg.buffer_size); } // buffer_size is a std::size_t; the narrowing to int is made explicit
445 
450  void Clean();
451 
453  void OnFlushData(DatasetWriter* pWriter);
454 
456  bool IsExistingItem(const std::string &sItemName);
457 
459  IExceptionHandler* ExceptionHandler() const{ return m_pExceptHandler; }
460 
462  IMessageHandler* MessageHandler() const{ return m_pMsgHandler; }
463 
465 
467  void ResetStatistics();
468 
470  Statistics GetStatistics() const;
471 
473 
475 
483  static std::string GenerateBufferName(const std::string &sBaseName, long lIndex,
484  const std::string &strPrefix="");
485 
487 
489 
490  DataStreamer(const std::string&, int, int);
491  DataStreamer(const std::string&, int);
492  DataStreamer(const std::string&, std::size_t, std::size_t);
493  DataStreamer(const std::string&, std::size_t);
494  void Initialize(const std::string&x, const std::string&y="");
495  void SetDataItemNodeName(const std::string&, const std::string&);
496  void SetPath(const std::string&, const std::string&);
497  void SetWriteNotificationCallback(WriteNotificationCallback& cb) { m_write_notif_cb = cb; }
498  void SetWorkingFolder(const std::string&) { } // no-op: the argument is ignored
499  const std::string &GetWorkingFolder() const { static const std::string s_strEmpty; return s_strEmpty; } // always returns an empty string; the function-local static keeps the referenced object alive after the call (returning "" bound the reference to a destroyed temporary)
500 
502 
503 private:
504 
505  struct DataItemInfo // Per-item bookkeeping record; used as the mapped type of DataItemMap
506  {
507  std::string strDatasetName;
508  MemoryMode eMemoryMode; // COPY or NO_COPY
509  DatasetWriterPtr ptrDatasetWriter;
510  std::vector<int> viNextStart;
511  std::vector<int> viCurrentStart;
512  std::vector<int> viTotalSize;
513  std::vector<int> viCurrentSize;
514  unsigned int uiPushCount; // Pushed data since beginning
515  int iBatchIndex;
516  yat::MemBuf mbPendingData; // Data to be flushed later into the next buffer file
517  bool bAttributesWritten; // Flag ensuring buffer attributes are written only once
518  std::size_t nPendingData; // Pending data count
519  std::size_t nRank; // Canonical data rank (0 to 2)
520  };
521  typedef std::map<std::string, DataItemInfo> DataItemMap;
522  typedef std::map<std::string, long> IndexMap;
523  typedef std::map<std::string, yat::uint64> StartMap;
524  class StreamBufferTask* m_pStreamBufferTask; // Task managing the file recording
525 
526  Config m_cfg; // Configuration
528  static IndexMap s_mapFileIndex; // Current buffer file's index
529  static StartMap s_mapStartIndex; // Current data buffer's index
530  static yat::Time s_tmLastWriteAccess;
531 
532  DataItemMap m_mapDataItem; // Data items to be written
533  unsigned int m_uiStepCompleted; // Last completed step
534  bool m_bInProgress; // true if data buffering is in progress
535  std::size_t m_nBufferCount; // Filled buffers since beginning (in infinite mode: since beginning of batch)
536  std::size_t m_nTotalBufferCount; // Filled buffers since beginning (m_nTotalBufferCount eq. m_nBufferCount in finite mode)
537  mutable Statistics m_Stats;
538  static yat::Mutex s_indexLock;
539  mutable yat::Mutex m_mtxLock;
540  WriteNotificationCallback m_write_notif_cb;
541  IExceptionHandler *m_pExceptHandler; // Exception handler
542  IMessageHandler *m_pMsgHandler; // Message handler
543 
544  // PrivPushData
545  //
546  // sItemInfo sensor's information data
547  // tData Added data
548  // vtDetectPos sensor's data position in the acquisition multidimensional space
549  // nCount canonical data count
550  //
551  void PrivPushDataItems(DataItemInfo &sItemInfo, const void *pData, std::size_t nCount, bool bNoCopy);
552 
553  // Push pending data
554  //
555  // sItemInfo sensor's information data
556  //
557  void PushPendingData(DataItemInfo& ItemInfo);
558 
559  // Check buffer state
560  bool PrivIsBufferOpen();
561 
562  // Initializing default values
563  void init();
564 
565  void AddDataItem(const std::string &sItemName, const std::vector<int>& viDataDim,
566  DataItemCategory nxCat) ;
567 
568  // Buffer file control
569  long GetNewBufferFirstIndex(const std::string &sBaseName, const std::string &strPrefix);
570  bool CheckBufferDirectory(const std::string &sBaseName, const std::string &strPrefix);
571  bool IsBufferPathAvailable(const std::string &sBaseName, long lIndex);
572  void BufferingControl();
573  DataItemInfo &GetDataItemInfo(const std::string &sItemName) ;
574  DataItemInfo &GetDataItemInfoFromDatasetName(const std::string &strDataset) ;
575  void PrivOpenNewbuffer();
576  void PrivCloseBuffer(bool bStopMark=false);
577  void EndRecording();
578 
579  // interface NexusFileWriter::INotify
580  void OnWriteSubSet(NexusFileWriter* source_p, const std::string& dataset_path, int start_pos[MAX_RANK], int dim[MAX_RANK]);
581  void OnWrite(NexusFileWriter* source_p, const std::string& dataset_path);
582  void OnCloseFile(NexusFileWriter* source_p, const std::string& file_path);
583 
584  // interface IExceptionHandler
585  void OnNexusException(const NexusException &e);
586  // interface IMessageHandler
587  void OnNexusMessage(yat::ELogLevel lvl, const yat::String& msg);
588 };
589 
590 // Including template methods
592 
593 } // end of namespace
594 
596 // python ctypes wrapper (using ctype to avoid to use boost)
598 // DO NOT CHANGE ANYTHING WITHOUT VALIDATING YOUR MODS WITH A FLYSCAN EXPERT
600 extern "C"
601 {
602  //- ctor --------------------------------------------
603  NEXUSCPP_DECL nxcpp::DataStreamer * nds_new (const char* dsn,
604  yat::uint32 nas,
605  yat::uint32 nbs);
606 
607  //- dtor --------------------------------------------
609 
610  //- init --------------------------------------------
612  const char* tp,
613  const char* dp);
614 
615  //- stop -------------------------------------------
617 
618  //- fini --------------------------------------------
620 
621  //- 0D data item ------------------------------------
622  NEXUSCPP_DECL int nds_add_data_item_0D (nxcpp::DataStreamer* ds, const char* in);
623 
624  //- 1D data item ------------------------------------
626  const char* in,
627  yat::uint32 sd1);
628 
629  //- 2D data item ------------------------------------
631  const char* in,
632  yat::uint32 sd1,
633  yat::uint32 sd2);
634 
635  //- memory mode -------------------------------------
637  const char* in,
638  yat::uint32 mem_mode);
639 
640  //- push data into the stream -----------------------
642  const char* data_item_name,
643  yat::uint32 data_type,
644  yat::uint32 num_items_in_data_buffer,
645  void* data_buffer);
646 }
647 
648 #endif
NEXUSCPP_DECL int nds_add_data_item_0D(nxcpp::DataStreamer *ds, const char *in)
yat::uint16 ui16MaxSimultaneousWriters
Definition: nxbuffer.h:203
float fInstantMbPerSec
Definition: nxbuffer.h:204
Definition: nxfile.h:104
MemoryMode
Memory mode.
Definition: nxbuffer.h:181
std::string item_name
data item name
Definition: nxbuffer.h:158
Definition: nxwriter.h:241
IExceptionHandler * exception_handler_p
optional exception handler
Definition: nxbuffer.h:257
#define NEXUSCPP_DECL
Definition: nxfile.h:67
DataSet (signal)
Definition: nxbuffer.h:138
float fAverageMbPerSec
Definition: nxbuffer.h:206
yat::uint64 ui64MaxPendingBytes
Definition: nxbuffer.h:200
std::vector< std::size_t > shape_t
Definition: nxbuffer.h:63
void SetWorkingFolder(const std::string &)
Definition: nxbuffer.h:498
Exception handling interface.
Definition: nxwriter.h:215
NEXUSCPP_DECL int nds_set_data_item_memory_mode(nxcpp::DataStreamer *ds, const char *in, yat::uint32 mem_mode)
Write statistics.
Definition: nxbuffer.h:195
Definition: nxbuffer.h:118
NEXUSCPP_DECL nxcpp::DataStreamer * nds_new(const char *dsn, yat::uint32 nas, yat::uint32 nbs)
yat::String device_name
The data source device name.
Definition: nxbuffer.h:244
int GetNbPushInFile() const
Definition: nxbuffer.h:443
yat::uint32 write_count
The number of written items.
Definition: nxbuffer.h:161
DataItemCategory
Definition: nxbuffer.h:133
std::size_t min_bytes_for_compression
Minimum uncompressed byte size of data item that will be compressed.
Definition: nxbuffer.h:241
std::size_t max_attempts
max attempts on write access in case of error
Definition: nxbuffer.h:260
void SetTargetFolder(const std::string &sPath)
Definition: nxbuffer.h:437
WriteNotificationCallback write_notif_cb
The optional write notification callback.
Definition: nxbuffer.h:229
void SetWriteNotificationCallback(WriteNotificationCallback &cb)
Definition: nxbuffer.h:497
std::size_t retry_delay
delay between each try in ms
Definition: nxbuffer.h:263
std::size_t buffer_size
number of data to be stored into one buffer file
Definition: nxbuffer.h:219
IMessageHandler * message_handler_p
Messages logging handler.
Definition: nxbuffer.h:247
Write notification.
Definition: nxbuffer.h:155
NexusFileWriter::WriteMode WriteMode() const
Definition: nxbuffer.h:441
yat::String target_path
Target directory for streamed data.
Definition: nxbuffer.h:222
std::size_t acquisition_size
total number of data (may be either scalars, spectrums or images)
Definition: nxbuffer.h:216
yat::uint64 ui64WrittenBytes
Definition: nxbuffer.h:198
const int MAX_RANK
Max datasets rank.
Definition: nxfile.h:175
yat::String buffer_name
buffer file names suffix
Definition: nxbuffer.h:213
Definition: nxdebug.h:31
const std::string & GetWorkingFolder() const
Definition: nxbuffer.h:499
Message handling interface.
Definition: nxwriter.h:225
IExceptionHandler * ExceptionHandler() const
Get exception handler.
Definition: nxbuffer.h:459
yat::SharedPtr< DatasetWriter, yat::Mutex > DatasetWriterPtr
Shared pointer definition.
Definition: nxwriter.h:622
NEXUSCPP_DECL int nds_stop(nxcpp::DataStreamer *ds)
AxisDataSet.
Definition: nxbuffer.h:137
void SetDeviceName(const std::string &strDevice)
Definition: nxbuffer.h:412
NEXUSCPP_DECL int nds_add_data_item_2D(nxcpp::DataStreamer *ds, const char *in, yat::uint32 sd1, yat::uint32 sd2)
float fPeakMbPerSec
Definition: nxbuffer.h:205
OverwriteError(const char *pcszDesc, const char *pcszOrigin)
Definition: nxbuffer.h:123
GenericDataSet.
Definition: nxbuffer.h:136
std::map< FilterOption, int > FilterConfig
Filter options to be passed with CreateCompressedDataSet method.
Definition: nxfile.h:93
Config()
c-tor
Definition: nxbuffer.h:266
yat::uint64 ui64TotalBytes
Definition: nxbuffer.h:201
NexusFileWriter::WriteMode write_mode
SYNCHRONOUS OR ASYNCHRONOUS writing mode.
Definition: nxbuffer.h:232
YAT_DEFINE_CALLBACK(WriteNotificationCallback, WriteNotification)
Write notification callback.
Definition: nxfile.h:182
Definition: nxwriter.h:431
CompressionFilter
List of known compression filters.
Definition: nxfile.h:102
FilterConfig filter_config
Compression filter options.
Definition: nxbuffer.h:235
NEXUSCPP_DECL void nds_delete(nxcpp::DataStreamer *ds)
NEXUSCPP_DECL int nds_fini(nxcpp::DataStreamer *ds)
IMessageHandler * MessageHandler() const
Get message handler.
Definition: nxbuffer.h:462
Definition: nxwriter.h:246
WriteMode
Definition: nxwriter.h:244
yat::uint64 ui64PendingBytes
Definition: nxbuffer.h:199
void SetWriteMode(const NexusFileWriter::WriteMode &mode)
Definition: nxbuffer.h:436
Definition: nxbuffer.h:173
bool throw_if_file_exists
Throw an exception if destination buffer file already exists ?
Definition: nxbuffer.h:250
OverwriteError()
Definition: nxbuffer.h:121
yat::uint16 ui16ActiveWriters
Definition: nxbuffer.h:202
NEXUSCPP_DECL int nds_push_data(nxcpp::DataStreamer *ds, const char *data_item_name, yat::uint32 data_type, yat::uint32 num_items_in_data_buffer, void *data_buffer)
OverwriteError(const std::string &desc, const char *pcszOrigin)
Definition: nxbuffer.h:126
Write notification interface.
Definition: nxwriter.h:269
NEXUSCPP_DECL int nds_init(nxcpp::DataStreamer *ds, const char *tp, const char *dp)
yat::String data_source
Definition: nxbuffer.h:226
DataStreamer configuration structure.
Definition: nxbuffer.h:210
const std::string & GetTargetFolder() const
Definition: nxbuffer.h:442
CompressionFilter compress_filter
Compression filer.
Definition: nxbuffer.h:238
NEXUSCPP_DECL int nds_add_data_item_1D(nxcpp::DataStreamer *ds, const char *in, yat::uint32 sd1)
Definition: nxbuffer.h:135