nidas  v1.2-1520
Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Protected Member Functions | Private Member Functions | Private Attributes | List of all members
nidas::dynld::raf::SyncServer Class Reference

#include <SyncServer.h>

Inheritance diagram for nidas::dynld::raf::SyncServer:
Inheritance graph
[legend]

Public Types

enum  runStatus { RUN_CANCELED = -1, RUN_OK = 0, NOT_RUNNING = 1, RUN_EXCEPTION = 2 }
 Values that can be returned by run method. More...
 
enum  SchedPolicy { NU_THREAD_OTHER =SCHED_OTHER, NU_THREAD_FIFO =SCHED_FIFO, NU_THREAD_RR =SCHED_RR }
 

Public Member Functions

 SyncServer ()
 
 ~SyncServer ()
 
void openStream ()
 Open the data file input stream and read the nidas header, but do not parse the project. More...
 
void init () throw (nidas::util::Exception)
 Call this method to parse the project, setup sample tags, preload calibrations using the time of the first sample, setup sample streams, and init the sensors. More...
 
int run () throw (nidas::util::Exception)
 This method implements the Runnable interface for Threads, but it can also be called synchronously when run from main(). More...
 
virtual void interrupt ()
 Interrupt this thread. More...
 
void read (bool once=false) throw (nidas::util::IOException)
 
void setSorterLengthSeconds (float sorter_secs)
 
void setRawSorterLengthSeconds (float sorter_secs)
 
std::string getXMLFileName ()
 Return the current XML filename setting. More...
 
void setXMLFileName (const std::string &name)
 
void resetAddress (nidas::util::SocketAddress *addr)
 
void addSampleClient (SampleClient *client)
 Specify a SampleClient instance to receive the sync samples instead of writing the sync samples to an output stream. More...
 
void setDataFileNames (const std::list< std::string > &dataFileNames)
 
void setStopSignal (StopSignal *stop_signal)
 Set a callback function which will be called when the SyncServer reaches EOF on its input stream or stops on an error. More...
 
void getTimeWindow (nidas::util::UTime *start, nidas::util::UTime *end)
 
void setTimeWindow (nidas::util::UTime start, nidas::util::UTime end)
 
void connect (SampleOutput *output) throw ()
 Implementation of SampleConnectionRequester::connect(). More...
 
void disconnect (SampleOutput *output) throw ()
 Implementation of SampleConnectionRequester::disconnect(). More...
 
virtual void start () throw (Exception)
 Start the thread running, meaning execute the run method in a separate thread. More...
 
virtual int join () throw (Exception)
 The calling thread joins this thread, waiting until the thread finishes, which means either that the run() method returned, the thread called pthread_exit() or the thread was cancelled. More...
 
virtual void kill (int sig) throw (Exception)
 Send a signal to this thread. More...
 
virtual void cancel () throw (Exception)
 Cancel this thread. More...
 
virtual bool isInterrupted () const
 Return true when this thread has been interrupted. More...
 
virtual bool isRunning () const
 Is this thread running? More...
 
virtual bool isJoined () const
 Has this thread been joined? More...
 
virtual bool isDetached () const
 Is this a detached thread. More...
 
bool isCancelEnabled () const
 Return true if the cancel state of this thread is PTHREAD_CANCEL_ENABLE. More...
 
bool isCancelDeferred () const
 Return true if the cancel type of this thread is PTHREAD_CANCEL_DEFERRED. More...
 
const std::string & getName () const throw ()
 Return the name of this thread. More...
 
const std::string & getFullName () throw ()
 Return a name with a bunch of descriptive fields, specifying whether it is detached, the real-time priority, etc. More...
 
bool setRealTimeRoundRobinPriority (int val) throw (Exception)
 
bool setRealTimeFIFOPriority (int val) throw (Exception)
 
bool setNonRealTimePriority () throw (Exception)
 
void setThreadScheduler (enum SchedPolicy policy, int priority) throw (Exception)
 
void blockSignal (int)
 Block a signal in this thread. More...
 
