nidas  v1.2-1520
SamplePipeline.h
Go to the documentation of this file.
1 // -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2 // vim: set shiftwidth=4 softtabstop=4 expandtab:
3 /*
4  ********************************************************************
5  ** NIDAS: NCAR In-situ Data Acquisition Software
6  **
7  ** 2009, Copyright University Corporation for Atmospheric Research
8  **
9  ** This program is free software; you can redistribute it and/or modify
10  ** it under the terms of the GNU General Public License as published by
11  ** the Free Software Foundation; either version 2 of the License, or
12  ** (at your option) any later version.
13  **
14  ** This program is distributed in the hope that it will be useful,
15  ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16  ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  ** GNU General Public License for more details.
18  **
19  ** The LICENSE.txt file accompanying this software contains
20  ** a copy of the GNU General Public License. If it is not found,
21  ** write to the Free Software Foundation, Inc.,
22  ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23  **
24  ********************************************************************
25 */
26 
27 #ifndef NIDAS_CORE_SAMPLEPIPELINE_H
28 #define NIDAS_CORE_SAMPLEPIPELINE_H
29 
30 #include "SampleThread.h"
31 #include "NidsIterators.h"
32 
33 namespace nidas { namespace core {
34 
35 class DSMConfig;
36 class DSMSensor;
37 class SampleTag;
38 
80 {
81 public:
83 
84  virtual ~SamplePipeline();
85 
86  std::string getName() const { return _name; }
87 
89  {
90  rawinit();
91  return _rawSorter;
92  }
93 
95  {
96  procinit();
97  return this;
98  }
99 
104  void connect(SampleSource* src) throw();
105 
109  void disconnect(SampleSource* src) throw();
110 
111  void addSampleTag(const SampleTag* tag) throw()
112  {
113  return _procSorter->addSampleTag(tag);
114  }
115 
116  void removeSampleTag(const SampleTag* tag) throw()
117  {
118  return _procSorter->removeSampleTag(tag);
119  }
120 
121  std::list<const SampleTag*> getSampleTags() const
122  {
123  return _procSorter->getSampleTags();
124  }
125 
127  {
129  }
130 
135  void addSampleClient(SampleClient* client) throw();
136 
137  void removeSampleClient(SampleClient* client) throw();
138 
145  void addSampleClientForTag(SampleClient*,const SampleTag*) throw();
146 
147  void removeSampleClientForTag(SampleClient*,const SampleTag*) throw();
148 
152  int getClientCount() const throw()
153  {
154  return _procSorter->getClientCount();
155  }
156 
161  void flush() throw()
162  {
163  if (_rawSorter) _rawSorter->flush();
164  if (_procSorter) _procSorter->flush();
165  }
166 
170  void interrupt();
171 
175  void join() throw();
176 
177  void setRealTime(bool val)
178  {
179  _realTime = val;
180  // If running in real-time, don't block threads when heap is maxed.
181  setHeapBlock(!val);
182  }
183 
184  bool getRealTime() const
185  {
186  return _realTime;
187  }
188 
190  {
191  return _procSorter->getSampleStats();
192  }
193 
197  size_t getSorterNumRawSamples() const
198  {
199  return _rawSorter->size();
200  }
201 
205  size_t getSorterNumProcSamples() const
206  {
207  return _procSorter->size();
208  }
209 
213  size_t getSorterNumRawBytes() const
214  {
215  return _rawSorter->getHeapSize();
216  }
217 
221  size_t getSorterNumProcBytes() const
222  {
223  return _procSorter->getHeapSize();
224  }
225 
229  size_t getSorterNumRawBytesMax() const
230  {
231  return _rawSorter->getHeapMax();
232  }
233 
238  {
239  return _procSorter->getHeapMax();
240  }
241 
247  {
249  }
250 
256  {
258  }
259 
264  size_t getNumFutureRawSamples() const
265  {
267  }
268 
273  size_t getNumFutureProcSamples() const
274  {
276  }
277 
281  void setRawSorterLength(float val)
282  {
283  _rawSorterLength = val;
284  }
285 
286  float getRawSorterLength() const
287  {
288  return _rawSorterLength;
289  }
290 
294  void setProcSorterLength(float val)
295  {
296  _procSorterLength = val;
297  }
298 
299  float getProcSorterLength() const
300  {
301  return _procSorterLength;
302  }
303 
309  void setRawHeapMax(size_t val) { _rawHeapMax = val; }
310 
311  size_t getRawHeapMax() const { return _rawHeapMax; }
312 
318  void setProcHeapMax(size_t val) { _procHeapMax = val; }
319 
320  size_t getProcHeapMax() const { return _procHeapMax; }
321 
330  void setHeapBlock(bool val) { _heapBlock = val; }
331 
332  bool getHeapBlock() const { return _heapBlock; }
333 
334  void setKeepStats(bool val)
335  {
336  _keepStats = val;
337  }
338 
343  void setRawLateSampleCacheSize(unsigned int val)
344  {
346  }
347 
353  unsigned int getRawLateSampleCacheSize() const
354  {
356  }
357 
362  void setProcLateSampleCacheSize(unsigned int val)
363  {
365  }
366 
372  unsigned int getProcLateSampleCacheSize() const
373  {
375  }
376 
377 private:
378 
379  void rawinit();
380 
381  void procinit();
382 
383  std::string _name;
384 
386 
388 
390 
392 
393  std::list<const SampleTag*> _sampleTags;
394 
395  std::list<const DSMConfig*> _dsmConfigs;
396 
397  bool _realTime;
398 
401 
404 
405  size_t _rawHeapMax;
406 
407  size_t _procHeapMax;
408 
410 
412 
414 
416 
420  SamplePipeline(const SamplePipeline& x);
421 
426 
427 };
428 
429 }} // namespace nidas namespace core
430 
431 #endif
size_t getNumDiscardedProcSamples() const
Number of processed samples discarded because sorter was getting too big.
Definition: SamplePipeline.h:255
virtual const SampleStats & getSampleStats() const =0
virtual size_t size() const =0
Number of samples that have not been distributed.
virtual ~SamplePipeline()
Definition: SamplePipeline.cc:62
size_t getSorterNumProcSamples() const
Number of processed samples currently in the sorter.
Definition: SamplePipeline.h:205
virtual std::list< const SampleTag * > getSampleTags() const =0
What SampleTags am I a SampleSource for?
void addSampleTag(const SampleTag *tag)
Add a SampleTag to this SampleSource.
Definition: SamplePipeline.h:111
bool _keepStats
Definition: SamplePipeline.h:411
virtual int getClientCount() const =0
How many SampleClients are currently in my list.
void disconnect(SampleSource *src)
Remove a SampleSource from the SamplePipeline.
Definition: SamplePipeline.cc:222
SampleTagIterator getSampleTagIterator() const
Definition: SamplePipeline.h:126
Interface for a Thread for buffering samples.
Definition: SampleThread.h:52
std::string _name
Definition: SamplePipeline.h:383
void removeSampleTag(const SampleTag *tag)
Definition: SamplePipeline.h:116
void procinit()
Definition: SamplePipeline.cc:145
void setRawSorterLength(float val)
Set length of raw SampleSorter, in seconds.
Definition: SamplePipeline.h:281
virtual void removeSampleTag(const SampleTag *)=0
SampleSource * getRawSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition: SamplePipeline.h:88
size_t getNumDiscardedRawSamples() const
Number of raw samples discarded because sorter was getting too big.
Definition: SamplePipeline.h:246
Pure virtual interface for a source of Samples.
Definition: SampleSource.h:48
SampleSource * getProcessedSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition: SamplePipeline.h:94
size_t getRawHeapMax() const
Definition: SamplePipeline.h:311
virtual size_t getNumDiscardedSamples() const =0
Number of samples discarded because of _heapSize > _heapMax and heapBlock == true.
void setRawLateSampleCacheSize(unsigned int val)
Set the size of the late sample cache in the raw sample sorter.
Definition: SamplePipeline.h:343
std::list< const SampleTag * > getSampleTags() const
What SampleTags am I a SampleSource for?
Definition: SamplePipeline.h:121
size_t getSorterNumProcBytesMax() const
Maximum size in bytes (heap limit) of the processed sample sorter.
Definition: SamplePipeline.h:237
size_t getSorterNumProcBytes() const
Current size in bytes of the processed sample sorter.
Definition: SamplePipeline.h:221
std::string getName() const
Definition: SamplePipeline.h:86
virtual size_t getHeapMax() const =0
void interrupt()
Interrupt the SampleSorters in this pipeline.
Definition: SamplePipeline.cc:73
void setHeapBlock(bool val)
Definition: SamplePipeline.h:330
size_t getNumFutureRawSamples() const
Number of raw samples discarded because their timetags were in the future.
Definition: SamplePipeline.h:264
Pure virtual interface of a client of Samples.
Definition: SampleClient.h:38
float getRawSorterLength() const
Definition: SamplePipeline.h:286
bool getRealTime() const
Definition: SamplePipeline.h:184
void join()
Join the SampleSorters in this pipeline.
Definition: SamplePipeline.cc:84
SampleThread * _procSorter
Definition: SamplePipeline.h:391
nidas::util::Mutex _procMutex
Definition: SamplePipeline.h:389
void setProcHeapMax(size_t val)
Set the maximum amount of heap memory to use for sorting samples.
Definition: SamplePipeline.h:318
virtual void flush()=0
Both SampleClient and SampleSource have a flush() method.
const SampleStats & getSampleStats() const
Definition: SamplePipeline.h:189
size_t getSorterNumRawSamples() const
Number of raw samples currently in the sorter.
Definition: SamplePipeline.h:197
size_t _procHeapMax
Definition: SamplePipeline.h:407
unsigned int getRawLateSampleCacheSize() const
Cache this number of samples with potentially anomalous, late time tags in the raw sample sorter...
Definition: SamplePipeline.h:353
SampleThread * _rawSorter
Definition: SamplePipeline.h:387
bool getHeapBlock() const
Definition: SamplePipeline.h:332
float getProcSorterLength() const
Definition: SamplePipeline.h:299
bool _heapBlock
Definition: SamplePipeline.h:409
void flush()
Purge samples from the SampleSorters in this pipeline.
Definition: SamplePipeline.h:161
std::list< const SampleTag * > _sampleTags
Definition: SamplePipeline.h:393
size_t getSorterNumRawBytesMax() const
Maximum size in bytes (heap limit) of the raw sample sorter.
Definition: SamplePipeline.h:229
virtual size_t getHeapSize() const =0
Get the current amount of heap being used for sorting.
virtual size_t getNumFutureSamples() const =0
Number of samples discarded because their timetags were in the future.
Class for iterating over the SampleTags of a Project, Site, DSMConfig, or a SampleSource.
Definition: NidsIterators.h:217
virtual SampleTagIterator getSampleTagIterator() const =0
void addSampleClient(SampleClient *client)
Add a SampleClient that wants all processed samples.
Definition: SamplePipeline.cc:269
unsigned int _procLateSampleCacheSize
Definition: SamplePipeline.h:415
void removeSampleClient(SampleClient *client)
Remove a SampleClient from this SampleSource.
Definition: SamplePipeline.cc:292
Class describing a group of variables that are sampled and handled together.
Definition: SampleTag.h:87
size_t getNumFutureProcSamples() const
Number of processed samples discarded because their timetags were in the future.
Definition: SamplePipeline.h:273
virtual void addSampleTag(const SampleTag *)=0
Add a SampleTag to this SampleSource.
size_t _rawHeapMax
Definition: SamplePipeline.h:405
A source of samples.
Definition: SampleStats.h:41
void removeSampleClientForTag(SampleClient *, const SampleTag *)
Remove a SampleClient for a given SampleTag from this SampleSource.
Definition: SamplePipeline.cc:341
SamplePipeline & operator=(const SamplePipeline &x)
No assignment.
void rawinit()
Definition: SamplePipeline.cc:117
float _procSorterLength
seconds
Definition: SamplePipeline.h:403
SamplePipeline()
Definition: SamplePipeline.cc:40
nidas::util::Mutex _rawMutex
Definition: SamplePipeline.h:385
SamplePipeline sorts samples that are coming from one or more inputs.
Definition: SamplePipeline.h:79
int getClientCount() const
How many SampleClients are currently in my list.
Definition: SamplePipeline.h:152
void connect(SampleSource *src)
Add an input to be merged and sorted.
Definition: SamplePipeline.cc:173
void setProcLateSampleCacheSize(unsigned int val)
Set the size of the late sample cache in the processed sample sorter.
Definition: SamplePipeline.h:362
std::list< const DSMConfig * > _dsmConfigs
Definition: SamplePipeline.h:395
size_t getSorterNumRawBytes() const
Current size in bytes of the raw sample sorter.
Definition: SamplePipeline.h:213
bool _realTime
Definition: SamplePipeline.h:397
unsigned int getProcLateSampleCacheSize() const
Cache this number of samples with potentially anomalous, late time tags in the processed sample sorte...
Definition: SamplePipeline.h:372
A C++ wrapper for a POSIX mutex.
Definition: ThreadSupport.h:154
void setRealTime(bool val)
Definition: SamplePipeline.h:177
void setProcSorterLength(float val)
Set length of processed SampleSorter, in seconds.
Definition: SamplePipeline.h:294
float _rawSorterLength
seconds
Definition: SamplePipeline.h:400
void setRawHeapMax(size_t val)
Set the maximum amount of heap memory to use for sorting samples.
Definition: SamplePipeline.h:309
void addSampleClientForTag(SampleClient *, const SampleTag *)
Add a SampleClient that wants samples which have been merged from various inputs, sorted...
Definition: SamplePipeline.cc:323
unsigned int _rawLateSampleCacheSize
Definition: SamplePipeline.h:413
size_t getProcHeapMax() const
Definition: SamplePipeline.h:320
void setKeepStats(bool val)
Definition: SamplePipeline.h:334