nidas
v1.2-1520
|
#include <SyncRecordSource.h>
Public Member Functions | |
SyncRecordSource () | |
SyncRecordSource builds "sync records" from a sample stream. More... | |
virtual | ~SyncRecordSource () |
SampleSource * | getRawSampleSource () |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples. More... | |
SampleSource * | getProcessedSampleSource () |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples. More... | |
std::list< const SampleTag * > | getSampleTags () const |
Get the output SampleTags. More... | |
SampleTagIterator | getSampleTagIterator () const |
Implementation of SampleSource::getSampleTagIterator(). More... | |
void | addSampleClient (SampleClient *client) throw () |
Implementation of SampleSource::addSampleClient(). More... | |
void | removeSampleClient (SampleClient *client) throw () |
Remove a SampleClient from this SampleSource. More... | |
void | addSampleClientForTag (SampleClient *client, const SampleTag *) throw () |
Add a Client for a given SampleTag. More... | |
void | removeSampleClientForTag (SampleClient *client, const SampleTag *) throw () |
Remove a SampleClient for a given SampleTag from this SampleSource. More... | |
int | getClientCount () const throw () |
How many SampleClients are currently in my list. More... | |
void | flush () throw () |
Implementation of Resampler::flush(). More... | |
const SampleStats & | getSampleStats () const |
void | connect (SampleSource *source) throw () |
Connect the resampler to a source. More... | |
void | disconnect (SampleSource *source) throw () |
void | init () |
Create the SyncInfo objects for all samples and their variables that will go in the sync record. More... | |
void | sendSyncHeader () throw () |
Generate and send a sync record header sample. More... | |
bool | receive (const Sample *) throw () |
Method called to pass a sample to this client. More... | |
void | preLoadCalibrations (dsm_time_t sampleTime) throw () |
bool | prevRecord (SyncInfo &sinfo) |
If the previous sync record is non-null, do sinfo.decrementRecord(). More... | |
bool | nextRecord (SyncInfo &sinfo) |
If the next sync record is non-null, do sinfo.incrementRecord(). More... | |
int | computeSlotIndex (const Sample *samp, SyncInfo &sinfo) |
Determine which time slot a sample should be placed in. More... | |
Static Public Member Functions | |
static void | selectVariablesFromProject (Project *project, std::list< const Variable * > &variables) |
This method and selectVariablesFromSensor() are used to select the list of variables from a Project configuration in order of sensor, with variables in order for each sensor, accepting only the variables which make sense for Aircraft SyncRecords. More... | |
static void | selectVariablesFromSensor (DSMSensor *sensor, std::list< const Variable * > &variables) |
See selectVariablesFromProject(). More... | |
static int | nextRecordIndex (int i) |
Return the index of the next sync record. More... | |
static int | prevRecordIndex (int i) |
Return the index of the previous sync record. More... | |
Static Public Attributes | |
static const int | NSLOT_LIMIT = 2 |
Maximum number of delta-Ts allowed between a sample time tag and its slot in the sync record. More... | |
static const int | NSYNCREC = 2 |
Private Member Functions | |
bool | checkTime (const Sample *samp, SyncInfo &sinfo, SampleTracer &stracer, nidas::util::LogContext &lc, int warn_times) |
void | slog (SampleTracer &stracer, const std::string &msg, const Sample *samp, const SyncInfo &sinfo) |
void | log (nidas::util::LogContext &lc, const std::string &msg, const Sample *samp, const SyncInfo &sinfo) |
void | log (nidas::util::LogContext &lc, const std::string &msg, const SyncInfo &sinfo) |
void | addSampleTag (const SampleTag *tag) throw () |
Add a SampleTag to this SampleSource. More... | |
void | removeSampleTag (const SampleTag *tag) throw () |
void | createHeader (std::ostream &) throw () |
void | sendSyncRecord () |
void | allocateRecord (int irec, dsm_time_t timetag) |
int | advanceRecord (dsm_time_t timetag) |
SyncRecordSource (const SyncRecordSource &) | |
No copying. More... | |
SyncRecordSource & | operator= (const SyncRecordSource &) |
No assignment. More... | |
Private Attributes | |
SampleSourceSupport | _source |
std::map< dsm_sample_id_t, SyncInfo > | _syncInfo |
Info kept for each sample in order to assemble and write sync records. More... | |
std::list< const Variable * > | _variables |
List of all variables in the sync record. More... | |
SampleTag | _syncRecordHeaderSampleTag |
SampleTag | _syncRecordDataSampleTag |
size_t | _recSize |
dsm_time_t | _syncHeaderTime |
dsm_time_t | _syncTime [2] |
int | _current |
Index of current sync record. More... | |
int | _halfMaxUsecsPerSample |
SampleT< double > * | _syncRecord [2] |
double * | _dataPtr [2] |
size_t | _unrecognizedSamples |
std::ostringstream | _headerStream |
const Aircraft * | _aircraft |
bool | _initialized |
int | _unknownSampleType |
unsigned int | _badLaterSamples |
SyncRecordSource::SyncRecordSource ()
SyncRecordSource builds "sync records" from a sample stream.
This is a lossy process in which data is packed into a one-second, ragged matrix of double precision values, called a sync record. Each row of the matrix contains the data for the variables with a given sample id. In NIDAS, a sample is a group of variables all with the same timetag. In order to build sync records, a sample must also have a known sampling rate, SampleTag::getRate().
Each row of the matrix is formatted as follows, where the sample row includes variables var0, var1, etc:
toffset, var0[0], var0[1], ... var0[N-1], var1[0], var1[1], ...
toffset is the number of microseconds into the second to associate with the zeroth value of each variable. N is the sampling rate (#/sec) of all the variables in the sample, rounded up to the next highest integer if the rate is not integral. var0[i] is the vector of values of a variable at time i in the record. The vector may have more than one value, for example if the variable is a histogram.
So a sync record is a 4D ragged array with the following dimensions, where the last dimension varies most rapidly:
syncrec[nsamples][nvars][nSlots][varlen]
nsamples is the number of different sample IDs in the configuration. nvars is the number of variables in a sample. varlen is the number of values for a sample of a variable, usually 1. nSlots is the number of samples per second for a variable/sample, rounded up if necessary. toffset is like an initial variable in the sample whose nSlots and varlen dimensions are both 1.
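The row layout described above can be sketched as an index calculation. This is only an illustration under stated assumptions: RowLayout, makeRowLayout(), rowLength() and valueIndex() are hypothetical names that do not appear in NIDAS, and the real class keeps equivalent bookkeeping in SyncInfo.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical description of one sample row; not a NIDAS type.
struct RowLayout {
    int nSlots;                   // samples per second, rounded up
    std::vector<int> varLengths;  // values per variable per slot, usually 1
};

// Build a layout from a sampling rate (Hz) and the length of each variable.
RowLayout makeRowLayout(double rate, const std::vector<int>& varLengths)
{
    assert(rate > 0.0);
    return RowLayout{static_cast<int>(std::ceil(rate)), varLengths};
}

// Number of doubles in the row: toffset plus nSlots * varlen per variable.
std::size_t rowLength(const RowLayout& row)
{
    std::size_t n = 1;  // toffset
    for (int len : row.varLengths)
        n += static_cast<std::size_t>(row.nSlots) * len;
    return n;
}

// Index within the row of element ielem of variable ivar at time slot islot.
// The element index varies most rapidly, then the slot, then the variable.
std::size_t valueIndex(const RowLayout& row, std::size_t ivar, int islot, int ielem)
{
    assert(ivar < row.varLengths.size());
    assert(islot >= 0 && islot < row.nSlots);
    assert(ielem >= 0 && ielem < row.varLengths[ivar]);
    std::size_t idx = 1;  // skip toffset
    for (std::size_t v = 0; v < ivar; ++v)
        idx += static_cast<std::size_t>(row.nSlots) * row.varLengths[v];
    return idx + static_cast<std::size_t>(islot) * row.varLengths[ivar] + ielem;
}
```

For a 12.5 Hz sample holding one scalar variable and one 4-element variable, the row has 1 + 13 + 13*4 = 66 values, and the second variable starts at index 14.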
Typical sampling rates are:
rate      usec/sample         nSlots
1000      1000                1000
100       10000               100
50        20000               50
12.5      80000               13    ARINC (next highest integer)
10        100000              10
8         125000              8
6.25      160000              7     ARINC
3.125     320000              4     ARINC
3         333333 (in-exact)   3
1.5625    640000              2     ARINC
1         1000000             1
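The usec/sample and nSlots columns follow directly from the rate. A minimal sketch, assuming the hypothetical helper names slotsPerSecond() and usecsPerSample() (neither is a NIDAS function):

```cpp
#include <cassert>
#include <cmath>

// Hypothetical helper: number of slots per second for a rate in Hz,
// rounded up to the next integer when the rate is not integral.
int slotsPerSecond(double rate)
{
    assert(rate > 0.0);
    return static_cast<int>(std::ceil(rate));
}

// Hypothetical helper: nominal sample spacing in microseconds,
// rounded to the nearest microsecond (in-exact for rates such as 3 Hz).
long usecsPerSample(double rate)
{
    assert(rate > 0.0);
    return std::lround(1.0e6 / rate);
}
```

For the ARINC rate of 12.5 Hz this gives 13 slots at 80000 usec spacing, so the last slot of each record is not filled every second.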
The information to unpack a sync record is stored in the header, which is passed first to clients.
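When a sync record is read, the timetags corresponding to the time slot of each variable value are reconstructed from the sync record time (syncTime), the time offset of the sample row (toffset), the slot index (timeIndex) and the slot spacing (dt, the usec/sample for the sample's rate).
The slot time tags are then: sampleTime = syncTime + toffset + (timeIndex * dt)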
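The formula above can be applied to every slot of a row as follows. This is a sketch only: slotTimes() is a hypothetical name, and all times are assumed to be in microseconds, as with dsm_time_t.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: reconstruct the time tag of each slot in a row.
// syncTime, toffset and dt are all in microseconds.
std::vector<std::int64_t> slotTimes(std::int64_t syncTime, std::int64_t toffset,
                                    std::int64_t dt, int nSlots)
{
    assert(dt > 0 && nSlots > 0);
    std::vector<std::int64_t> times;
    times.reserve(nSlots);
    for (int timeIndex = 0; timeIndex < nSlots; ++timeIndex)
        times.push_back(syncTime + toffset + timeIndex * dt);
    return times;
}
```

With syncTime = 0, toffset = 90000 and dt = 100000 (a 10 Hz sample), the slot times run from 0.09 sec to 0.99 sec.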
If the samples are not actually evenly spaced, then exact time information is lost in resampling into the sync record. Data samples can be discarded, or slots skipped in the output, if a sample time tag is more than NSLOT_LIMIT*dt from the slot time.
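The discard criterion can be expressed as a simple predicate. This is a simplified sketch, not the logic of checkTime(): withinSlotLimit() is a hypothetical name, the constant is copied from the class documentation, and treating a difference of exactly NSLOT_LIMIT*dt as acceptable is an assumption based on the wording "more than".

```cpp
#include <cassert>
#include <cstdint>

// Value documented for SyncRecordSource::NSLOT_LIMIT.
const int NSLOT_LIMIT = 2;

// Hypothetical predicate: true if a sample time tag is no more than
// NSLOT_LIMIT * dt away from its slot time. All times in microseconds.
bool withinSlotLimit(std::int64_t sampleTime, std::int64_t slotTime, std::int64_t dt)
{
    assert(dt > 0);
    std::int64_t diff = sampleTime - slotTime;
    if (diff < 0) diff = -diff;
    return diff <= NSLOT_LIMIT * dt;
}
```

For a 10 Hz variable the limit is 0.2 sec, so a sample 0.03 sec from its slot is accepted while one 0.31 sec away is not.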
For example, assume these timetags for a 10 Hz variable, with some usual timetag jitter:
sample times    dataval
00:00:00.09     0
00:00:00.18     1
00:00:00.32     2
00:00:00.60     3     gap, followed by closely spaced samples,
00:00:00.62     4     typically due to DSM latency.
00:00:00.65     5
00:00:00.71     6
00:00:00.79     7
00:00:01.20     8     big gap 0.41 sec
00:00:01.22     9
The corresponding values will be put into a row of a sync record whose time is 00:00:00. The time offset for the sample row will be .09 sec (the offset of the first sample).
Placing this data in the sync record loses time information for all but the first sample. The slot times in the sync record are (syncTime + toffset + N * dT), where N=[0,9], and dT=0.1 sec:
As each sample is received, it is placed in the next slot in the sync record, unless its time tag differs from the slot time by more than NSLOT_LIMIT times dt.
syncrec         dataval   time difference from actual sample time
00:00:00.09     0          0.0
00:00:00.19     1         +0.01
00:00:00.29     2         -0.03
00:00:00.39     3         -0.21    value from 00:00:00.60
00:00:00.49     4         -0.13    value from 00:00:00.62
00:00:00.59     5         -0.06    value from 00:00:00.65
00:00:00.69     6         -0.02
00:00:00.79     7          0.0
00:00:00.89     NaN                result of gap, diff of 0.31 sec
00:00:00.99     8         -0.21    value from 00:00:01.20
00:00:01.10     9          0.0     next record, offset of 0.10 sec
A previous version of SyncRecordSource computed a slot time index for each sample:
timeIndex = (sampleTime - syncTime - toffset + dt/2) / dt
This resulted in more loss of samples over gaps and latency jitter:
00:00:00.09     0          0.0
00:00:00.19     1         +0.01
00:00:00.29     2         -0.03
00:00:00.39     NaN
00:00:00.49     NaN
00:00:00.59     4                  data from 0.62
00:00:00.69     NaN
00:00:00.79     7                  data from 0.79
00:00:00.89     NaN
00:00:00.99     NaN
00:00:01.22     NaN
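The older index formula can be sketched as follows, to show why closely spaced samples collide. legacySlotIndex() is a hypothetical name, times are assumed to be in microseconds, and integer division is an assumption about how the original rounded.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper implementing the previous formula:
// timeIndex = (sampleTime - syncTime - toffset + dt/2) / dt
// All times are in microseconds.
int legacySlotIndex(std::int64_t sampleTime, std::int64_t syncTime,
                    std::int64_t toffset, std::int64_t dt)
{
    assert(dt > 0);
    // Keep the numerator non-negative so integer division rounds down.
    assert(sampleTime >= syncTime + toffset);
    return static_cast<int>((sampleTime - syncTime - toffset + dt / 2) / dt);
}
```

With toffset = 0.09 sec and dt = 0.1 sec, the samples at 00:00:00.60 and 00:00:00.62 both map to index 5, so the second overwrites the first, while indices 3 and 4 receive no sample at all.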
The received samples are sorted in time before they are placed in the sync record, but because a sample from the next second may be placed in the previous sync record, we have to maintain two sync records as they are filled.
References _syncRecordDataSampleTag, _syncRecordHeaderSampleTag, _syncTime, addSampleTag(), nidas::core::SampleTag::setDSMId(), nidas::core::SampleTag::setRate(), nidas::core::SampleTag::setSampleId(), nidas::core::SampleTag::setSensorId(), SYNC_RECORD_HEADER_ID, and SYNC_RECORD_ID.
|
virtual |
References _syncInfo, _syncRecord, nidas::dynld::raf::SyncInfo::discarded, nidas::core::SampleT< DataT >::freeReference(), GET_DSM_ID, GET_SPS_ID, ILOG, nidas::dynld::raf::SyncInfo::noverWritten, nidas::dynld::raf::SyncInfo::rate, nidas::dynld::raf::SyncInfo::skipped, and nidas::dynld::raf::SyncInfo::total.
|
private |
No copying.
|
inlinevirtual |
Implementation of SampleSource::addSampleClient().
Implements nidas::core::SampleSource.
|
inlinevirtual |
Add a Client for a given SampleTag.
Implementation of SampleSource::addSampleClient().
Implements nidas::core::SampleSource.
|
inlineprivatevirtual |
Add a SampleTag to this SampleSource.
Implements nidas::core::SampleSource.
References nidas::core::SampleSourceSupport::addSampleTag().
Referenced by SyncRecordSource().
|
private |
References _current, _syncInfo, _syncRecord, _syncTime, allocateRecord(), nidas::util::UTime::format(), nextRecordIndex(), and sendSyncRecord().
|
private |
References _dataPtr, _recSize, _syncRecord, _syncTime, nidas::util::LogContext::active(), nidas::core::doubleNAN, nidas::util::LogContext::log(), LOG_DEBUG, nidas::core::Sample::setTimeTag(), SYNC_RECORD_ID, and USECS_PER_SEC.
Referenced by advanceRecord(), and nextRecord().
|
private |
References _syncTime, nidas::dynld::raf::SyncInfo::computeSlotIndex(), nidas::dynld::raf::SyncInfo::decrementSlot(), nidas::dynld::raf::SyncInfo::discarded, nidas::dynld::raf::SyncInfo::dtUsec, format_time(), nidas::dynld::raf::SyncInfo::getRecordIndex(), nidas::dynld::raf::SyncInfo::getSlotIndex(), nidas::core::Sample::getTimeTag(), nidas::dynld::raf::SyncInfo::incrementSlot(), log(), nidas::dynld::raf::SyncInfo::minDiff, nidas::dynld::raf::SyncInfo::minDiffInit, nidas::dynld::raf::SyncInfo::nEarlySamp, nidas::dynld::raf::SyncInfo::nLateSamp, nidas::dynld::raf::SyncInfo::nskips, NSLOT_LIMIT, nidas::dynld::raf::SyncInfo::nSlots, nidas::dynld::raf::SyncInfo::outOfSlotMax, nidas::dynld::raf::SyncInfo::overWritten, nidas::dynld::raf::SyncInfo::skipped, slog(), nidas::dynld::raf::SyncInfo::TDIFF_CHECK_USEC, and USECS_PER_SEC.
Determine which time slot a sample should be placed in.
References _syncRecord, _syncTime, nidas::dynld::raf::SyncInfo::dtUsec, nidas::dynld::raf::SyncInfo::getRecordIndex(), nidas::core::Sample::getTimeTag(), log(), LOG_ERR, nextRecord(), and USECS_PER_SEC.
Referenced by nidas::dynld::raf::SyncInfo::computeSlotIndex().
|
virtual |
Connect the resampler to a source.
Implements nidas::core::Resampler.
References DLOG, nidas::dynld::raf::Aircraft::getAircraft(), and nidas::core::Project::getInstance().
Referenced by nidas::dynld::raf::SyncRecordGenerator::connectSource().
|
private |
References nidas::core::VariableConverter::getCalFile(), nidas::core::Polynomial::getCoefficients(), nidas::core::Variable::getConverter(), nidas::core::CalFile::getFile(), nidas::core::Linear::getIntercept(), nidas::core::Variable::getLength(), nidas::core::Variable::getLongName(), nidas::core::Variable::getName(), nidas::core::CalFile::getPath(), nidas::core::Linear::getSlope(), nidas::core::Version::getSoftwareVersion(), nidas::core::Variable::getType(), nidas::core::VariableConverter::getUnits(), nidas::core::Variable::getUnits(), rate, nidas::dynld::raf::SyncInfo::rate, nidas::dynld::raf::SyncInfo::variables, and WLOG.
Referenced by sendSyncHeader().
|
virtual |
Implements nidas::core::Resampler.
|
virtual |
Implementation of Resampler::flush().
Send current sync record, whether finished or not.
Implements nidas::core::Resampler.
References _current, DLOG, nextRecordIndex(), NSYNCREC, and sendSyncRecord().
|
inlinevirtual |
How many SampleClients are currently in my list.
Implements nidas::core::SampleSource.
|
inlinevirtual |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
SampleClients use this method to get a pointer to whatever sample source they are interested in. Derived classes can return NULL if they are not a SampleSource of processed samples.
Implements nidas::core::SampleSource.
|
inlinevirtual |
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
SampleClients use this method to get a pointer to whatever sample source they are interested in. Derived classes can return NULL if they are not a SampleSource of raw samples.
Implements nidas::core::SampleSource.
|
inlinevirtual |
Implements nidas::core::SampleSource.
|
inlinevirtual |
Implementation of SampleSource::getSampleTagIterator().
Implements nidas::core::SampleSource.
|
inlinevirtual |
Get the output SampleTags.
Implements nidas::core::SampleSource.
Referenced by nidas::dynld::raf::SyncRecordGenerator::connectSource().
void SyncRecordSource::init ()
Create the SyncInfo objects for all samples and their variables that will go in the sync record.
SyncRecordSource first generates the list of variables with selectVariablesFromProject() prior to calling init().
References _halfMaxUsecsPerSample, _initialized, _recSize, _syncInfo, _syncRecordDataSampleTag, _syncRecordHeaderSampleTag, _variables, nidas::core::SampleTag::addVariable(), nidas::dynld::raf::SyncInfo::addVariable(), nidas::core::SampleTag::getId(), nidas::core::SampleTag::getRate(), nidas::core::Variable::getSampleTag(), rate, nidas::dynld::raf::SyncInfo::sampleLength, nidas::dynld::raf::SyncInfo::sampleSRIndex, USECS_PER_SEC, nidas::dynld::raf::SyncInfo::variables, and nidas::dynld::raf::SyncInfo::varSRIndex.
|
private |
Referenced by checkTime(), computeSlotIndex(), and nextRecord().
|
private |
bool SyncRecordSource::nextRecord (SyncInfo &sinfo)
If the next sync record is non-null, do sinfo.incrementRecord().
References _current, _syncRecord, _syncTime, allocateRecord(), nidas::dynld::raf::SyncInfo::getRecordIndex(), nidas::dynld::raf::SyncInfo::incrementRecord(), log(), LOG_ERR, nextRecordIndex(), nidas::dynld::raf::SyncInfo::overWritten, and USECS_PER_SEC.
Referenced by computeSlotIndex(), and nidas::dynld::raf::SyncInfo::incrementSlot().
|
static |
Return the index of the next sync record.
References NSYNCREC.
Referenced by advanceRecord(), nidas::dynld::raf::SyncInfo::advanceRecord(), flush(), nextRecord(), and prevRecordIndex().
|
private |
No assignment.
void SyncRecordSource::preLoadCalibrations (dsm_time_t sampleTime) throw ()
References nidas::util::LogContext::active(), nidas::util::endlog(), nidas::core::Polynomial::getCoefficients(), nidas::core::Variable::getConverter(), nidas::core::Linear::getIntercept(), nidas::core::Variable::getName(), nidas::core::Linear::getSlope(), ILOG, LOG_INFO, and nidas::core::VariableConverter::readCalFile().
bool SyncRecordSource::prevRecord (SyncInfo &sinfo)
If the previous sync record is non-null, do sinfo.decrementRecord().
References _current, _syncRecord, nidas::dynld::raf::SyncInfo::decrementRecord(), nidas::dynld::raf::SyncInfo::getRecordIndex(), and prevRecordIndex().
Referenced by nidas::dynld::raf::SyncInfo::decrementSlot().
|
static |
Return the index of the previous sync record.
Since currently NSYNCREC==2, this returns the same value as nextRecordIndex().
References nextRecordIndex().
Referenced by prevRecord().
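The equivalence of the two index functions for a two-element ring can be sketched as below. The bodies are assumptions, since the documentation gives only the descriptions; generalPrevIndex() is a hypothetical name added for comparison.

```cpp
#include <cassert>

// Value documented for SyncRecordSource::NSYNCREC.
const int NSYNCREC = 2;

// Assumed implementation: step forward around a ring of NSYNCREC records.
int nextRecordIndex(int i)
{
    assert(i >= 0 && i < NSYNCREC);
    return (i + 1) % NSYNCREC;
}

// As documented, this matches nextRecordIndex() only because NSYNCREC == 2.
int prevRecordIndex(int i)
{
    return nextRecordIndex(i);
}

// Hypothetical general form, valid for any ring size.
int generalPrevIndex(int i)
{
    assert(i >= 0 && i < NSYNCREC);
    return (i + NSYNCREC - 1) % NSYNCREC;
}
```

If NSYNCREC were ever raised above 2, prevRecordIndex() would need the general form rather than delegating to nextRecordIndex().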
|
virtual |
Method called to pass a sample to this client.
This method is typically called by a SampleSource for each of its SampleClients when it has a sample ready. Returns true on success, false if the sample was rejected. A false return is meant to signal a warning-type situation, such as a socket being temporarily unavailable. True errors will be thrown as an IOException.
Implements nidas::core::SampleClient.
References nidas::dynld::raf::SyncInfo::computeSlotIndex(), nidas::dynld::raf::SyncInfo::discarded, nidas::core::DOUBLE_ST, nidas::core::FLOAT_ST, nidas::core::Sample::getId(), nidas::dynld::raf::SyncInfo::getRecordIndex(), nidas::dynld::raf::SyncInfo::getSlotIndex(), nidas::core::Sample::getTimeTag(), nidas::core::Sample::getType(), nidas::dynld::raf::SyncInfo::incrementSlot(), LOG_ERR, LOG_INFO, LOG_VERBOSE, LOG_WARNING, nidas::dynld::raf::SyncInfo::minDiff, nidas::dynld::raf::SyncInfo::noverWritten, nidas::dynld::raf::SyncInfo::nSlots, nidas::dynld::raf::SyncInfo::overWritten, nidas::dynld::raf::SyncInfo::sampleSRIndex, nidas::dynld::raf::SyncInfo::total, nidas::core::UINT32_ST, USECS_PER_MSEC, USECS_PER_SEC, nidas::dynld::raf::SyncInfo::varLengths, and nidas::dynld::raf::SyncInfo::varSRIndex.
|
inlinevirtual |
Remove a SampleClient from this SampleSource.
Implements nidas::core::SampleSource.
Referenced by nidas::dynld::raf::SyncRecordGenerator::~SyncRecordGenerator().
|
inlinevirtual |
Remove a SampleClient for a given SampleTag from this SampleSource.
The pointer to the SampleClient must remain valid, until after it is removed.
Implements nidas::core::SampleSource.
|
inlineprivatevirtual |
Implements nidas::core::SampleSource.
|
static |
This method and selectVariablesFromSensor() are used to select the list of variables from a Project configuration in order of sensor, with variables in order for each sensor, accepting only the variables which make sense for Aircraft SyncRecords.
The variables are appended to the variables list.
Rather than rely on the sample tags from the source, this allows a single function to be shared to accumulate processed sample tags and variables directly from the Project. Applications (like nimbus) can use this to get the same list of Variables as would be retrieved from SyncRecordReader, except they include all the metadata directly from the Project instead of the sync record header.
References nidas::core::Project::getSensorIterator(), nidas::core::SensorIterator::hasNext(), nidas::core::SensorIterator::next(), and selectVariablesFromSensor().
|
static |
See selectVariablesFromProject().
References nidas::core::Variable::CONTINUOUS, nidas::core::Variable::COUNTER, GET_DSM_ID, GET_SPS_ID, nidas::core::SampleTag::getId(), nidas::util::Logger::getInstance(), nidas::core::DSMSensor::getSampleTags(), nidas::core::Variable::getType(), nidas::core::SampleTag::getVariables(), nidas::core::SampleTag::isProcessed(), nidas::util::Logger::log(), and LOG_WARNING.
Referenced by selectVariablesFromProject().
void SyncRecordSource::sendSyncHeader () throw ()
Generate and send a sync record header sample.
The sync record must already have been laid out, which happens when a source is connected with connect(SampleSource*). Typically sendSyncHeader() should be called after clients are connected with addSampleClient(). The header time tag is the time of the first raw sample read from the input stream, set in the call to preLoadCalibrations(). This is not to be confused with a NIDAS stream header, such as would be generated by a HeaderSource, thus the distinction in the name.
References _headerStream, _source, _syncHeaderTime, createHeader(), nidas::core::SampleSourceSupport::distribute(), DLOG, nidas::core::SampleT< DataT >::getDataPtr(), nidas::core::Sample::getTimeTag(), nidas::core::Sample::setId(), nidas::core::Sample::setTimeTag(), and SYNC_RECORD_HEADER_ID.
|
private |
References _current, _source, _syncRecord, _syncTime, nidas::util::LogContext::active(), nidas::core::SampleSourceSupport::distribute(), format_time(), nidas::core::Sample::getTimeTag(), nidas::util::LogContext::log(), LOG_DEBUG, and NSYNCREC.
Referenced by advanceRecord(), and flush().
|
private |
References _current, _syncTime, nidas::core::SampleTracer::active(), nidas::util::endlog(), format_time(), nidas::dynld::raf::SyncInfo::getRecordIndex(), nidas::dynld::raf::SyncInfo::getSlotIndex(), nidas::dynld::raf::SyncInfo::minDiff, nidas::core::SampleTracer::msg(), nidas::dynld::raf::SyncInfo::nSlots, nidas::dynld::raf::SyncInfo::rate, nidas::dynld::raf::SyncInfo::skipMod, nidas::dynld::raf::SyncInfo::skipModCount, and USECS_PER_SEC.
Referenced by checkTime().
|
private |
|
private |
|
private |
Index of current sync record.
Referenced by advanceRecord(), flush(), nextRecord(), prevRecord(), sendSyncRecord(), and slog().
|
private |
Referenced by allocateRecord().
|
private |
Referenced by init().
|
private |
Referenced by sendSyncHeader().
|
private |
Referenced by init().
|
private |
Referenced by allocateRecord(), and init().
|
private |
Referenced by sendSyncHeader(), and sendSyncRecord().
|
private |
Referenced by sendSyncHeader().
|
private |
Info kept for each sample in order to assemble and write sync records.
Note there is no default, no-arg constructor for SyncInfo, so you can't do:
    SyncInfo& sinfo = _syncInfo[id];
which would need the no-arg constructor if the element is not found. Instead, do:
    map<dsm_sample_id_t, SyncInfo>::iterator si = _syncInfo.find(id);
    if (si != _syncInfo.end()) {
        SyncInfo& sinfo = si->second;   // use reference to avoid copy
        ...
    }
Referenced by advanceRecord(), init(), and ~SyncRecordSource().
|
private |
Referenced by advanceRecord(), allocateRecord(), computeSlotIndex(), nextRecord(), prevRecord(), sendSyncRecord(), and ~SyncRecordSource().
|
private |
Referenced by init(), and SyncRecordSource().
|
private |
Referenced by init(), and SyncRecordSource().
|
private |
Referenced by advanceRecord(), allocateRecord(), checkTime(), computeSlotIndex(), nextRecord(), sendSyncRecord(), slog(), and SyncRecordSource().
|
private |
|
private |
|
private |
List of all variables in the sync record.
Referenced by init().
|
static |
Maximum number of delta-Ts allowed between a sample time tag and its slot in the sync record.
Referenced by checkTime().
|
static |
Referenced by flush(), nextRecordIndex(), and sendSyncRecord().