void unblockSignal (int)
 Install a signal handler and unblock the signal. More...
 

Static Public Member Functions

static ThreadcurrentThread ()
 Return the thread object for the current thread. More...
 
static pthread_t currentThreadId ()
 
static ThreadlookupThread (pthread_t id)
 
static const std::string & currentName ()
 Convenience routine to return the name for the current thread, or a string indicating that the name of the thread is unknown. More...
 
static std::string getPolicyString (int policy)
 Convenience function to return a string for the given scheduler policy: "Non-RT", "RT:FIFO", "RT:RR" or "RT:Unknown". More...
 
static int test (int argc, char **argv)
 a test method. More...
 

Static Public Attributes

static const int DEFAULT_PORT = 30001
 
static const float SORTER_LENGTH_SECS = 5.0
 
static const float RAW_SORTER_LENGTH_SECS = 5.0
 

Protected Member Functions

void setCancelEnabled (bool val)
 Set the cancel state for this thread - false means cancel requests are ignored. More...
 
void setCancelDeferred (bool val)
 Set the cancel type for this thread. More...
 
pthread_t getId () const
 
void testCancel () const
 Check if we have been cancelled. More...
 
virtual bool amInterrupted () const
 Call testCancel, and return true when this thread has been interrupted. More...
 

Private Member Functions

void initProject ()
 
void initSensors (SampleInputStream &sis)
 
void stop ()
 
void signalStop ()
 
void handleSample (nidas::core::Sample *sample)
 
 SyncServer (const SyncServer &)
 
SyncServeroperator= (const SyncServer &)
 

Private Attributes

SamplePipeline _pipeline
 
SyncRecordGenerator _syncGen
 
RawSampleInputStream_inputStream
 
SampleOutputStream_outputStream
 
std::string _xmlFileName
 
std::list< std::string > _dataFileNames
 
nidas::util::auto_ptr
< nidas::util::SocketAddress
_address
 
float _sorterLengthSecs
 
float _rawSorterLengthSecs
 
SampleClient_sampleClient
 
StopSignal_stop_signal
 
nidas::core::Sample_firstSample
 
dsm_time_t _startTime
 
dsm_time_t _startWindow
 
dsm_time_t _endWindow
 

Member Enumeration Documentation

Values that can be returned by run method.

User can define other values greater than RUN_EXCEPTION. These values are then returned by int join(). Note that PTHREAD_CANCELLED is -1

Enumerator
RUN_CANCELED 
RUN_OK 
NOT_RUNNING 
RUN_EXCEPTION 
Enumerator
NU_THREAD_OTHER 
NU_THREAD_FIFO 
NU_THREAD_RR 

Constructor & Destructor Documentation

SyncServer::SyncServer ( )
SyncServer::~SyncServer ( )
nidas::dynld::raf::SyncServer::SyncServer ( const SyncServer )
private

Member Function Documentation

void nidas::dynld::raf::SyncServer::addSampleClient ( SampleClient client)
inline

Specify a SampleClient instance to receive the sync samples instead of writing the sync samples to an output stream.

References _sampleClient.

Referenced by nidas::dynld::raf::SyncRecordReader::SyncRecordReader().

bool Runnable::amInterrupted ( ) const
protectedvirtualinherited

Call testCancel, and return true when this thread has been interrupted.

This is protected because it should only called within the run method of the thread.

Referenced by nidas::core::SampleOutputRequestThread::run(), nidas::core::Looper::run(), nidas::dynld::isff::GOESOutput::run(), nidas::core::DSMServerStat::run(), and nidas::util::Thread::test().

void Thread::blockSignal ( int  sig)
inherited
void Thread::cancel ( )
throw (Exception
)
virtualinherited
void SyncServer::connect ( SampleOutput output)
throw (
)
virtual
static const std::string& nidas::util::Thread::currentName ( )
inlinestaticinherited

Convenience routine to return the name for the current thread, or a string indicating that the name of the thread is unknown.

References nidas::util::Thread::currentThread(), nidas::util::Thread::getName(), and nidas::util::Thread::unknownName.

