nidas
v1.2-1520
|
A SampleClient that sorts its received samples, using an STL multiset, and then sends the sorted samples onto its SampleClients. More...
#include <SampleSorter.h>
Public Types | |
enum | runStatus { RUN_CANCELED = -1, RUN_OK = 0, NOT_RUNNING = 1, RUN_EXCEPTION = 2 } |
Values that can be returned by run method. More... | |
enum | SchedPolicy { NU_THREAD_OTHER =SCHED_OTHER, NU_THREAD_FIFO =SCHED_FIFO, NU_THREAD_RR =SCHED_RR } |
Public Member Functions | |
SampleSorter (const std::string &name, bool raw) | |
Constructor. More... | |
virtual | ~SampleSorter () |
void | setKeepStats (bool val) |
bool | getKeepStats () const |
SampleSource * | getRawSampleSource () |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples. More... | |
SampleSource * | getProcessedSampleSource () |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples. More... | |
void | addSampleTag (const SampleTag *tag) throw () |
How to tell this SampleSorter what sample tags it will be sorting. More... | |
void | removeSampleTag (const SampleTag *tag) throw () |
std::list< const SampleTag * > | getSampleTags () const |
Implementation of SampleSource::getSampleTags(). More... | |
SampleTagIterator | getSampleTagIterator () const |
Implementation of SampleSource::getSampleTagIterator(). More... | |
void | addSampleClient (SampleClient *client) throw () |
Implementation of SampleSource::addSampleClient(). More... | |
void | removeSampleClient (SampleClient *client) throw () |
Remove a SampleClient from this SampleSource. More... | |
void | addSampleClientForTag (SampleClient *client, const SampleTag *tag) throw () |
Add a Client for a given SampleTag. More... | |
void | removeSampleClientForTag (SampleClient *client, const SampleTag *tag) throw () |
Remove a SampleClient for a given SampleTag from this SampleSource. More... | |
int | getClientCount () const throw () |
How many SampleClients are currently in my list. More... | |
const SampleStats & | getSampleStats () const |
void | flush () throw () |
Implementation of SampleSource::flush(). More... | |
void | interrupt () |
Interrupt sorting thread. More... | |
bool | receive (const Sample *s) throw () |
Implementation of SampleClient::receive(). More... | |
size_t | size () const |
Current number of samples in the sorter. More... | |
void | setLengthSecs (float val) |
float | getLengthSecs () const |
void | setHeapMax (size_t val) |
Set the maximum amount of heap memory to use for sorting samples. More... | |
size_t | getHeapMax () const |
size_t | getHeapSize () const |
Get the current amount of heap being used for sorting. More... | |
void | setHeapBlock (bool val) |
bool | getHeapBlock () const |
size_t | getNumDiscardedSamples () const |
Number of samples discarded because of _heapSize > _heapMax and heapBlock == true. More... | |
size_t | getNumFutureSamples () const |
Number of samples discarded because their timetags were in the future. More... | |
size_t | getNumEarlySamples () const |
Number of early samples, which may not be sorted. More... | |
void | setRealTime (bool val) |
Is this sorter running in real-time? If so then we can screen for bad time-tags by checking against the system clock, which is trusted. More... | |
bool | getRealTime () const |
void | setLateSampleCacheSize (unsigned int val) |
When aging-off samples, cache this number of samples with the latest time tags. More... | |
unsigned int | getLateSampleCacheSize () const |
virtual void | start () throw (Exception) |
Start the thread running, meaning execute the run method in a separate thread. More... | |
virtual int | join () throw (Exception) |
The calling thread joins this thread, waiting until the thread finishes, which means either that the run() method returned, the thread called pthread_exit() or the thread was cancelled. More... | |
virtual void | kill (int sig) throw (Exception) |
Send a signal to this thread. More... | |
virtual void | cancel () throw (Exception) |
Cancel this thread. More... | |
virtual bool | isInterrupted () const |
Return true when this thread has been interrupted. More... | |
virtual bool | isRunning () const |
Is this thread running? More... | |
virtual bool | isJoined () const |
Has this thread been joined? More... | |
virtual bool | isDetached () const |
Is this a detached thread. More... | |
bool | isCancelEnabled () const |
Return true if the cancel state of this thread is PTHREAD_CANCEL_ENABLE. More... | |
bool | isCancelDeferred () const |
Return true if the cancel type of this thread is PTHREAD_CANCEL_DEFERRED. More... | |
const std::string & | getName () const throw () |
Return the name of this thread. More... | |
const std::string & | getFullName () throw () |
Return a name with a bunch of descriptive fields, specifying whether it is detached, the real-time priority, etc. More... | |
bool | setRealTimeRoundRobinPriority (int val) throw (Exception) |
bool | setRealTimeFIFOPriority (int val) throw (Exception) |
bool | setNonRealTimePriority () throw (Exception) |
void | setThreadScheduler (enum SchedPolicy policy, int priority) throw (Exception) |
void | blockSignal (int) |
Block a signal in this thread. More... | |
void | unblockSignal (int) |
Install a signal handler and unblock the signal. More... | |
Static Public Member Functions | |
static Thread * | currentThread () |
Return the thread object for the current thread. More... | |
static pthread_t | currentThreadId () |
static Thread * | lookupThread (pthread_t id) |
static const std::string & | currentName () |
Convenience routine to return the name for the current thread, or a string indicating that the name of the thread is unknown. More... | |
static std::string | getPolicyString (int policy) |
Convenience function to return a string for the given scheduler policy: "Non-RT", "RT:FIFO", "RT:RR" or "RT:Unknown". More... | |
static int | test (int argc, char **argv) |
a test method. More... | |
Protected Member Functions | |
void | setCancelEnabled (bool val) |
Set the cancel state for this thread - false means cancel requests are ignored. More... | |
void | setCancelDeferred (bool val) |
Set the cancel type for this thread. More... | |
pthread_t | getId () const |
void | testCancel () const |
Check if we have been cancelled. More... | |
virtual bool | amInterrupted () const |
Call testCancel, and return true when this thread has been interrupted. More... | |
Private Member Functions | |
int | run () throw (nidas::util::Exception) |
Thread run function. More... | |
void | heapDecrement (size_t bytes) |
Utility function to decrement the heap size after writing one or more samples. More... | |
SampleSorter (const SampleSorter &) | |
No copy. More... | |
SampleSorter & | operator= (const SampleSorter &) |
No assignment. More... | |
Private Attributes | |
SampleSourceSupport | _source |
unsigned int | _sorterLengthUsec |
Length of SampleSorter, in micro-seconds. More... | |
SortedSampleSet | _samples |
nidas::util::Cond | _sampleSetCond |
nidas::util::Cond | _flushCond |
size_t | _heapMax |
Limit on the maximum size of memory to use while buffering samples. More... | |
size_t | _heapSize |
Current heap size, in bytes. More... | |
bool | _heapBlock |
_heapBlock controls what happens when the number of bytes in _samples exceeds _heapMax. More... | |
nidas::util::Cond | _heapCond |
bool | _heapExceeded |
unsigned int | _discardedSamples |
Number of samples discarded because of _heapSize > _heapMax and heapBlock == true. More... | |
unsigned int | _realTimeFutureSamples |
Number of samples discarded because getRealTime() is true and the samples have timetags later than the system clock. More... | |
unsigned int | _earlySamples |
Samples which are earlier than the current latest sample in the sorter minus the sorter length. More... | |
int | _discardWarningCount |
How often to log warnings about discardedSamples. More... | |
int | _earlyWarningCount |
How often to log warnings about early samples. More... | |
bool | _doFlush |
bool | _flushed |
SampleT< char > | _dummy |
bool | _realTime |
Is this sorter running in real-time? If so then we can screen for bad time-tags by checking against the system clock, which is trusted. More... | |
long long | _maxSorterLengthUsec |
unsigned int | _lateSampleCacheSize |
A SampleClient that sorts its received samples, using an STL multiset, and then sends the sorted samples onto its SampleClients.
The time period of the sorting is specified with setLengthSecs(). Samples whose time-tags are previous to the time-tag of the latest sample received minus the sorter length, are sent on to the SampleClients. This is implemented as a Thread, which must be started, otherwise the sorter will grow and no samples will be sent to clients. This can be a client of multiple SampleSources, so that the distributed samples are sorted in time.
|
inherited |
Values that can be returned by run method.
User can define other values greater than RUN_EXCEPTION. These values are then returned by int join()
. Note that PTHREAD_CANCELLED is -1
Enumerator | |
---|---|
RUN_CANCELED | |
RUN_OK | |
NOT_RUNNING | |
RUN_EXCEPTION |
|
inherited |
SampleSorter::SampleSorter | ( | const std::string & | name, |
bool | raw | ||
) |
Constructor.
@ param raw: boolean indicating whether this SampleSorter is for raw or processed samples. Clients can query this value, and it controls what is returned by getRawSampleSource() and getProcessedSampleSource().
References _discardWarningCount, _earlyWarningCount, nidas::util::LogScheme::getParameterT(), and nidas::util::Logger::getScheme().
|
virtual |
References _discardedSamples, _earlySamples, _maxSorterLengthUsec, _samples, _sorterLengthUsec, DLOG, nidas::util::Thread::getName(), ILOG, interrupt(), nidas::util::Thread::isJoined(), nidas::util::Thread::isRunning(), nidas::util::Thread::join(), USECS_PER_SEC, VLOG, nidas::util::Exception::what(), and WLOG.
|
private |
No copy.
|
inlinevirtual |
Implementation of SampleSource::addSampleClient().
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::addSampleClient().
|
inlinevirtual |
Add a Client for a given SampleTag.
Implementation of SampleSource::addSampleClient().
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::addSampleClientForTag().
|
inlinevirtual |
How to tell this SampleSorter what sample tags it will be sorting.
SampleClients can then query it.
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::addSampleTag().
|
protectedvirtualinherited |
Call testCancel, and return true when this thread has been interrupted.
This is protected because it should only called within the run method of the thread.
Referenced by nidas::core::SampleOutputRequestThread::run(), nidas::core::Looper::run(), nidas::dynld::isff::GOESOutput::run(), nidas::core::DSMServerStat::run(), and nidas::util::Thread::test().
|
inherited |
Block a signal in this thread.
This method is usually called before this Thread has started. If this Thread is currently running, then this method is only effective if called from this Thread, i.e. from its own run() method.
Because SIGINT, SIGTERM and SIGHUP are typically caught in the main thread, they are blocked by default in a Thread. Call unblockSignal(sig) if you want to catch them in a Thread.
References nidas::util::Thread::_blockedSignals, nidas::util::Thread::_unblockedSignals, nidas::util::Thread::currentThread(), nidas::util::Thread::isRunning(), and nidas::util::Thread::thr_add_sig().
Referenced by nidas::dynld::UDPSampleOutput::ConnectionMonitor::ConnectionMonitor(), nidas::core::ServerSocket::ConnectionThread::ConnectionThread(), nidas::core::DerivedDataReader::DerivedDataReader(), nidas::util::McSocketListener::McSocketListener(), nidas::util::McSocketMulticaster< SocketTT >::McSocketMulticaster(), nidas::core::SensorHandler::SensorHandler(), nidas::core::SensorOpener::SensorOpener(), nidas::util::Thread::test(), nidas::dynld::RawSampleService::Worker::Worker(), nidas::core::XmlRpcThread::XmlRpcThread(), and nidas::dynld::UDPSampleOutput::XMLSocketListener::XMLSocketListener().
|
virtualinherited |
Cancel this thread.
References nidas::util::Thread::_id, nidas::util::Thread::_mutex, nidas::util::Exception::errnoToString(), and nidas::util::Thread::getName().
Referenced by nidas::core::DSMService::cancel(), nidas::dynld::isff::GOESOutput::cancelThread(), nidas::core::DerivedDataReader::deleteInstance(), and nidas::core::DSMEngine::joinDataThreads().
|
inlinestaticinherited |
Convenience routine to return the name for the current thread, or a string indicating that the name of the thread is unknown.
References nidas::util::Thread::currentThread(), nidas::util::Thread::getName(), and nidas::util::Thread::unknownName.
Referenced by nidas::util::Logger::msg_locked().
|
staticinherited |
Return the thread object for the current thread.
Returns null if not found.
Referenced by nidas::util::Thread::blockSignal(), nidas::util::Thread::currentName(), and nidas::util::Thread::unblockSignal().
|
staticinherited |
Referenced by nidas::core::Looper::removeClient().
|
virtual |
Implementation of SampleSource::flush().
distribute all buffered samples to SampleClients.
flush all samples from buffer, distributing them to SampleClients. The first caller will block until the buffer is empty.
Implements nidas::core::SampleThread.
References _doFlush, _flushCond, _flushed, _samples, _sampleSetCond, _source, DLOG, nidas::core::SampleSourceSupport::getRawSampleSource(), nidas::util::Thread::isInterrupted(), nidas::util::Cond::lock(), nidas::util::Cond::signal(), nidas::util::Cond::unlock(), nidas::util::Cond::wait(), and WLOG.
|
inlinevirtual |
How many SampleClients are currently in my list.
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::getClientCount().
|
inherited |
Return a name with a bunch of descriptive fields, specifying whether it is detached, the real-time priority, etc.
References nidas::util::Thread::_fullname.
Referenced by nidas::util::Thread::pRun(), nidas::util::Thread::test(), nidas::util::Thread::thr_cleanup(), and nidas::util::Thread::thr_cleanup_delete().
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _heapBlock.
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _heapMax.
|
inlinevirtual |
Get the current amount of heap being used for sorting.
Implements nidas::core::SampleThread.
References _heapSize.
|
inlineprotectedinherited |
References nidas::util::Thread::_id, and nidas::util::Thread::_mutex.
Referenced by nidas::util::Thread::isJoined(), nidas::util::Thread::join(), nidas::util::Thread::pRun(), and nidas::core::Looper::removeClient().
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _source, and nidas::core::SampleSourceSupport::getKeepStats().
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _lateSampleCacheSize.
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _sorterLengthUsec, and USECS_PER_SEC.
|
inherited |
Return the name of this thread.
References nidas::util::Thread::_name.
Referenced by nidas::core::DSMService::cancel(), nidas::util::Thread::cancel(), nidas::core::DSMService::checkSubThreads(), nidas::core::SensorHandler::PolledDSMSensor::checkTimeout(), nidas::util::Thread::currentName(), nidas::core::SensorHandler::PolledDSMSensor::handlePollEvents(), heapDecrement(), nidas::core::DSMService::interrupt(), nidas::core::DSMService::join(), nidas::util::Thread::join(), nidas::core::SamplePipeline::join(), nidas::core::FsMountWorkerThread::run(), run(), nidas::util::ThreadJoiner::run(), nidas::util::Thread::sigAction(), nidas::util::Thread::start(), nidas::util::Thread::test(), nidas::util::LogContext::threadName(), ~SampleSorter(), and nidas::util::Thread::~Thread().
|
inlinevirtual |
Number of samples discarded because of _heapSize > _heapMax and heapBlock == true.
Implements nidas::core::SampleThread.
References _discardedSamples.
|
inline |
Number of early samples, which may not be sorted.
References _earlySamples.
|
inlinevirtual |
Number of samples discarded because their timetags were in the future.
Implements nidas::core::SampleThread.
References _realTimeFutureSamples.
|
staticinherited |
Convenience function to return a string for the given scheduler policy: "Non-RT", "RT:FIFO", "RT:RR" or "RT:Unknown".
References nidas::util::Thread::NU_THREAD_FIFO, nidas::util::Thread::NU_THREAD_OTHER, and nidas::util::Thread::NU_THREAD_RR.
Referenced by nidas::core::DSMSensor::getLooper(), and nidas::util::Thread::makeFullName().
|
inlinevirtual |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
SampleClients use this method to get a pointer to whatever sample source they are interested in. Derived classes can return NULL if they are not a SampleSource of processed samples.
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::getProcessedSampleSource().
|
inlinevirtual |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
SampleClients use this method to get a pointer to whatever sample source they are interested in. Derived classes can return NULL if they are not a SampleSource of raw samples.
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::getRawSampleSource().
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _realTime.
|
inlinevirtual |
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::getSampleStats().
|
inlinevirtual |
Implementation of SampleSource::getSampleTagIterator().
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::getSampleTagIterator().
|
inlinevirtual |
Implementation of SampleSource::getSampleTags().
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::getSampleTags().
|
private |
Utility function to decrement the heap size after writing one or more samples.
If the heapSize has has shrunk below heapMax then signal any threads waiting on heapCond.
References _heapBlock, _heapCond, _heapExceeded, _heapMax, _heapSize, DLOG, nidas::util::Thread::getName(), nidas::util::Cond::lock(), nidas::util::Cond::signal(), and nidas::util::Cond::unlock().
Referenced by run().
|
virtual |
Interrupt sorting thread.
Reimplemented from nidas::util::Thread.
References _flushCond, _heapCond, _heapExceeded, _heapMax, _heapSize, _sampleSetCond, nidas::util::Thread::interrupt(), nidas::util::Cond::lock(), nidas::util::Cond::signal(), and nidas::util::Cond::unlock().
Referenced by ~SampleSorter().
|
inherited |
Return true if the cancel type of this thread is PTHREAD_CANCEL_DEFERRED.
References nidas::util::Thread::_cancel_deferred.
|
inherited |
Return true if the cancel state of this thread is PTHREAD_CANCEL_ENABLE.
References nidas::util::Thread::_cancel_enabled.
|
inlinevirtualinherited |
Is this a detached thread.
References nidas::util::Thread::_detached.
|
inlinevirtualinherited |
Return true when this thread has been interrupted.
Implements nidas::util::Runnable.
References nidas::util::Thread::_interrupted, and nidas::util::Thread::_mutex.
Referenced by flush(), nidas::core::SampleOutputRequestThread::run(), nidas::core::StatusListener::run(), nidas::core::DerivedDataReader::run(), nidas::core::SensorOpener::run(), Sender::run(), nidas::core::DSMEngineStat::run(), nidas::core::SensorHandler::run(), nidas::core::FsMountWorkerThread::run(), ModemLineSetter::run(), ModemLineMonitor::run(), nidas::core::SampleBuffer::run(), run(), ServerThread::run(), nidas::util::McSocketListener::run(), and nidas::dynld::raf::SyncServer::stop().
|
inlinevirtualinherited |
Has this thread been joined?
References nidas::util::Thread::getId().
Referenced by nidas::core::FsMount::cancel(), nidas::core::DerivedDataReader::deleteInstance(), nidas::core::SensorHandler::join(), nidas::dynld::isff::GOESOutput::joinThread(), nidas::util::Thread::test(), nidas::core::SampleBuffer::~SampleBuffer(), ~SampleSorter(), and nidas::util::Thread::~Thread().
|
inlinevirtualinherited |
Is this thread running?
References nidas::util::Thread::_mutex, and nidas::util::Thread::_running.
Referenced by nidas::util::Thread::blockSignal(), nidas::core::DSMService::cancel(), nidas::core::FsMount::cancel(), nidas::dynld::isff::GOESOutput::cancelThread(), nidas::core::DSMService::checkSubThreads(), nidas::dynld::UDPSampleOutput::XMLSocketListener::checkWorkers(), nidas::dynld::UDPSampleOutput::close(), nidas::util::McSocket< SocketT >::close(), nidas::core::DerivedDataReader::deleteInstance(), nidas::core::DSMService::interrupt(), nidas::core::SamplePipeline::join(), nidas::dynld::isff::GOESOutput::joinThread(), nidas::core::DSMServerApp::killStatusThread(), nidas::dynld::isff::GOESOutput::killThread(), nidas::core::Looper::removeClient(), nidas::core::SensorHandler::run(), nidas::core::Looper::setupClientMaps(), nidas::util::Thread::test(), nidas::util::Thread::unblockSignal(), nidas::core::SampleBuffer::~SampleBuffer(), ~SampleSorter(), nidas::util::Thread::~Thread(), and nidas::dynld::UDPSampleOutput::~UDPSampleOutput().
|
virtualinherited |
The calling thread joins this thread, waiting until the thread finishes, which means either that the run() method returned, the thread called pthread_exit() or the thread was cancelled.
The return value is the int return value of the run method, or PTHREAD_CANCELED (-1). If the run method threw an Exception, it will be caught and then re-thrown by join.
Reimplemented in nidas::core::SensorHandler.
References nidas::util::Thread::_exception, nidas::util::Thread::_id, nidas::util::Thread::_mutex, nidas::util::Exception::errnoToString(), nidas::util::Thread::getId(), nidas::util::Thread::getName(), nidas::util::Thread::RUN_EXCEPTION, and nidas::util::Thread::RUN_OK.
Referenced by nidas::core::DSMService::checkSubThreads(), nidas::dynld::UDPSampleOutput::XMLSocketListener::checkWorkers(), nidas::dynld::UDPSampleOutput::close(), nidas::util::McSocketListener::close(), nidas::core::DerivedDataReader::deleteInstance(), nidas::dynld::UDPSampleOutput::XMLSocketListener::fireWorkers(), nidas::core::DSMService::join(), nidas::core::SensorHandler::join(), nidas::core::SamplePipeline::join(), nidas::core::DSMEngine::joinDataThreads(), nidas::dynld::isff::GOESOutput::joinThread(), nidas::core::DSMServerApp::killStatusThread(), nidas::core::DSMServerApp::killXmlRpcThread(), nidas::core::DSMEngine::killXmlRpcThread(), main(), nidas::core::Looper::removeClient(), nidas::util::ThreadJoiner::run(), nidas::core::SampleBuffer::~SampleBuffer(), ~SampleSorter(), nidas::core::ServerSocket::~ServerSocket(), and nidas::dynld::UDPSampleOutput::~UDPSampleOutput().
|
virtualinherited |
Send a signal to this thread.
References nidas::util::Exception::errnoToString().
Referenced by nidas::core::FsMount::cancel(), nidas::core::XmlRpcThread::interrupt(), nidas::core::SensorOpener::interrupt(), nidas::core::DerivedDataReader::interrupt(), nidas::core::SensorHandler::interrupt(), nidas::util::McSocketListener::interrupt(), nidas::core::DSMServerApp::killStatusThread(), nidas::dynld::isff::GOESOutput::killThread(), main(), nidas::core::SensorHandler::scheduleAdd(), nidas::core::SensorHandler::scheduleClose(), nidas::core::SensorHandler::scheduleReopen(), and nidas::core::SensorHandler::sensorIsOpen().
|
staticinherited |
Referenced by nidas::util::LogContext::threadName().
|
private |
No assignment.
|
virtual |
Implementation of SampleClient::receive().
Implements nidas::core::SampleClient.
References DLOG, nidas::core::SampleTracer::format_time(), GET_DSM_ID, GET_SPS_ID, nidas::util::getSystemTime(), USECS_PER_SEC, and WLOG.
|
inlinevirtual |
Remove a SampleClient from this SampleSource.
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::removeSampleClient().
|
inlinevirtual |
Remove a SampleClient for a given SampleTag from this SampleSource.
The pointer to the SampleClient must remain valid, until after it is removed.
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::removeSampleClientForTag().
|
inlinevirtual |
Implements nidas::core::SampleSource.
References _source, and nidas::core::SampleSourceSupport::removeSampleTag().
|
privatevirtual |
Thread run function.
Thread function.
Implements nidas::util::Runnable.
References _doFlush, _dummy, _flushCond, _flushed, _heapBlock, _heapCond, _heapExceeded, _heapMax, _lateSampleCacheSize, _maxSorterLengthUsec, _samples, _sampleSetCond, _sorterLengthUsec, _source, nidas::core::SampleTracer::active(), nidas::util::LogContext::active(), nidas::util::Cond::broadcast(), nidas::core::SampleSourceSupport::distribute(), nidas::util::endlog(), nidas::core::SampleTracer::format_time(), nidas::core::Sample::freeReference(), GET_DSM_ID, GET_SHORT_ID, nidas::core::Sample::getDataByteLength(), nidas::core::Sample::getHeaderLength(), nidas::core::Sample::getId(), nidas::util::Thread::getName(), nidas::core::SampleSourceSupport::getRawSampleSource(), nidas::core::Sample::getTimeTag(), heapDecrement(), ILOG, nidas::util::Thread::isInterrupted(), nidas::util::Cond::lock(), LOG_VERBOSE, nidas::core::SampleTracer::msg(), NLOG, nidas::util::Thread::RUN_OK, nidas::core::Sample::setTimeTag(), nidas::util::Cond::signal(), size(), nidas::util::Cond::unlock(), USECS_PER_MSEC, USECS_PER_SEC, nidas::util::Cond::wait(), and WLOG.
|
protectedinherited |
Set the cancel type for this thread.
true means cancel requests are deferred until the next cancellation point. false means they occur instantly. This is protected, it should be called only from a thread's own run method. See the pthreads(7) man page for a list of the cancellation points.
Note: non-deferred canceling is difficult to get right. It has not been tested with this class, and is not recommended.
References nidas::util::Thread::_cancel_deferred.
|
protectedinherited |
Set the cancel state for this thread - false means cancel requests are ignored.
See pthread_setcancelstate. This is protected, it should be called only from a thread's own run method.
References nidas::util::Thread::_cancel_enabled.
|
inlinevirtual |
val | If true, and heapSize exceeds heapMax, then wait for heapSize to be less then heapMax, which will block any SampleSources that are inserting samples into this sorter. If false, then discard any samples that are received while heapSize exceeds heapMax. |
Implements nidas::core::SampleThread.
References _heapBlock.
|
inlinevirtual |
Set the maximum amount of heap memory to use for sorting samples.
val | Maximum size of heap in bytes. |
Implements nidas::core::SampleThread.
References _heapMax.
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _source, and nidas::core::SampleSourceSupport::setKeepStats().
|
inlinevirtual |
When aging-off samples, cache this number of samples with the latest time tags.
This number, or fewer, of samples with anomalous, late, time tags will not effect the sorting. If the late sample cache size is zero, a jump ahead in time greater than the length of the sorter in seconds will cause the sorter to be emptied and the sorting effectively disabled until samples within the sorter length of the bad sample are encountered.
Implements nidas::core::SampleThread.
References _lateSampleCacheSize.
|
inlinevirtual |
Implements nidas::core::SampleThread.
References _sorterLengthUsec, and USECS_PER_SEC.
|
inherited |
|
inlinevirtual |
Is this sorter running in real-time? If so then we can screen for bad time-tags by checking against the system clock, which is trusted.
Implements nidas::core::SampleThread.
References _realTime.
|
inherited |
|
inherited |
|
inherited |
|
inlinevirtual |
Current number of samples in the sorter.
This method does not hold a lock to force exclusive access to the sample container. Therefore this is only an instantaneous check and shouldn't be used by methods in this class when exclusive access is required.
Implements nidas::core::SampleThread.
References _samples.
Referenced by run().
|
virtualinherited |
Start the thread running, meaning execute the run method in a separate thread.
References nidas::util::Thread::_id, nidas::util::Thread::_interrupted, nidas::util::Thread::_mutex, nidas::util::Thread::_running, nidas::util::Thread::_thread_attr, nidas::util::Exception::errnoToString(), nidas::util::Thread::getName(), nidas::util::Thread::makeFullName(), nidas::util::Thread::NU_THREAD_FIFO, nidas::util::Thread::NU_THREAD_OTHER, nidas::util::Thread::NU_THREAD_RR, nidas::util::Thread::registerThread(), nidas::util::Thread::setThreadSchedulerNolock(), nidas::util::Thread::thr_run(), nidas::util::Thread::thr_run_detached(), and WLOG.
Referenced by nidas::util::McSocketListener::accept(), nidas::dynld::RawSampleService::connect(), nidas::core::DerivedDataReader::createInstance(), nidas::util::McSocket< SocketT >::joinMulticaster(), main(), nidas::core::DSMEngine::openSensors(), nidas::core::SamplePipeline::procinit(), nidas::core::SamplePipeline::rawinit(), nidas::util::McSocket< SocketT >::request(), nidas::core::DSMServerApp::run(), nidas::core::DSMEngine::run(), PSI::run(), nidas::core::SensorHandler::run(), nidas::core::FsMountWorkerThread::run(), nidas::dynld::UDPSampleOutput::XMLSocketListener::run(), nidas::core::Socket::ConnectionThread::run(), ServerThread::run(), nidas::core::Looper::setupClientMaps(), nidas::core::DSMServerApp::startXmlRpcThread(), and nidas::core::DSMEngine::startXmlRpcThread().
|
staticinherited |
a test method.
References nidas::util::Runnable::amInterrupted(), nidas::util::Thread::blockSignal(), nidas::util::Thread::getFullName(), nidas::util::Thread::getName(), nidas::util::Thread::isJoined(), nidas::util::Thread::isRunning(), nidas::util::Runnable::run(), nidas::util::Thread::Thread(), and nidas::util::Exception::what().
|
inlineprotectedinherited |
Check if we have been cancelled.
Calls pthread_testcancel. This is protected since it only checks the current thread - i.e. it must be called within the run method. Since it just calls pthread_testcancel, it is a cancellation point.
|
inherited |
Install a signal handler and unblock the signal.
The signal handler will log a message about the receipt of the signal at severity LOG_INFO using the nidas::util::Logger. Then, if the signal handler is being invoked from a registered Thread, the virtual method signalHandler() for that Thread will be called.
The signal handler is installed with the sigaction() system call, and will be the action for the given signal in all threads, including the main() thread. If other threads do not wish to take action on a given signal, they should call blockSignal(sig). Or they can define their own signalHandler() method.
After installing the signal handler, the signal is added to those that are unblocked for the thread, or if the Thread is not yet running, the signal will be unblocked in the thread once it runs.
As with blockSignal(), this method is typically called on this Thread before it has started. If this Thread has started, then the signal will only be unblocked if the method is called from this Thread, i.e. from its own run() method.
To install a signal handler, and then block the signal so that it is held as pending until it is later unblocked, typically with pselect(), or sigwaitinfo(), do:
References nidas::util::Thread::_blockedSignals, nidas::util::Thread::_unblockedSignals, nidas::util::Thread::currentThread(), nidas::util::Thread::isRunning(), and nidas::util::Thread::thr_add_sig().
Referenced by nidas::core::DerivedDataReader::DerivedDataReader(), nidas::core::FsMountWorkerThread::FsMountWorkerThread(), nidas::core::StatusListener::StatusListener(), nidas::core::StatusThread::StatusThread(), nidas::dynld::XMLConfigService::Worker::Worker(), nidas::core::XmlRpcThread::XmlRpcThread(), and nidas::dynld::UDPSampleOutput::XMLSocketListener::XMLSocketListener().
|
private |
Number of samples discarded because of _heapSize > _heapMax and heapBlock == true.
Referenced by getNumDiscardedSamples(), and ~SampleSorter().
|
private |
How often to log warnings about discardedSamples.
Referenced by SampleSorter().
|
private |
Samples which are earlier than the current latest sample in the sorter minus the sorter length.
These samples have arrived too late to be necessarily sorted. They may or may not be sorted, depending on how often the sorting buffer is read.
Referenced by getNumEarlySamples(), and ~SampleSorter().
|
private |
How often to log warnings about early samples.
Referenced by SampleSorter().
|
private |
Referenced by flush(), interrupt(), and run().
|
private |
_heapBlock controls what happens when the number of bytes in _samples exceeds _heapMax.
If _heapBlock is true and _heapSize exceeds _heapMax, then any threads which are calling SampleSorter::receive to insert a sample to this sorter will block on _heapCond until sample consumers have reduced _heapSize to less than _heapMax. If _heapBlock is false and _heapSize exceeds _heapMax then samples are discarded until _heapSize is less than _heapMax.
Referenced by getHeapBlock(), heapDecrement(), run(), and setHeapBlock().
|
private |
Referenced by heapDecrement(), interrupt(), and run().
|
private |
Referenced by heapDecrement(), interrupt(), and run().
|
private |
Limit on the maximum size of memory to use while buffering samples.
Referenced by getHeapMax(), heapDecrement(), interrupt(), run(), and setHeapMax().
|
private |
Current heap size, in bytes.
Referenced by getHeapSize(), heapDecrement(), and interrupt().
|
private |
Referenced by getLateSampleCacheSize(), run(), and setLateSampleCacheSize().
|
private |
Referenced by run(), and ~SampleSorter().
|
private |
Is this sorter running in real-time? If so then we can screen for bad time-tags by checking against the system clock, which is trusted.
Referenced by getRealTime(), and setRealTime().
|
private |
Number of samples discarded because getRealTime() is true and the samples have timetags later than the system clock.
Referenced by getNumFutureSamples().
|
private |
Referenced by flush(), run(), size(), and ~SampleSorter().
|
private |
Referenced by flush(), interrupt(), and run().
|
private |
Length of SampleSorter, in micro-seconds.
Referenced by getLengthSecs(), run(), setLengthSecs(), and ~SampleSorter().
|
private |
Referenced by addSampleClient(), addSampleClientForTag(), addSampleTag(), flush(), getClientCount(), getKeepStats(), getProcessedSampleSource(), getRawSampleSource(), getSampleStats(), getSampleTagIterator(), getSampleTags(), removeSampleClient(), removeSampleClientForTag(), removeSampleTag(), run(), and setKeepStats().