nidas v1.2.3
SamplePipeline sorts samples that are coming from one or more inputs.
#include <SamplePipeline.h>
Public Member Functions | |
SamplePipeline () | |
virtual | ~SamplePipeline () |
std::string | getName () const |
SampleSource * | getRawSampleSource () |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples. | |
SampleSource * | getProcessedSampleSource () |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples. | |
void | connect (SampleSource *src) throw () |
Add an input to be merged and sorted. | |
void | disconnect (SampleSource *src) throw () |
Remove a SampleSource from the SamplePipeline. | |
void | addSampleTag (const SampleTag *tag) throw () |
Add a SampleTag to this SampleSource. | |
void | removeSampleTag (const SampleTag *tag) throw () |
std::list< const SampleTag * > | getSampleTags () const |
What SampleTags am I a SampleSource for? | |
SampleTagIterator | getSampleTagIterator () const |
void | addSampleClient (SampleClient *client) throw () |
Add a SampleClient that wants all processed samples. | |
void | removeSampleClient (SampleClient *client) throw () |
Remove a SampleClient from this SampleSource. | |
void | addSampleClientForTag (SampleClient *, const SampleTag *) throw () |
Add a SampleClient that wants samples which have been merged from various inputs, sorted, processed through a certain DSMSensor, and then re-sorted again. | |
void | removeSampleClientForTag (SampleClient *, const SampleTag *) throw () |
Remove a SampleClient for a given SampleTag from this SampleSource. | |
int | getClientCount () const throw () |
How many SampleClients are currently in my list. | |
void | flush () throw () |
Purge samples from the SampleSorters in this pipeline. | |
void | interrupt () |
Interrupt the SampleSorters in this pipeline. | |
void | join () throw () |
Join the SampleSorters in this pipeline. | |
void | setRealTime (bool val) |
bool | getRealTime () const |
const SampleStats & | getSampleStats () const |
size_t | getSorterNumRawSamples () const |
Number of raw samples currently in the sorter. | |
size_t | getSorterNumProcSamples () const |
Number of processed samples currently in the sorter. | |
size_t | getSorterNumRawBytes () const |
Current size in bytes of the raw sample sorter. | |
size_t | getSorterNumProcBytes () const |
Current size in bytes of the processed sample sorter. | |
size_t | getSorterNumRawBytesMax () const |
Maximum size in bytes of the raw sample sorter. | |
size_t | getSorterNumProcBytesMax () const |
Maximum size in bytes of the processed sample sorter. | |
size_t | getNumDiscardedRawSamples () const |
Number of raw samples discarded because sorter was getting too big. | |
size_t | getNumDiscardedProcSamples () const |
Number of processed samples discarded because sorter was getting too big. | |
size_t | getNumFutureRawSamples () const |
Number of raw samples discarded because their timetags were in the future. | |
size_t | getNumFutureProcSamples () const |
Number of processed samples discarded because their timetags were in the future. | |
void | setRawSorterLength (float val) |
Set length of raw SampleSorter, in seconds. | |
float | getRawSorterLength () const |
void | setProcSorterLength (float val) |
Set length of processed SampleSorter, in seconds. | |
float | getProcSorterLength () const |
void | setRawHeapMax (size_t val) |
Set the maximum amount of heap memory to use for sorting samples. | |
size_t | getRawHeapMax () const |
void | setProcHeapMax (size_t val) |
Set the maximum amount of heap memory to use for sorting samples. | |
size_t | getProcHeapMax () const |
void | setHeapBlock (bool val) |
bool | getHeapBlock () const |
void | setKeepStats (bool val) |
void | setRawLateSampleCacheSize (unsigned int val) |
Cache this number of samples with potentially anomalous, late time tags in the raw sample sorter. | |
unsigned int | getRawLateSampleCacheSize () const |
Get the size of the late sample cache in the raw sample sorter. | |
void | setProcLateSampleCacheSize (unsigned int val) |
Cache this number of samples with potentially anomalous, late time tags in the processed sample sorter. | |
unsigned int | getProcLateSampleCacheSize () const |
Get the size of the late sample cache in the processed sample sorter. | |
Private Member Functions | |
void | rawinit () |
void | procinit () |
SamplePipeline (const SamplePipeline &x) | |
No copying. | |
SamplePipeline & | operator= (const SamplePipeline &x) |
No assignment. | |
Private Attributes | |
std::string | _name |
nidas::util::Mutex | _rawMutex |
SampleThread * | _rawSorter |
nidas::util::Mutex | _procMutex |
SampleThread * | _procSorter |
std::list< const SampleTag * > | _sampleTags |
std::list< const DSMConfig * > | _dsmConfigs |
bool | _realTime |
float | _rawSorterLength |
seconds | |
float | _procSorterLength |
seconds | |
size_t | _rawHeapMax |
size_t | _procHeapMax |
bool | _heapBlock |
bool | _keepStats |
unsigned int | _rawLateSampleCacheSize |
unsigned int | _procLateSampleCacheSize |
SamplePipeline sorts samples that are coming from one or more inputs.
SamplePipeline makes use of two SampleSorters, called rawSorter and procSorter.
rawSorter is a client of one or more inputs (the text arrows show the sample flow):
    input ---v
    input --> _rawSorter
    input ---^
After sorting the samples, rawSorter passes them on to the two types of SampleClients that have registered with SamplePipeline. SampleClients that want raw samples register with the SampleSource returned by SamplePipeline::getRawSampleSource() and receive their samples directly from rawSorter.
rawSorter -> sampleClients
SampleClients that have registered with SamplePipeline::addSampleClient() or SamplePipeline::addSampleClientForTag() want processed samples and receive them indirectly:
rawSorter -> sensor -> procSorter -> processedSampleClients
rawSorter provides sorting of the samples from the various inputs.
procSorter provides sorting of the processed samples. DSMSensors are apt to create processed samples with different time tags than the input raw samples, so the processed samples need to be sorted again.
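The sorting step described above can be pictured with a small time-ordered buffer. This is only a sketch under stated assumptions: `Sample`, `ByTimeTag` and `MiniSorter` are hypothetical stand-ins rather than NIDAS types, time tags are taken to be microseconds, and the release rule (hold a sample until it is older than the newest sample by the sorter length) is an assumed reading of "sorter length", not a description of the NIDAS implementation.

```cpp
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical stand-in for a sample: a time tag and a source id.
struct Sample {
    std::int64_t timeTagUsec;  // time tag in microseconds (assumed unit)
    int id;                    // which input or sensor produced it
};

// Order samples by time tag only.
struct ByTimeTag {
    bool operator()(const Sample& a, const Sample& b) const {
        return a.timeTagUsec < b.timeTagUsec;
    }
};

// Assumed behavior: buffer samples from any number of inputs and release
// them in time order once they are at least lengthUsec older than the
// newest buffered sample.
class MiniSorter {
public:
    explicit MiniSorter(std::int64_t lengthUsec) : _lengthUsec(lengthUsec) {}

    // Insert one sample; return the samples that have aged out, oldest first.
    std::vector<Sample> insert(const Sample& s) {
        _samples.insert(s);
        const std::int64_t newest = _samples.rbegin()->timeTagUsec;
        std::vector<Sample> out;
        while (!_samples.empty() &&
               newest - _samples.begin()->timeTagUsec >= _lengthUsec) {
            out.push_back(*_samples.begin());
            _samples.erase(_samples.begin());
        }
        return out;
    }

    // Release everything still buffered, in time order.
    std::vector<Sample> flush() {
        std::vector<Sample> out(_samples.begin(), _samples.end());
        _samples.clear();
        return out;
    }

    std::size_t size() const { return _samples.size(); }

private:
    std::int64_t _lengthUsec;
    std::multiset<Sample, ByTimeTag> _samples;
};
```

Chaining two such buffers, with a processing step in between that may change time tags, mirrors the rawSorter -> sensor -> procSorter flow: the second buffer restores time order after processing.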
Multiple threads can be passing samples to the sorters. Thread exclusion is enforced when passing the samples to the SampleClient::receive() methods from either sorter, so the SampleClient::receive() methods don't have to worry about being re-entrant.
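The thread-exclusion guarantee can be illustrated with a distributor that holds one mutex while it calls each client. This is a minimal sketch, not NIDAS code: `Client`, `CountingClient`, `Distributor` and `feedFromThreads` are hypothetical names, and a single `std::mutex` is an assumed way of serializing the calls.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical client interface, loosely modeled on SampleClient::receive().
struct Client {
    virtual ~Client() {}
    virtual bool receive(int sample) = 0;
};

// A client with unsynchronized state: correct only if calls are serialized.
struct CountingClient : Client {
    long count = 0;
    bool receive(int) override { ++count; return true; }
};

// Assumed design: one mutex serializes every receive() call.
class Distributor {
public:
    void addClient(Client* c) {
        std::lock_guard<std::mutex> lock(_mutex);
        _clients.push_back(c);
    }

    // May be called from several threads at once.
    void distribute(int sample) {
        std::lock_guard<std::mutex> lock(_mutex);
        for (Client* c : _clients) c->receive(sample);
    }

private:
    std::mutex _mutex;
    std::vector<Client*> _clients;
};

// Feed the distributor from nThreads threads, nPerThread samples each.
inline void feedFromThreads(Distributor& d, int nThreads, int nPerThread) {
    std::vector<std::thread> threads;
    for (int t = 0; t < nThreads; ++t)
        threads.emplace_back([&d, nPerThread] {
            for (int i = 0; i < nPerThread; ++i) d.distribute(i);
        });
    for (std::thread& th : threads) th.join();
}
```

Because `distribute()` holds the lock for the whole delivery loop, `CountingClient::receive()` can update `count` without any locking of its own.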
SamplePipeline::SamplePipeline ()
SamplePipeline::~SamplePipeline () [virtual]
References _procMutex, _procSorter, _rawMutex, _rawSorter, nidas::util::Mutex::lock(), and nidas::util::Mutex::unlock().
SamplePipeline::SamplePipeline (const SamplePipeline &x) [private]
No copying.
void SamplePipeline::addSampleClient (SampleClient *client) throw () [virtual]
Add a SampleClient that wants all processed samples.
Implementation of SampleSource::addSampleClient().
Implements nidas::core::SampleSource.
References nidas::core::DSMSensor::addSampleClient(), nidas::core::SampleTag::getDSMSensor(), nidas::core::DSMSensor::getName(), nidas::core::DSMSensor::getRawSampleTag(), nidas::core::getSample(), and VLOG.
void SamplePipeline::addSampleClientForTag (SampleClient *, const SampleTag *) throw () [virtual]
Add a SampleClient that wants samples which have been merged from various inputs, sorted, processed through a certain DSMSensor, and then re-sorted again.
Implementation of SampleSource::addSampleClientForTag().
Implements nidas::core::SampleSource.
References nidas::core::DSMSensor::addSampleClient(), nidas::core::DSMSensor::addSampleClientForTag(), and nidas::core::DSMSensor::getRawSampleTag().
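The difference between registering for all samples and registering for one SampleTag can be sketched as a two-level lookup. The names below (`Client`, `TagDispatcher`, the integer sample id) are hypothetical and only illustrate the idea. In NIDAS the registration is forwarded to the DSMSensor objects, as the references above indicate.

```cpp
#include <cstdint>
#include <map>
#include <set>

// Hypothetical client that counts the samples it receives.
struct Client {
    int received = 0;
    void receive(std::uint32_t /*sampleId*/) { ++received; }
};

// Assumed bookkeeping: one set of clients that want everything, plus a
// map from sample id to the clients that asked for that id only.
class TagDispatcher {
public:
    void addClient(Client* c) { _allClients.insert(c); }
    void removeClient(Client* c) { _allClients.erase(c); }

    void addClientForTag(Client* c, std::uint32_t id) { _byTag[id].insert(c); }

    void removeClientForTag(Client* c, std::uint32_t id) {
        auto it = _byTag.find(id);
        if (it == _byTag.end()) return;
        it->second.erase(c);
        if (it->second.empty()) _byTag.erase(it);
    }

    // Deliver a sample with the given id; return the number of deliveries.
    int distribute(std::uint32_t id) {
        int n = 0;
        for (Client* c : _allClients) { c->receive(id); ++n; }
        auto it = _byTag.find(id);
        if (it != _byTag.end())
            for (Client* c : it->second) { c->receive(id); ++n; }
        return n;
    }

private:
    std::set<Client*> _allClients;
    std::map<std::uint32_t, std::set<Client*> > _byTag;
};
```

As with the real interface, the dispatcher stores raw pointers and does not own the clients, so a client must stay valid until it has been removed.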
void SamplePipeline::addSampleTag (const SampleTag *tag) throw ()
Add a SampleTag to this SampleSource.
This SampleSource does not own the SampleTag.
Exceptions: nidas::util::InvalidParameterException
Implements nidas::core::SampleSource.
References _procSorter, and nidas::core::SampleSource::addSampleTag().
void SamplePipeline::connect (SampleSource *src) throw ()
Add an input to be merged and sorted.
SamplePipeline does not own the SampleSource.
References GET_DSM_ID, GET_SPS_ID, nidas::core::SampleTag::getDSMSensor(), nidas::core::SampleTag::getId(), nidas::core::DSMSensor::getName(), nidas::core::SampleSource::getRawSampleSource(), nidas::core::getSample(), nidas::core::SampleTag::getSampleId(), nidas::core::DSMSensor::getSampleTagIterator(), and LOG_VERBOSE.
Referenced by nidas::dynld::raf::SyncServer::init(), and nidas::core::DSMEngine::openSensors().
void SamplePipeline::disconnect (SampleSource *src) throw ()
Remove a SampleSource from the SamplePipeline.
References nidas::core::SampleTag::getDSMSensor(), nidas::core::SampleSource::getRawSampleSource(), nidas::core::getSample(), and nidas::core::DSMSensor::getSampleTagIterator().
Referenced by nidas::dynld::raf::SyncServer::stop().
void SamplePipeline::flush () throw () [inline, virtual]
Purge samples from the SampleSorters in this pipeline.
This call will block until both sorters are empty.
Implements nidas::core::SampleSource.
References _procSorter, _rawSorter, and nidas::core::SampleThread::flush().
Referenced by nidas::core::DSMEngine::interrupt(), nidas::dynld::RawSampleService::interrupt(), and nidas::dynld::raf::SyncServer::read().
int SamplePipeline::getClientCount () const throw () [inline, virtual]
How many SampleClients are currently in my list.
Implements nidas::core::SampleSource.
References _procSorter, and nidas::core::SampleSource::getClientCount().
bool SamplePipeline::getHeapBlock () const [inline]
References _heapBlock.
Referenced by procinit(), and rawinit().
std::string SamplePipeline::getName () const [inline]
References _name.
Referenced by nidas::dynld::RawSampleService::schedule().
size_t SamplePipeline::getNumDiscardedProcSamples () const [inline]
Number of processed samples discarded because sorter was getting too big.
References _procSorter, and nidas::core::SampleThread::getNumDiscardedSamples().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getNumDiscardedRawSamples () const [inline]
Number of raw samples discarded because sorter was getting too big.
References _rawSorter, and nidas::core::SampleThread::getNumDiscardedSamples().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getNumFutureProcSamples () const [inline]
Number of processed samples discarded because their timetags were in the future.
References _procSorter, and nidas::core::SampleThread::getNumFutureSamples().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getNumFutureRawSamples () const [inline]
Number of raw samples discarded because their timetags were in the future.
References _rawSorter, and nidas::core::SampleThread::getNumFutureSamples().
Referenced by nidas::dynld::RawSampleService::printStatus().
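A counter such as the one returned here implies a check of each incoming time tag against the current time. The sketch below shows one plausible form of that check. `FutureFilter` and the `maxFutureUsec` tolerance are assumptions made for illustration, and the page does not state what threshold or clock NIDAS uses.

```cpp
#include <cstddef>
#include <cstdint>

// Assumed policy: reject a sample whose time tag is more than
// maxFutureUsec ahead of the supplied current time, and count it.
class FutureFilter {
public:
    explicit FutureFilter(std::int64_t maxFutureUsec)
        : _maxFutureUsec(maxFutureUsec), _numFuture(0) {}

    // Return true if the sample should be kept, false if it was discarded.
    bool accept(std::int64_t timeTagUsec, std::int64_t nowUsec) {
        if (timeTagUsec > nowUsec + _maxFutureUsec) {
            ++_numFuture;
            return false;
        }
        return true;
    }

    // Number of samples rejected so far.
    std::size_t getNumFutureSamples() const { return _numFuture; }

private:
    std::int64_t _maxFutureUsec;  // hypothetical tolerance, microseconds
    std::size_t _numFuture;
};
```

Passing the current time in as a parameter keeps the check deterministic and easy to test; a real sorter would read a clock instead.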
SampleSource * SamplePipeline::getProcessedSampleSource () [inline, virtual]
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
SampleClients use this method to get a pointer to whatever sample source they are interested in. Derived classes can return NULL if they are not a SampleSource of processed samples.
Implements nidas::core::SampleSource.
References procinit().
Referenced by nidas::core::DSMEngine::closeOutputs(), nidas::core::DSMEngine::connectOutputs(), nidas::dynld::raf::SyncServer::init(), nidas::dynld::RawSampleService::printStatus(), nidas::dynld::RawSampleService::schedule(), and nidas::dynld::raf::SyncServer::stop().
size_t SamplePipeline::getProcHeapMax () const [inline]
References _procHeapMax.
Referenced by procinit().
unsigned int SamplePipeline::getProcLateSampleCacheSize () const
Get the size of the late sample cache in the processed sample sorter.
See SampleSorter::getLateSampleCacheSize(). Default: 0.
References _procLateSampleCacheSize.
Referenced by procinit().
float SamplePipeline::getProcSorterLength () const [inline]
References _procSorterLength.
Referenced by procinit().
size_t SamplePipeline::getRawHeapMax () const [inline]
References _rawHeapMax.
Referenced by rawinit().
unsigned int SamplePipeline::getRawLateSampleCacheSize () const
Get the size of the late sample cache in the raw sample sorter.
See SampleSorter::getLateSampleCacheSize(). Default: 0.
References _rawLateSampleCacheSize.
Referenced by rawinit().
SampleSource * SamplePipeline::getRawSampleSource () [inline, virtual]
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
SampleClients use this method to get a pointer to whatever sample source they are interested in. Derived classes can return NULL if they are not a SampleSource of raw samples.
Implements nidas::core::SampleSource.
References _rawSorter, and rawinit().
Referenced by nidas::core::DSMEngine::closeOutputs(), nidas::core::DSMEngine::connectOutputs(), nidas::dynld::RawSampleService::printClock(), nidas::dynld::RawSampleService::printStatus(), and nidas::dynld::RawSampleService::schedule().
float SamplePipeline::getRawSorterLength () const [inline]
References _rawSorterLength.
Referenced by rawinit().
bool SamplePipeline::getRealTime () const [inline]
References _realTime.
Referenced by procinit(), and rawinit().
const SampleStats & SamplePipeline::getSampleStats () const [inline, virtual]
Implements nidas::core::SampleSource.
References _procSorter, and nidas::core::SampleSource::getSampleStats().
SampleTagIterator SamplePipeline::getSampleTagIterator () const [inline, virtual]
Implements nidas::core::SampleSource.
References _procSorter, and nidas::core::SampleSource::getSampleTagIterator().
std::list< const SampleTag * > SamplePipeline::getSampleTags () const
What SampleTags am I a SampleSource for?
Implements nidas::core::SampleSource.
References _procSorter, and nidas::core::SampleSource::getSampleTags().
size_t SamplePipeline::getSorterNumProcBytes () const [inline]
Current size in bytes of the processed sample sorter.
References _procSorter, and nidas::core::SampleThread::getHeapSize().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getSorterNumProcBytesMax () const [inline]
Maximum size in bytes of the processed sample sorter.
References _procSorter, and nidas::core::SampleThread::getHeapMax().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getSorterNumProcSamples () const [inline]
Number of processed samples currently in the sorter.
References _procSorter, and nidas::core::SampleThread::size().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getSorterNumRawBytes () const [inline]
Current size in bytes of the raw sample sorter.
References _rawSorter, and nidas::core::SampleThread::getHeapSize().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getSorterNumRawBytesMax () const [inline]
Maximum size in bytes of the raw sample sorter.
References _rawSorter, and nidas::core::SampleThread::getHeapMax().
Referenced by nidas::dynld::RawSampleService::printStatus().
size_t SamplePipeline::getSorterNumRawSamples () const [inline]
Number of raw samples currently in the sorter.
References _rawSorter, and nidas::core::SampleThread::size().
Referenced by nidas::dynld::RawSampleService::printStatus().
void SamplePipeline::interrupt ()
Interrupt the SampleSorters in this pipeline.
References _procMutex, _procSorter, _rawMutex, _rawSorter, nidas::util::Thread::interrupt(), nidas::util::Mutex::lock(), and nidas::util::Mutex::unlock().
Referenced by nidas::core::DSMEngine::interrupt(), nidas::dynld::raf::SyncServer::interrupt(), nidas::dynld::RawSampleService::interrupt(), and nidas::dynld::raf::SyncServer::stop().
void SamplePipeline::join () throw ()
Join the SampleSorters in this pipeline.
References _procMutex, _procSorter, _rawMutex, _rawSorter, nidas::util::Thread::getName(), nidas::util::Thread::interrupt(), nidas::util::Thread::isRunning(), nidas::util::Thread::join(), nidas::util::Mutex::lock(), nidas::util::Mutex::unlock(), and WLOG.
Referenced by nidas::core::DSMEngine::joinDataThreads(), nidas::dynld::raf::SyncServer::stop(), and nidas::dynld::RawSampleService::~RawSampleService().
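The interrupt-then-join pattern used to stop the sorter threads can be sketched with standard C++ threads. `Worker` is a hypothetical class, and the atomic flag is an assumed mechanism; the references above only show that NIDAS goes through nidas::util::Thread::interrupt() and nidas::util::Thread::join().

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical worker that loops until it is interrupted.
class Worker {
public:
    Worker() : _interrupted(false), _iterations(0) {}

    // Make sure the thread is stopped before the object goes away.
    ~Worker() { interrupt(); join(); }

    void start() { _thread = std::thread([this] { run(); }); }

    // Ask the loop to stop; returns immediately.
    void interrupt() { _interrupted = true; }

    // Wait for the loop to finish; safe to call more than once.
    void join() { if (_thread.joinable()) _thread.join(); }

    bool joinable() const { return _thread.joinable(); }
    long iterations() const { return _iterations.load(); }

private:
    void run() {
        while (!_interrupted) {
            ++_iterations;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> _interrupted;
    std::atomic<long> _iterations;
    std::thread _thread;
};
```

A typical shutdown would call `interrupt()` on every worker first and `join()` afterwards, so the workers wind down in parallel rather than one after another.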
SamplePipeline & SamplePipeline::operator= (const SamplePipeline &x) [private]
No assignment.
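Declaring the copy constructor and assignment operator private is the pre-C++11 idiom for a non-copyable class. The sketch below shows that idiom next to the C++11 `= delete` form; `LegacyPipeline` and `ModernPipeline` are hypothetical names used only for the comparison.

```cpp
#include <type_traits>

// Pre-C++11 idiom: declare the copy operations private and never define them.
class LegacyPipeline {
public:
    LegacyPipeline() {}

private:
    LegacyPipeline(const LegacyPipeline&);             // no copying
    LegacyPipeline& operator=(const LegacyPipeline&);  // no assignment
};

// C++11 equivalent: delete the copy operations explicitly.
class ModernPipeline {
public:
    ModernPipeline() = default;
    ModernPipeline(const ModernPipeline&) = delete;
    ModernPipeline& operator=(const ModernPipeline&) = delete;
};

// Both forms make the class non-copyable from outside code.
static_assert(!std::is_copy_constructible<LegacyPipeline>::value,
              "LegacyPipeline must not be copy constructible");
static_assert(!std::is_copy_assignable<LegacyPipeline>::value,
              "LegacyPipeline must not be copy assignable");
static_assert(!std::is_copy_constructible<ModernPipeline>::value,
              "ModernPipeline must not be copy constructible");
static_assert(!std::is_copy_assignable<ModernPipeline>::value,
              "ModernPipeline must not be copy assignable");
```

Preventing copies makes sense for a class that holds raw pointers to threads and mutexes, since a copy would share those resources without any clear ownership.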
void SamplePipeline::procinit () [private]
References _keepStats, _name, _procMutex, _procSorter, getHeapBlock(), nidas::core::SampleThread::getHeapMax(), nidas::core::SampleThread::getKeepStats(), nidas::core::SampleThread::getLengthSecs(), getProcHeapMax(), getProcLateSampleCacheSize(), getProcSorterLength(), getRealTime(), nidas::core::SampleThread::getRealTime(), nidas::core::getSample(), nidas::core::SampleThread::setHeapBlock(), nidas::core::SampleThread::setHeapMax(), nidas::core::SampleThread::setKeepStats(), nidas::core::SampleThread::setLateSampleCacheSize(), nidas::core::SampleThread::setLengthSecs(), nidas::core::SampleThread::setRealTime(), nidas::util::Thread::setRealTimeFIFOPriority(), nidas::util::Thread::start(), and VLOG.
Referenced by getProcessedSampleSource().
void SamplePipeline::rawinit () [private]
References _keepStats, _name, _rawMutex, _rawSorter, getHeapBlock(), nidas::core::SampleThread::getHeapMax(), nidas::core::SampleThread::getKeepStats(), nidas::core::SampleThread::getLengthSecs(), getRawHeapMax(), getRawLateSampleCacheSize(), getRawSorterLength(), getRealTime(), nidas::core::SampleThread::getRealTime(), nidas::core::getSample(), nidas::core::SampleThread::setHeapBlock(), nidas::core::SampleThread::setHeapMax(), nidas::core::SampleThread::setKeepStats(), nidas::core::SampleThread::setLateSampleCacheSize(), nidas::core::SampleThread::setLengthSecs(), nidas::core::SampleThread::setRealTime(), nidas::util::Thread::setRealTimeFIFOPriority(), nidas::util::Thread::start(), and VLOG.
Referenced by getRawSampleSource().
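The cross-references suggest that the raw sorter is created on demand: getRawSampleSource() calls rawinit(), which touches _rawMutex and _rawSorter and starts a thread. A minimal sketch of that kind of lazy, mutex-guarded initialization follows. `Sorter` and `LazyPipeline` are hypothetical, the zero and false defaults are placeholders rather than NIDAS defaults, and the real rawinit() may behave differently, for example by updating an existing sorter.

```cpp
#include <memory>
#include <mutex>

// Hypothetical sorter that just records its configuration.
struct Sorter {
    float lengthSecs;
    bool realTime;
};

class LazyPipeline {
public:
    LazyPipeline() : _rawSorterLength(0.0f), _realTime(false) {}

    void setRawSorterLength(float val) { _rawSorterLength = val; }
    void setRealTime(bool val) { _realTime = val; }

    // Create the sorter on first use, then hand out the same object.
    Sorter* getRawSampleSource() {
        rawinit();
        return _rawSorter.get();
    }

private:
    // Assumed behavior: create the sorter once, under the mutex.
    void rawinit() {
        std::lock_guard<std::mutex> lock(_rawMutex);
        if (!_rawSorter)
            _rawSorter.reset(new Sorter{_rawSorterLength, _realTime});
    }

    float _rawSorterLength;  // seconds
    bool _realTime;
    std::mutex _rawMutex;
    std::unique_ptr<Sorter> _rawSorter;
};
```

With this arrangement no sorter or thread exists until some caller actually asks for the raw sample source.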
void SamplePipeline::removeSampleClient (SampleClient *client) throw () [virtual]
Remove a SampleClient from this SampleSource.
Implements nidas::core::SampleSource.
References nidas::core::SampleTag::getDSMSensor(), nidas::core::DSMSensor::getRawSampleTag(), nidas::core::getSample(), and nidas::core::DSMSensor::removeSampleClient().
void SamplePipeline::removeSampleClientForTag (SampleClient *, const SampleTag *) throw () [virtual]
Remove a SampleClient for a given SampleTag from this SampleSource.
The pointer to the SampleClient must remain valid until after it is removed.
Implements nidas::core::SampleSource.
References nidas::core::DSMSensor::getRawSampleTag(), nidas::core::getSample(), nidas::core::DSMSensor::removeSampleClient(), and nidas::core::DSMSensor::removeSampleClientForTag().
void SamplePipeline::removeSampleTag (const SampleTag *tag) throw ()
Implements nidas::core::SampleSource.
References _procSorter, and nidas::core::SampleSource::removeSampleTag().
void SamplePipeline::setHeapBlock (bool val)
Parameters
val | If true, and heapSize exceeds heapMax, then wait for heapSize to be less than heapMax, which will block any SampleSources that are inserting samples into the raw buffer. If false, then discard any samples that are received while heapSize exceeds heapMax. |
References _heapBlock, and nidas::core::getSample().
Referenced by nidas::dynld::RawSampleService::schedule(), and setRealTime().
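The two heap policies described for `val` can be sketched as a small accounting class. `HeapLimiter` and its method names are hypothetical; the condition-variable wait is an assumed way to implement "wait for heapSize to be less than heapMax", not the NIDAS implementation.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

class HeapLimiter {
public:
    HeapLimiter(std::size_t heapMax, bool heapBlock)
        : _heapMax(heapMax), _heapBlock(heapBlock),
          _heapSize(0), _numDiscarded(0) {}

    // Account for an incoming sample of nbytes.
    // Returns false if the sample was discarded.
    bool acquire(std::size_t nbytes) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_heapSize > _heapMax) {
            if (!_heapBlock) {          // discard policy
                ++_numDiscarded;
                return false;
            }
            // blocking policy: wait until the consumer has freed memory
            _cond.wait(lock, [this] { return _heapSize < _heapMax; });
        }
        _heapSize += nbytes;
        return true;
    }

    // Called by the consumer when a sample of nbytes leaves the sorter.
    void release(std::size_t nbytes) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _heapSize = (nbytes > _heapSize) ? 0 : _heapSize - nbytes;
        }
        _cond.notify_all();
    }

    std::size_t heapSize() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _heapSize;
    }

    std::size_t numDiscarded() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _numDiscarded;
    }

private:
    const std::size_t _heapMax;
    const bool _heapBlock;
    std::size_t _heapSize;
    std::size_t _numDiscarded;
    mutable std::mutex _mutex;
    std::condition_variable _cond;
};
```

Blocking trades latency for completeness, because producers stall instead of losing data. Discarding keeps producers running at the cost of dropped samples, which is what the discarded-sample counters on this page report.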
void SamplePipeline::setKeepStats (bool val)
References _keepStats, and nidas::core::getSample().
Referenced by nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
void SamplePipeline::setProcHeapMax (size_t val)
Set the maximum amount of heap memory to use for sorting samples.
Parameters
val | Maximum size of heap in bytes. |
References _procHeapMax, and nidas::core::getSample().
Referenced by nidas::dynld::raf::SyncServer::init(), nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
void SamplePipeline::setProcLateSampleCacheSize (unsigned int val)
Cache this number of samples with potentially anomalous, late time tags in the processed sample sorter.
See SampleSorter::setLateSampleCacheSize(val).
References _procLateSampleCacheSize, and nidas::core::getSample().
Referenced by nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
void SamplePipeline::setProcSorterLength (float val)
Set length of processed SampleSorter, in seconds.
References _procSorterLength, and nidas::core::getSample().
Referenced by nidas::dynld::raf::SyncServer::init(), nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
void SamplePipeline::setRawHeapMax (size_t val)
Set the maximum amount of heap memory to use for sorting samples.
Parameters
val | Maximum size of heap in bytes. |
References _rawHeapMax, and nidas::core::getSample().
Referenced by nidas::dynld::raf::SyncServer::init(), nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
void SamplePipeline::setRawLateSampleCacheSize (unsigned int val)
Cache this number of samples with potentially anomalous, late time tags in the raw sample sorter.
See SampleSorter::setLateSampleCacheSize(val).
References _rawLateSampleCacheSize, and nidas::core::getSample().
Referenced by nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
void SamplePipeline::setRawSorterLength (float val)
Set length of raw SampleSorter, in seconds.
References _rawSorterLength, and nidas::core::getSample().
Referenced by nidas::dynld::raf::SyncServer::init(), nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
void SamplePipeline::setRealTime (bool val)
References _realTime, nidas::core::getSample(), and setHeapBlock().
Referenced by nidas::dynld::raf::SyncServer::init(), nidas::core::DSMEngine::openSensors(), and nidas::dynld::RawSampleService::schedule().
bool SamplePipeline::_heapBlock [private]
Referenced by getHeapBlock(), and setHeapBlock().
bool SamplePipeline::_keepStats [private]
Referenced by procinit(), rawinit(), and setKeepStats().
std::string SamplePipeline::_name [private]
Referenced by getName(), procinit(), and rawinit().
size_t SamplePipeline::_procHeapMax [private]
Referenced by getProcHeapMax(), and setProcHeapMax().
unsigned int SamplePipeline::_procLateSampleCacheSize [private]
Referenced by getProcLateSampleCacheSize(), and setProcLateSampleCacheSize().
nidas::util::Mutex SamplePipeline::_procMutex [private]
Referenced by interrupt(), join(), procinit(), and ~SamplePipeline().
SampleThread * SamplePipeline::_procSorter [private]
Referenced by addSampleTag(), flush(), getClientCount(), getNumDiscardedProcSamples(), getNumFutureProcSamples(), getSampleStats(), getSampleTagIterator(), getSampleTags(), getSorterNumProcBytes(), getSorterNumProcBytesMax(), getSorterNumProcSamples(), interrupt(), join(), procinit(), removeSampleTag(), and ~SamplePipeline().
float SamplePipeline::_procSorterLength [private]
seconds
Referenced by getProcSorterLength(), and setProcSorterLength().
size_t SamplePipeline::_rawHeapMax [private]
Referenced by getRawHeapMax(), and setRawHeapMax().
unsigned int SamplePipeline::_rawLateSampleCacheSize [private]
Referenced by getRawLateSampleCacheSize(), and setRawLateSampleCacheSize().
nidas::util::Mutex SamplePipeline::_rawMutex [private]
Referenced by interrupt(), join(), rawinit(), and ~SamplePipeline().
SampleThread * SamplePipeline::_rawSorter [private]
float SamplePipeline::_rawSorterLength [private]
seconds
Referenced by getRawSorterLength(), and setRawSorterLength().
bool SamplePipeline::_realTime [private]
Referenced by getRealTime(), and setRealTime().