Referenced by nidas::util::Logger::msg_locked().

Thread * Thread::currentThread ( )
staticinherited

Return the thread object for the current thread.

Returns null if not found.

Referenced by nidas::util::Thread::blockSignal(), nidas::util::Thread::currentName(), and nidas::util::Thread::unblockSignal().

pthread_t Thread::currentThreadId ( )
staticinherited
void SyncServer::disconnect ( SampleOutput output)
throw (
)
virtual

Implementation of SampleConnectionRequester::disconnect().

If client has disconnected, interrupt the sample loop and exit.

Implements nidas::core::SampleConnectionRequester.

References interrupt().

const std::string & Thread::getFullName ( )
throw (
)
inherited

Return a name with a bunch of descriptive fields, specifying whether it is detached, the real-time priority, etc.

References nidas::util::Thread::_fullname.

Referenced by nidas::util::Thread::pRun(), nidas::util::Thread::test(), nidas::util::Thread::thr_cleanup(), and nidas::util::Thread::thr_cleanup_delete().

pthread_t nidas::util::Thread::getId ( ) const
inlineprotectedinherited
const std::string & Thread::getName ( ) const
throw (
)
inherited
string Thread::getPolicyString ( int  policy)
staticinherited

Convenience function to return a string for the given scheduler policy: "Non-RT", "RT:FIFO", "RT:RR" or "RT:Unknown".

References nidas::util::Thread::NU_THREAD_FIFO, nidas::util::Thread::NU_THREAD_OTHER, and nidas::util::Thread::NU_THREAD_RR.

Referenced by nidas::core::DSMSensor::getLooper(), and nidas::util::Thread::makeFullName().

void SyncServer::getTimeWindow ( nidas::util::UTime start,
nidas::util::UTime end 
)

References _endWindow, and _startWindow.

std::string nidas::dynld::raf::SyncServer::getXMLFileName ( )
inline

Return the current XML filename setting.

If called after calling openStream() and before setting it explicitly with setXMLFileName(), then the return value is the XML filename from the stream header.

References _xmlFileName.

void SyncServer::handleSample ( nidas::core::Sample sample)
private
void SyncServer::init ( )
throw (nidas::util::Exception
)
void SyncServer::initProject ( )
private
void SyncServer::initSensors ( SampleInputStream sis)
private
void SyncServer::interrupt ( )
virtual

Interrupt this thread.

This sets a boolean which can be tested with isInterrupted(). It is up to the run() implementation to return when interrupted. This is a "soft" request to terminate the thread.

What follows is a discussion of when to use cancel(), kill(), or interrupt() to terminate a thread.

If you can consistently check the state of isInterrupted() in the run method, and return if it is true, at a time interval which is an acceptable amount of time to wait for the thread to terminate, then using interrupt() should work well.

If the run method does I/O, then things are usually a bit more complicated.

If all I/O is guaranteed to finish quickly, which is a rare situation, or is done with a timeout, such as using select/poll with a timeout before every read/write, then one could still use interrupt() as above, and the thread will terminate within the timeout period.

If the thread does blocking I/O operations, and the I/O might block for a period of time longer than you're willing to wait for the thread to terminate, then you need to use kill(sig) or cancel().

If a signal is sent to the thread with kill(sig) while the thread is blocking on an I/O operation, the I/O will immediately return with an errno of EINTR, and one could then return from the run method, after any necessary cleanup.

However there is a possibility that the signal could be missed. In order to make sure you receive a signal, you must block it, so that any received signals are held as pending, and then atomically unblock it with pselect/ppoll/epoll_pwait before performing the I/O operation.

If you do not use I/O timeouts, or kill(sig) with pselect/ppoll/epoll_pwait, or other tricks such as writing to a pipe that is watched with select/poll in the run method, then using cancel() is the only way to guarantee that your thread will terminate in an acceptable amount of time.

