19 #ifndef __NX_BUFFER_H__ 20 #define __NX_BUFFER_H__ 40 #ifndef __STRING_INCLUDED__ 42 #define __STRING_INCLUDED__ 45 #ifndef __VECTOR_INCLUDED__ 47 #define __VECTOR_INCLUDED__ 51 #include <yat/utils/String.h> 52 #include <yat/time/Time.h> 53 #include <yat/threading/Task.h> 54 #include <yat/threading/Mutex.h> 55 #include <yat/utils/Callback.h> 141 #define NX_ATTR_NODE_NAME "buf_name" 142 #define NX_ATTR_DATA_SIZE "buf_size" 143 #define NX_ATTR_SLAB_START "buf_start" 144 #define NX_ATTR_SLAB_END "buf_end" 145 #define NX_ATTR_SLAB_STOP "buf_stop" 146 #define NX_ATTR_ACQ_BATCH "buf_batch" 150 #define NX_ATTR_DATA_DIM "buf_dim" 268 acquisition_size = 0;
271 min_bytes_for_compression = 1024;
273 message_handler_p = 0;
274 exception_handler_p = 0;
277 throw_if_file_exists =
false;
291 void Initialize(
const Config& cfg);
296 static void ResetBufferIndex();
301 void SetWriteNotificationCallback(
const WriteNotificationCallback& cb);
313 void AddDataItem(
const std::string &sItemName,
const std::vector<int>& viDimSize,
314 bool bDataSignal=
true) ;
320 void AddDataItem0D(
const std::string &sItemName,
bool bDataSignal=
true) ;
327 void AddDataItem1D(
const std::string &sItemName,
int iSizeDim,
bool bDataSignal=
true);
335 void AddDataItem2D(
const std::string &sItemName,
int iSizeDim1,
int iSizeDim2,
336 bool bDataSignal=
true) ;
346 void AddDataAxis(
const std::string &sItemName,
int iDimension,
int iOrder) ;
352 void SetDataItemMemoryMode(
const std::string &sItemName, MemoryMode mode);
357 template <
class TYPE>
void PushData(
const std::string &sItemName,
const TYPE *tData,
363 unsigned int nCount=1);
369 template <
class TYPE>
void PushAxisData(
const std::string &sName, TYPE TValue);
378 void PushIntegerAttribute(
const std::string &sItemName,
const std::string &sName,
long lValue);
386 void PushFloatAttribute(
const std::string &sItemName,
const std::string &sName,
double dValue);
394 void PushStringAttribute(
const std::string &sItemName,
const std::string &sName,
395 const std::string &strValue) ;
412 void SetDeviceName(
const std::string &strDevice) { m_cfg.device_name = strDevice; }
423 void Abort(
bool bSynchronize=
false);
456 bool IsExistingItem(
const std::string &sItemName);
467 void ResetStatistics();
483 static std::string GenerateBufferName(
const std::string &sBaseName,
long lIndex,
484 const std::string &strPrefix=
"");
492 DataStreamer(
const std::string&, std::size_t, std::size_t);
494 void Initialize(
const std::string&x,
const std::string&y=
"");
495 void SetDataItemNodeName(
const std::string&,
const std::string&);
496 void SetPath(
const std::string&,
const std::string&);
507 std::string strDatasetName;
508 MemoryMode eMemoryMode;
510 std::vector<int> viNextStart;
511 std::vector<int> viCurrentStart;
512 std::vector<int> viTotalSize;
513 std::vector<int> viCurrentSize;
514 unsigned int uiPushCount;
516 yat::MemBuf mbPendingData;
517 bool bAttributesWritten;
518 std::size_t nPendingData;
521 typedef std::map<std::string, DataItemInfo> DataItemMap;
522 typedef std::map<std::string, long> IndexMap;
523 typedef std::map<std::string, yat::uint64> StartMap;
524 class StreamBufferTask* m_pStreamBufferTask;
528 static IndexMap s_mapFileIndex;
529 static StartMap s_mapStartIndex;
530 static yat::Time s_tmLastWriteAccess;
532 DataItemMap m_mapDataItem;
533 unsigned int m_uiStepCompleted;
535 std::size_t m_nBufferCount;
536 std::size_t m_nTotalBufferCount;
538 static yat::Mutex s_indexLock;
539 mutable yat::Mutex m_mtxLock;
540 WriteNotificationCallback m_write_notif_cb;
551 void PrivPushDataItems(DataItemInfo &sItemInfo,
const void *pData, std::size_t nCount,
bool bNoCopy);
557 void PushPendingData(DataItemInfo& ItemInfo);
560 bool PrivIsBufferOpen();
565 void AddDataItem(
const std::string &sItemName,
const std::vector<int>& viDataDim,
569 long GetNewBufferFirstIndex(
const std::string &sBaseName,
const std::string &strPrefix);
570 bool CheckBufferDirectory(
const std::string &sBaseName,
const std::string &strPrefix);
571 bool IsBufferPathAvailable(
const std::string &sBaseName,
long lIndex);
572 void BufferingControl();
573 DataItemInfo &GetDataItemInfo(
const std::string &sItemName) ;
574 DataItemInfo &GetDataItemInfoFromDatasetName(
const std::string &strDataset) ;
575 void PrivOpenNewbuffer();
576 void PrivCloseBuffer(
bool bStopMark=
false);
580 void OnWriteSubSet(
NexusFileWriter* source_p,
const std::string& dataset_path,
int start_pos[
MAX_RANK],
int dim[MAX_RANK]);
581 void OnWrite(
NexusFileWriter* source_p,
const std::string& dataset_path);
582 void OnCloseFile(
NexusFileWriter* source_p,
const std::string& file_path);
587 void OnNexusMessage(yat::ELogLevel lvl,
const yat::String& msg);
638 yat::uint32 mem_mode);
642 const char* data_item_name,
643 yat::uint32 data_type,
644 yat::uint32 num_items_in_data_buffer,
NEXUSCPP_DECL int nds_add_data_item_0D(nxcpp::DataStreamer *ds, const char *in)
yat::uint16 ui16MaxSimultaneousWriters
Definition: nxbuffer.h:203
float fInstantMbPerSec
Definition: nxbuffer.h:204
MemoryMode
Memory mode.
Definition: nxbuffer.h:181
std::string item_name
data item name
Definition: nxbuffer.h:158
Definition: nxwriter.h:241
IExceptionHandler * exception_handler_p
optional exception handler
Definition: nxbuffer.h:257
#define NEXUSCPP_DECL
Definition: nxfile.h:67
DataSet (signal)
Definition: nxbuffer.h:138
float fAverageMbPerSec
Definition: nxbuffer.h:206
yat::uint64 ui64MaxPendingBytes
Definition: nxbuffer.h:200
std::vector< std::size_t > shape_t
Definition: nxbuffer.h:63
void SetWorkingFolder(const std::string &)
Definition: nxbuffer.h:498
Exception handling interface.
Definition: nxwriter.h:215
NEXUSCPP_DECL int nds_set_data_item_memory_mode(nxcpp::DataStreamer *ds, const char *in, yat::uint32 mem_mode)
Write statistices.
Definition: nxbuffer.h:195
Definition: nxbuffer.h:118
NEXUSCPP_DECL nxcpp::DataStreamer * nds_new(const char *dsn, yat::uint32 nas, yat::uint32 nbs)
yat::String device_name
The data source device name.
Definition: nxbuffer.h:244
int GetNbPushInFile() const
Definition: nxbuffer.h:443
yat::uint32 write_count
The number of written items.
Definition: nxbuffer.h:161
DataItemCategory
Definition: nxbuffer.h:133
std::size_t min_bytes_for_compression
Minimum uncompressed byte size of data item that will be compressed.
Definition: nxbuffer.h:241
std::size_t max_attempts
max attempts on write access in case of error
Definition: nxbuffer.h:260
void SetTargetFolder(const std::string &sPath)
Definition: nxbuffer.h:437
WriteNotificationCallback write_notif_cb
The optional write notification callback.
Definition: nxbuffer.h:229
void SetWriteNotificationCallback(WriteNotificationCallback &cb)
Definition: nxbuffer.h:497
std::size_t retry_delay
delay between each try in ms
Definition: nxbuffer.h:263
std::size_t buffer_size
number of data to be stored into one buffer file
Definition: nxbuffer.h:219
IMessageHandler * message_handler_p
Messages logging handler.
Definition: nxbuffer.h:247
Write notification.
Definition: nxbuffer.h:155
NexusFileWriter::WriteMode WriteMode() const
Definition: nxbuffer.h:441
yat::String target_path
Target directory for streamed data.
Definition: nxbuffer.h:222
std::size_t acquisition_size
total number of data (may be either scalars, spectrums or images)
Definition: nxbuffer.h:216
yat::uint64 ui64WrittenBytes
Definition: nxbuffer.h:198
const int MAX_RANK
Max datasets rank.
Definition: nxfile.h:175
yat::String buffer_name
buffer file names suffix
Definition: nxbuffer.h:213
const std::string & GetWorkingFolder() const
Definition: nxbuffer.h:499
Message handling interface.
Definition: nxwriter.h:225
IExceptionHandler * ExceptionHandler() const
Get exception handler.
Definition: nxbuffer.h:459
yat::SharedPtr< DatasetWriter, yat::Mutex > DatasetWriterPtr
Shared pointer definition.
Definition: nxwriter.h:622
NEXUSCPP_DECL int nds_stop(nxcpp::DataStreamer *ds)
AxisDataSet.
Definition: nxbuffer.h:137
void SetDeviceName(const std::string &strDevice)
Definition: nxbuffer.h:412
NEXUSCPP_DECL int nds_add_data_item_2D(nxcpp::DataStreamer *ds, const char *in, yat::uint32 sd1, yat::uint32 sd2)
float fPeakMbPerSec
Definition: nxbuffer.h:205
OverwriteError(const char *pcszDesc, const char *pcszOrigin)
Definition: nxbuffer.h:123
GenericDataSet.
Definition: nxbuffer.h:136
std::map< FilterOption, int > FilterConfig
Filter options to be passed with CreateCompressedDataSet method.
Definition: nxfile.h:93
Config()
c-tor
Definition: nxbuffer.h:266
yat::uint64 ui64TotalBytes
Definition: nxbuffer.h:201
NexusFileWriter::WriteMode write_mode
SYNCHRONOUS OR ASYNCHRONOUS writing mode.
Definition: nxbuffer.h:232
YAT_DEFINE_CALLBACK(WriteNotificationCallback, WriteNotification)
Write notification callback.
Definition: nxwriter.h:431
Definition: nxwriter.h:434
CompressionFilter
List of known compression filters.
Definition: nxfile.h:102
FilterConfig filter_config
Compresssion filter options.
Definition: nxbuffer.h:235
NEXUSCPP_DECL void nds_delete(nxcpp::DataStreamer *ds)
NEXUSCPP_DECL int nds_fini(nxcpp::DataStreamer *ds)
IMessageHandler * MessageHandler() const
Get message handler.
Definition: nxbuffer.h:462
Definition: nxwriter.h:246
WriteMode
Definition: nxwriter.h:244
yat::uint64 ui64PendingBytes
Definition: nxbuffer.h:199
void SetWriteMode(const NexusFileWriter::WriteMode &mode)
Definition: nxbuffer.h:436
Definition: nxbuffer.h:173
bool throw_if_file_exists
Throw an exception if destination buffer file already exists ?
Definition: nxbuffer.h:250
OverwriteError()
Definition: nxbuffer.h:121
yat::uint16 ui16ActiveWriters
Definition: nxbuffer.h:202
NEXUSCPP_DECL int nds_push_data(nxcpp::DataStreamer *ds, const char *data_item_name, yat::uint32 data_type, yat::uint32 num_items_in_data_buffer, void *data_buffer)
OverwriteError(const std::string &desc, const char *pcszOrigin)
Definition: nxbuffer.h:126
Write notification interface.
Definition: nxwriter.h:269
NEXUSCPP_DECL int nds_init(nxcpp::DataStreamer *ds, const char *tp, const char *dp)
yat::String data_source
Definition: nxbuffer.h:226
DataStreamer configuration structure.
Definition: nxbuffer.h:210
const std::string & GetTargetFolder() const
Definition: nxbuffer.h:442
CompressionFilter compress_filter
Compression filer.
Definition: nxbuffer.h:238
NEXUSCPP_DECL int nds_add_data_item_1D(nxcpp::DataStreamer *ds, const char *in, yat::uint32 sd1)
Definition: nxbuffer.h:135