nidas v1.2.3
SamplePipeline.h
Go to the documentation of this file.
1// -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2// vim: set shiftwidth=4 softtabstop=4 expandtab:
3/*
4 ********************************************************************
5 ** NIDAS: NCAR In-situ Data Acquistion Software
6 **
7 ** 2009, Copyright University Corporation for Atmospheric Research
8 **
9 ** This program is free software; you can redistribute it and/or modify
10 ** it under the terms of the GNU General Public License as published by
11 ** the Free Software Foundation; either version 2 of the License, or
12 ** (at your option) any later version.
13 **
14 ** This program is distributed in the hope that it will be useful,
15 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 ** GNU General Public License for more details.
18 **
19 ** The LICENSE.txt file accompanying this software contains
20 ** a copy of the GNU General Public License. If it is not found,
21 ** write to the Free Software Foundation, Inc.,
22 ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 **
24 ********************************************************************
25*/
26
27#ifndef NIDAS_CORE_SAMPLEPIPELINE_H
28#define NIDAS_CORE_SAMPLEPIPELINE_H
29
30#include "SampleThread.h"
31#include "NidsIterators.h"
32
33namespace nidas { namespace core {
34
35class DSMConfig;
36class DSMSensor;
37class SampleTag;
38
80{
81public:
83
84 virtual ~SamplePipeline();
85
86 std::string getName() const { return _name; }
87
89 {
90 rawinit();
91 return _rawSorter;
92 }
93
95 {
96 procinit();
97 return this;
98 }
99
104 void connect(SampleSource* src) throw();
105
109 void disconnect(SampleSource* src) throw();
110
111 void addSampleTag(const SampleTag* tag) throw()
112 {
113 return _procSorter->addSampleTag(tag);
114 }
115
116 void removeSampleTag(const SampleTag* tag) throw()
117 {
118 return _procSorter->removeSampleTag(tag);
119 }
120
121 std::list<const SampleTag*> getSampleTags() const
122 {
123 return _procSorter->getSampleTags();
124 }
125
130
135 void addSampleClient(SampleClient* client) throw();
136
137 void removeSampleClient(SampleClient* client) throw();
138
145 void addSampleClientForTag(SampleClient*,const SampleTag*) throw();
146
147 void removeSampleClientForTag(SampleClient*,const SampleTag*) throw();
148
152 int getClientCount() const throw()
153 {
154 return _procSorter->getClientCount();
155 }
156
161 void flush() throw()
162 {
165 }
166
170 void interrupt();
171
175 void join() throw();
176
177 void setRealTime(bool val)
178 {
179 _realTime = val;
180 // If running in real-time, don't block threads when heap is maxed.
181 setHeapBlock(!val);
182 }
183
184 bool getRealTime() const
185 {
186 return _realTime;
187 }
188
190 {
191 return _procSorter->getSampleStats();
192 }
193
198 {
199 return _rawSorter->size();
200 }
201
206 {
207 return _procSorter->size();
208 }
209
213 size_t getSorterNumRawBytes() const
214 {
215 return _rawSorter->getHeapSize();
216 }
217
222 {
223 return _procSorter->getHeapSize();
224 }
225
230 {
231 return _rawSorter->getHeapMax();
232 }
233
238 {
239 return _procSorter->getHeapMax();
240 }
241
247 {
249 }
250
256 {
258 }
259
265 {
267 }
268
274 {
276 }
277
281 void setRawSorterLength(float val)
282 {
284 }
285
286 float getRawSorterLength() const
287 {
288 return _rawSorterLength;
289 }
290
294 void setProcSorterLength(float val)
295 {
297 }
298
300 {
301 return _procSorterLength;
302 }
303
309 void setRawHeapMax(size_t val) { _rawHeapMax = val; }
310
311 size_t getRawHeapMax() const { return _rawHeapMax; }
312
318 void setProcHeapMax(size_t val) { _procHeapMax = val; }
319
320 size_t getProcHeapMax() const { return _procHeapMax; }
321
330 void setHeapBlock(bool val) { _heapBlock = val; }
331
332 bool getHeapBlock() const { return _heapBlock; }
333
334 void setKeepStats(bool val)
335 {
336 _keepStats = val;
337 }
338
343 void setRawLateSampleCacheSize(unsigned int val)
344 {
346 }
347
353 unsigned int getRawLateSampleCacheSize() const
354 {
356 }
357
362 void setProcLateSampleCacheSize(unsigned int val)
363 {
365 }
366
372 unsigned int getProcLateSampleCacheSize() const
373 {
375 }
376
377private:
378
379 void rawinit();
380
381 void procinit();
382
383 std::string _name;
384
386
388
390
392
393 std::list<const SampleTag*> _sampleTags;
394
395 std::list<const DSMConfig*> _dsmConfigs;
396
398
401
404
406
408
410
412
414
416
421
426
427};
428
429}} // namespace nidas namespace core
430
431#endif
Pure virtual interface of a client of Samples.
Definition SampleClient.h:38
SamplePipeline sorts samples that are coming from one or more inputs.
Definition SamplePipeline.h:80
size_t getRawHeapMax() const
Definition SamplePipeline.h:311
size_t getProcHeapMax() const
Definition SamplePipeline.h:320
size_t getSorterNumRawBytesMax() const
Current size in bytes of the raw sample sorter.
Definition SamplePipeline.h:229
void procinit()
Definition SamplePipeline.cc:140
void addSampleClient(SampleClient *client)
Add a SampleClient that wants all processed samples.
Definition SamplePipeline.cc:264
float _rawSorterLength
seconds
Definition SamplePipeline.h:400
size_t getNumDiscardedProcSamples() const
Number of processed samples discarded because sorter was getting too big.
Definition SamplePipeline.h:255
size_t getNumFutureProcSamples() const
Number of processed samples discarded because their timetags were in the future.
Definition SamplePipeline.h:273
SampleSource * getProcessedSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition SamplePipeline.h:94
void removeSampleClientForTag(SampleClient *, const SampleTag *)
Remove a SampleClient for a given SampleTag from this SampleSource.
Definition SamplePipeline.cc:336
std::list< const DSMConfig * > _dsmConfigs
Definition SamplePipeline.h:395
void addSampleTag(const SampleTag *tag)
Add a SampleTag to this SampleSource.
Definition SamplePipeline.h:111
SamplePipeline & operator=(const SamplePipeline &x)
No assignment.
void disconnect(SampleSource *src)
Remove a SampleSource from the SamplePipeline.
Definition SamplePipeline.cc:217
unsigned int getProcLateSampleCacheSize() const
Cache this number of samples with potentially anomalous, late time tags in the processed sample sorte...
Definition SamplePipeline.h:372
bool getRealTime() const
Definition SamplePipeline.h:184
void interrupt()
Interrupt the SampleSorters in this pipeline.
Definition SamplePipeline.cc:68
void rawinit()
Definition SamplePipeline.cc:112
nidas::util::Mutex _rawMutex
Definition SamplePipeline.h:385
size_t getSorterNumRawSamples() const
Number of raw samples currently in the sorter.
Definition SamplePipeline.h:197
SampleThread * _procSorter
Definition SamplePipeline.h:391
virtual ~SamplePipeline()
Definition SamplePipeline.cc:57
SampleSource * getRawSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition SamplePipeline.h:88
void flush()
Purge samples from the SampleSorters in this pipeline.
Definition SamplePipeline.h:161
size_t getSorterNumProcSamples() const
Number of processed samples currently in the sorter.
Definition SamplePipeline.h:205
SamplePipeline(const SamplePipeline &x)
No copying.
size_t _rawHeapMax
Definition SamplePipeline.h:405
void setRawLateSampleCacheSize(unsigned int val)
Get the size of the late sample cache in the raw sample sorter.
Definition SamplePipeline.h:343
bool _keepStats
Definition SamplePipeline.h:411
unsigned int getRawLateSampleCacheSize() const
Cache this number of samples with potentially anomalous, late time tags in the raw sample sorter.
Definition SamplePipeline.h:353
std::list< const SampleTag * > getSampleTags() const
What SampleTags am I a SampleSource for?
Definition SamplePipeline.h:121
void removeSampleClient(SampleClient *client)
Remove a SampleClient from this SampleSource.
Definition SamplePipeline.cc:287
void setRawSorterLength(float val)
Set length of raw SampleSorter, in seconds.
Definition SamplePipeline.h:281
void setRealTime(bool val)
Definition SamplePipeline.h:177
std::string getName() const
Definition SamplePipeline.h:86
int getClientCount() const
How many SampleClients are currently in my list.
Definition SamplePipeline.h:152
bool _realTime
Definition SamplePipeline.h:397
SamplePipeline()
Definition SamplePipeline.cc:40
void setKeepStats(bool val)
Definition SamplePipeline.h:334
bool _heapBlock
Definition SamplePipeline.h:409
void setProcHeapMax(size_t val)
Set the maximum amount of heap memory to use for sorting samples.
Definition SamplePipeline.h:318
void setHeapBlock(bool val)
Definition SamplePipeline.h:330
SampleThread * _rawSorter
Definition SamplePipeline.h:387
void addSampleClientForTag(SampleClient *, const SampleTag *)
Add a SampleClient that wants samples which have been merged from various inputs, sorted,...
Definition SamplePipeline.cc:318
size_t getSorterNumRawBytes() const
Current size in bytes of the raw sample sorter.
Definition SamplePipeline.h:213
std::list< const SampleTag * > _sampleTags
Definition SamplePipeline.h:393
float getRawSorterLength() const
Definition SamplePipeline.h:286
void setRawHeapMax(size_t val)
Set the maximum amount of heap memory to use for sorting samples.
Definition SamplePipeline.h:309
size_t getNumFutureRawSamples() const
Number of raw samples discarded because their timetags were in the future.
Definition SamplePipeline.h:264
void setProcLateSampleCacheSize(unsigned int val)
Get the size of the late sample cache in the processed sample sorter.
Definition SamplePipeline.h:362
size_t getSorterNumProcBytes() const
Current size in bytes of the processed sample sorter.
Definition SamplePipeline.h:221
size_t getNumDiscardedRawSamples() const
Number of raw samples discarded because sorter was getting too big.
Definition SamplePipeline.h:246
void join()
Join the SampleSorters in this pipeline.
Definition SamplePipeline.cc:79
void connect(SampleSource *src)
Add an input to be merged and sorted.
Definition SamplePipeline.cc:168
void removeSampleTag(const SampleTag *tag)
Definition SamplePipeline.h:116
SampleTagIterator getSampleTagIterator() const
Definition SamplePipeline.h:126
unsigned int _procLateSampleCacheSize
Definition SamplePipeline.h:415
bool getHeapBlock() const
Definition SamplePipeline.h:332
std::string _name
Definition SamplePipeline.h:383
nidas::util::Mutex _procMutex
Definition SamplePipeline.h:389
size_t _procHeapMax
Definition SamplePipeline.h:407
float getProcSorterLength() const
Definition SamplePipeline.h:299
unsigned int _rawLateSampleCacheSize
Definition SamplePipeline.h:413
size_t getSorterNumProcBytesMax() const
Current size in bytes of the processed sample sorter.
Definition SamplePipeline.h:237
float _procSorterLength
seconds
Definition SamplePipeline.h:403
void setProcSorterLength(float val)
Set length of processed SampleSorter, in seconds.
Definition SamplePipeline.h:294
const SampleStats & getSampleStats() const
Definition SamplePipeline.h:189
Pure virtual interface for a source of Samples.
Definition SampleSource.h:48
virtual std::list< const SampleTag * > getSampleTags() const =0
What SampleTags am I a SampleSource for?
virtual void addSampleTag(const SampleTag *)=0
Add a SampleTag to this SampleSource.
virtual int getClientCount() const =0
How many SampleClients are currently in my list.
virtual void removeSampleTag(const SampleTag *)=0
virtual SampleTagIterator getSampleTagIterator() const =0
virtual const SampleStats & getSampleStats() const =0
A source of samples.
Definition SampleStats.h:41
Class for iterating over the SampleTags of a Project, Site, DSMConfig, or a SampleSource.
Definition NidsIterators.h:218
Class describing a group of variables that are sampled and handled together.
Definition SampleTag.h:88
Interface for a Thread for buffering samples.
Definition SampleThread.h:54
virtual size_t getHeapSize() const =0
Get the current amount of heap being used for sorting.
virtual size_t size() const =0
Number of samples that have not be distributed.
virtual size_t getHeapMax() const =0
virtual size_t getNumFutureSamples() const =0
Number of samples discarded because their timetags were in the future.
virtual void flush()=0
Both SampleClient and SampleSource have a flush() method.
virtual size_t getNumDiscardedSamples() const =0
Number of samples discarded because of _heapSize > _heapMax and heapBlock == true.
A C++ wrapper for a POSIX mutex.
Definition ThreadSupport.h:161
Sample * getSample(sampleType type, unsigned int len)
A convienence method for getting a sample of an enumerated type from a pool.
Definition Sample.cc:70
Root namespace for the NCAR In-Situ Data Acquisition Software.
Definition A2DConverter.h:31