All NIDAS Threads support deferred cancelation. Immediate asynchronous cancellation is not fully supported (and is very hard to get right). Deferred cancelation means that cancellation is delayed until the thread next calls a system function that is a cancellation point. At that point the thread run method will simply terminate without any return value from the system function, and then execute any cleanup methods that may have been registered with pthread_cleanup_push.

A list of cancellation points is provided in the pthreads(7) man page. Cancellation points are typically I/O operations, waits or sleeps.

This immediate thread termination can be a problem if there is a possibility that your objects could be left in a bad state, such as with a mutex locked, though it isn't generally a good practice to hold mutexes during a time-consuming I/O operation, wait or sleep.

One should check the run method to see if the state of the objects is OK if execution stops at any of the cancellation points. Note that logging a message, or writing to cerr is a cancellation point. Typically there is error/exception handling associated with an I/O operation. Ensuring the state is OK upon a cancellation is similar to preparation for a fatal I/O error that requires a return of the run method. One can use setCancelEnabled() to defer cancellation.

One can use pthread_cleanup_push and pthread_cleanup_pop to register cleanup routines that are called when a thread is cancelled if special handling is required.

Reimplemented from nidas::util::Thread.

References _pipeline, DLOG, nidas::core::SamplePipeline::interrupt(), nidas::util::Thread::interrupt(), and signalStop().

Referenced by disconnect(), and interrupt_sync_server().

bool Thread::isCancelDeferred ( ) const
inherited

Return true if the cancel type of this thread is PTHREAD_CANCEL_DEFERRED.

References nidas::util::Thread::_cancel_deferred.

bool Thread::isCancelEnabled ( ) const
inherited

Return true if the cancel state of this thread is PTHREAD_CANCEL_ENABLE.

References nidas::util::Thread::_cancel_enabled.

virtual bool nidas::util::Thread::isDetached ( ) const
inlinevirtualinherited

Is this a detached thread.

References nidas::util::Thread::_detached.

virtual bool nidas::util::Thread::isInterrupted ( ) const
inlinevirtualinherited
virtual bool nidas::util::Thread::isJoined ( ) const
inlinevirtualinherited
virtual bool nidas::util::Thread::isRunning ( ) const
inlinevirtualinherited
int Thread::join ( )
throw (Exception
)
virtualinherited

The calling thread joins this thread, waiting until the thread finishes, which means either that the run() method returned, the thread called pthread_exit() or the thread was cancelled.

The return value is the int return value of the run method, or PTHREAD_CANCELED (-1). If the run method threw an Exception, it will be caught and then re-thrown by join.

Reimplemented in nidas::core::SensorHandler.

References nidas::util::Thread::_exception, nidas::util::Thread::_id, nidas::util::Thread::_mutex, nidas::util::Exception::errnoToString(), nidas::util::Thread::getId(), nidas::util::Thread::getName(), nidas::util::Thread::RUN_EXCEPTION, and nidas::util::Thread::RUN_OK.

Referenced by nidas::core::DSMService::checkSubThreads(), nidas::dynld::UDPSampleOutput::XMLSocketListener::checkWorkers(), nidas::dynld::UDPSampleOutput::close(), nidas::util::McSocketListener::close(), nidas::core::DerivedDataReader::deleteInstance(), nidas::dynld::UDPSampleOutput::XMLSocketListener::fireWorkers(), nidas::core::DSMService::join(), nidas::core::SensorHandler::join(), nidas::core::SamplePipeline::join(), nidas::core::DSMEngine::joinDataThreads(), nidas::dynld::isff::GOESOutput::joinThread(), nidas::core::DSMServerApp::killStatusThread(), nidas::core::DSMServerApp::killXmlRpcThread(), nidas::core::DSMEngine::killXmlRpcThread(), main(), nidas::core::Looper::removeClient(), nidas::util::ThreadJoiner::run(), nidas::core::SampleBuffer::~SampleBuffer(), nidas::core::SampleSorter::~SampleSorter(), nidas::core::ServerSocket::~ServerSocket(), and nidas::dynld::UDPSampleOutput::~UDPSampleOutput().

void Thread::kill ( int  sig)
throw (Exception
)
virtualinherited
Thread * Thread::lookupThread ( pthread_t  id)
staticinherited
void SyncServer::openStream ( )
SyncServer& nidas::dynld::raf::SyncServer::operator= ( const SyncServer )
private
void SyncServer::read ( bool  once = false)
throw (nidas::util::IOException
)
void nidas::dynld::raf::SyncServer::resetAddress ( nidas::util::SocketAddress addr)
inline

References _address.

Referenced by parseRunstring().

int SyncServer::run ( )
throw (nidas::util::Exception
)
virtual

This method implements the Runnable interface for Threads, but it can also be called synchronously when run from main().

Implements nidas::util::Runnable.

References DLOG, read(), stop(), nidas::core::XMLImplementation::terminate(), and nidas::util::Exception::what().

Referenced by main().

void Thread::setCancelDeferred ( bool  val)
protectedinherited

Set the cancel type for this thread.

true means cancel requests are deferred until the next cancellation point. false means they occur instantly. This is protected, it should be called only from a thread's own run method. See the pthreads(7) man page for a list of the cancellation points.

Note: non-deferred canceling is difficult to get right. It has not been tested with this class, and is not recommended.

References nidas::util::Thread::_cancel_deferred.

void Thread::setCancelEnabled ( bool  val)
protectedinherited

Set the cancel state for this thread - false means cancel requests are ignored.

See pthread_setcancelstate. This is protected, it should be called only from a thread's own run method.

References nidas::util::Thread::_cancel_enabled.

void nidas::dynld::raf::SyncServer::setDataFileNames ( const std::list< std::string > &  dataFileNames)
inline

References _dataFileNames.

Referenced by parseRunstring().

bool Thread::setNonRealTimePriority ( )
throw (Exception
)
inherited
void nidas::dynld::raf::SyncServer::setRawSorterLengthSeconds ( float  sorter_secs)
inline

References _rawSorterLengthSecs.

Referenced by parseRunstring().

bool Thread::setRealTimeFIFOPriority ( int  val)
throw (Exception
)
inherited
bool Thread::setRealTimeRoundRobinPriority ( int  val)
throw (Exception
)
inherited
void nidas::dynld::raf::SyncServer::setSorterLengthSeconds ( float  sorter_secs)
inline

References _sorterLengthSecs.

Referenced by parseRunstring().

void nidas::dynld::raf::SyncServer::setStopSignal ( StopSignal stop_signal)
inline

Set a callback function which will be called when the SyncServer reaches EOF on its input stream or stops on an error.

Since the pipeline is flushed when the input is closed, no more sync samples will be sent after this function is called. It is called from the thread running the run() method.

References _stop_signal.

Referenced by nidas::dynld::raf::SyncRecordReader::SyncRecordReader().

void Thread::setThreadScheduler ( enum SchedPolicy  policy,
int  priority 
)
throw (Exception
)
inherited
void SyncServer::setTimeWindow ( nidas::util::UTime  start,
nidas::util::UTime  end 
)
void nidas::dynld::raf::SyncServer::setXMLFileName ( const std::string &  name)
inline

References _xmlFileName.

Referenced by parseRunstring().

void SyncServer::signalStop ( )
private
void Thread::start ( )
throw (Exception
)
virtualinherited
void SyncServer::stop ( )
private
int Thread::test ( int  argc,
char **  argv 
)
staticinherited
void nidas::util::Runnable::testCancel ( ) const
inlineprotectedinherited

Check if we have been cancelled.

Calls pthread_testcancel. This is protected since it only checks the current thread - i.e. it must be called within the run method. Since it just calls pthread_testcancel, it is a cancellation point.

void Thread::unblockSignal ( int  sig)
inherited

Install a signal handler and unblock the signal.

The signal handler will log a message about the receipt of the signal at severity LOG_INFO using the nidas::util::Logger. Then, if the signal handler is being invoked from a registered Thread, the virtual method signalHandler() for that Thread will be called.

The signal handler is installed with the sigaction() system call, and will be the action for the given signal in all threads, including the main() thread. If other threads do not wish to take action on a given signal, they should call blockSignal(sig). Or they can define their own signalHandler() method.

After installing the signal handler, the signal is added to those that are unblocked for the thread, or if the Thread is not yet running, the signal will be unblocked in the thread once it runs.

As with blockSignal(), this method is typically called on this Thread before it has started. If this Thread has started, then the signal will only be unblocked if the method is called from this Thread, i.e. from its own run() method.

To install a signal handler, and then block the signal so that it is held as pending until it is later unblocked, typically with pselect(), or sigwaitinfo(), do:

void Thread::run()
{
// get the existing signal mask
sigset_t sigmask;
pthread_sigmask(SIG_BLOCK,NULL,&sigmask);
// remove SIGUSR1 from the mask passed to pselect
sigdelset(&sigmask,SIGUSR1);
for (;;) {
pselect(nfd,&readfds,0,0,0,&sigmask);
...
}
}
...
thread.unblockSignal(SIGUSR1);
thread.blockSignal(SIGUSR1);
thread.start();
...
try {
if (thread.isRunning()) {
thread.kill(SIGUSR1);
thread.join()
}
}

References nidas::util::Thread::_blockedSignals, nidas::util::Thread::_unblockedSignals, nidas::util::Thread::currentThread(), nidas::util::Thread::isRunning(), and nidas::util::Thread::thr_add_sig().

Referenced by nidas::core::DerivedDataReader::DerivedDataReader(), nidas::core::FsMountWorkerThread::FsMountWorkerThread(), nidas::core::StatusListener::StatusListener(), nidas::core::StatusThread::StatusThread(), nidas::dynld::XMLConfigService::Worker::Worker(), nidas::core::XmlRpcThread::XmlRpcThread(), and nidas::dynld::UDPSampleOutput::XMLSocketListener::XMLSocketListener().

Member Data Documentation

nidas::util::auto_ptr<nidas::util::SocketAddress> nidas::dynld::raf::SyncServer::_address
private

Referenced by init(), and resetAddress().

std::list<std::string> nidas::dynld::raf::SyncServer::_dataFileNames
private

Referenced by openStream(), and setDataFileNames().

dsm_time_t nidas::dynld::raf::SyncServer::_endWindow
private
nidas::core::Sample* nidas::dynld::raf::SyncServer::_firstSample
private

Referenced by openStream(), and ~SyncServer().

RawSampleInputStream* nidas::dynld::raf::SyncServer::_inputStream
private
SampleOutputStream* nidas::dynld::raf::SyncServer::_outputStream
private

Referenced by init(), stop(), and ~SyncServer().

SamplePipeline nidas::dynld::raf::SyncServer::_pipeline
private

Referenced by init(), interrupt(), and stop().

float nidas::dynld::raf::SyncServer::_rawSorterLengthSecs
private

Referenced by init(), and setRawSorterLengthSeconds().

SampleClient* nidas::dynld::raf::SyncServer::_sampleClient
private

Referenced by addSampleClient(), init(), and stop().

float nidas::dynld::raf::SyncServer::_sorterLengthSecs
private

Referenced by init(), and setSorterLengthSeconds().

dsm_time_t nidas::dynld::raf::SyncServer::_startTime
private

Referenced by init(), and openStream().

dsm_time_t nidas::dynld::raf::SyncServer::_startWindow
private
StopSignal* nidas::dynld::raf::SyncServer::_stop_signal
private
SyncRecordGenerator nidas::dynld::raf::SyncServer::_syncGen
private

Referenced by init(), and stop().

std::string nidas::dynld::raf::SyncServer::_xmlFileName
private
const int nidas::dynld::raf::SyncServer::DEFAULT_PORT = 30001
static
const float SyncServer::RAW_SORTER_LENGTH_SECS = 5.0
static
const float SyncServer::SORTER_LENGTH_SECS = 5.0
static

The documentation for this class was generated from the following files: