nidas v1.2.3
SyncRecordReader.h
Go to the documentation of this file.
1// -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2// vim: set shiftwidth=4 softtabstop=4 expandtab:
3/*
4 ********************************************************************
5 ** NIDAS: NCAR In-situ Data Acquisition Software
6 **
7 ** 2005, Copyright University Corporation for Atmospheric Research
8 **
9 ** This program is free software; you can redistribute it and/or modify
10 ** it under the terms of the GNU General Public License as published by
11 ** the Free Software Foundation; either version 2 of the License, or
12 ** (at your option) any later version.
13 **
14 ** This program is distributed in the hope that it will be useful,
15 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 ** GNU General Public License for more details.
18 **
19 ** The LICENSE.txt file accompanying this software contains
20 ** a copy of the GNU General Public License. If it is not found,
21 ** write to the Free Software Foundation, Inc.,
22 ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 **
24 ********************************************************************
25*/
26
27#ifndef NIDAS_DYNLD_RAF_SYNCRECORDREADER_H
28#define NIDAS_DYNLD_RAF_SYNCRECORDREADER_H
29
30#include <deque>
31
34#include "SyncRecordVariable.h"
35#include "SyncServer.h"
36
38
39#ifdef SYNC_RECORD_JSON_OUTPUT
40#include <json/json.h>
41#endif
42
43namespace nidas { namespace dynld { namespace raf {
44
46{
47public:
48 SyncRecHeaderException(const std::string& expect, const std::string& got):
49 nidas::util::Exception("SyncRecHeaderException",
50 std::string("expected: \"") + expect +
51 "\", got: \"" + got + "\"")
52 {
53 }
54 SyncRecHeaderException(const std::string& msg):
55 nidas::util::Exception("SyncRecHeaderException",msg)
56 {
57 }
58
66};
67
86{
87public:
88
95
101
102 virtual ~SyncRecordReader();
103
104 const std::string& getProjectName() const { return projectName; }
105
106 const std::string& getTailNumber() const { return aircraftName; }
107
108 const std::string& getFlightName() const { return flightName; }
109
110 const std::string& getSoftwareVersion() const { return softwareVersion; }
111
115 time_t getStartTime() const { return startTime; }
116
122 const std::list<const SyncRecordVariable*> getVariables();
123
128 const SyncRecordVariable* getVariable(const std::string& name) const;
129
134 size_t getNumValues() const { return numDataValues; }
135
147 size_t read(dsm_time_t* tt, double *ptr, size_t len);
148
149 const std::string &
151 {
152 return _header;
153 }
154
155 virtual bool
156 receive(const Sample *s) throw();
157
158 virtual void
159 flush() throw();
160
165 void
166 endOfStream();
167
171 int
172 getSyncRecOffset(const nidas::core::Variable* var);
173
177 int
178 getLagOffset(const nidas::core::Variable* var);
179
184 const std::string&
186 {
188 }
189
190private:
191
192 void init();
193
194 void scanHeader(const Sample* samp);
195
196 const Sample*
197 nextSample();
198
201
206
207 std::string getQuotedString(std::istringstream& str);
208
212 void readKeyedQuotedValues(std::istringstream& header);
213
215
216 std::list<SampleTag*> sampleTags;
217
218 std::list<const SyncRecordVariable*> variables;
219
220 std::map<std::string,const SyncRecordVariable*> variableMap;
221
223
224 std::string projectName;
225
226 std::string aircraftName;
227
228 std::string flightName;
229
230 std::string softwareVersion;
231
232 time_t startTime;
233
234 bool _debug;
235
236 std::string _header;
237
239 bool _eoq;
240
242 std::deque<const Sample*> _syncRecords;
243
245
248
251};
252
253#ifdef SYNC_RECORD_JSON_OUTPUT
254inline void
255write_sync_record_header_as_json(std::ostream& json,
256 const std::string& textheader)
257{
258 Json::Value root;
259
260 std::istringstream iss(textheader);
261 std::vector<std::string> lines;
262 std::string line;
263 while (getline(iss, line))
264 {
265 // Skip empty lines, especially the last one.
266 if (line.length())
267 {
268 lines.push_back(line);
269 }
270 }
271 Json::Value header;
272 header.resize(lines.size());
273 for (unsigned int i = 0; i < lines.size(); ++i)
274 {
275 header[i] = lines[i];
276 }
277 root["header"] = header;
278 json << root;
279}
280
281
282std::string
283json_sync_record_header_as_string(Json::Value& root)
284{
285 Json::Value& header = root["header"];
286
287 std::ostringstream oss;
288 for (unsigned int i = 0; i < header.size(); ++i)
289 {
290 oss << header[i].asString() << "\n";
291 }
292 return oss.str();
293}
294
295
296inline void
297write_sync_record_data_as_json(std::ostream& json,
298 dsm_time_t tt,
299 const double* rec,
300 size_t numValues)
301{
302 Json::Value root;
303 root["time"] = Json::UInt64(tt);
304 root["numValues"] = (int)numValues;
305 // Unfortunately the JSON spec does not support NAN, and so
306 // the data values are written as strings. NANs have a
307 // string form like 'nan' which strtod() can reliably
308 // convert back to a double. Likewise for infinity (inf*),
309 // but those are not as likely to be seen in nidas data.
310 Json::Value data;
311 data.resize(numValues);
312 char buf[64];
313 for (unsigned int i = 0; i < numValues; ++i)
314 {
315 snprintf(buf, sizeof(buf), "%.16g", rec[i]);
316 data[i] = buf;
317 }
318 root["data"] = data;
319 json << root;
320}
321
/**
 * Format a double as text using the printf conversion "%.16g".
 *
 * @param value  The number to format.
 * @return The formatted text as a std::string.
 */
inline std::string
double_to_string(double value)
{
    char text[64];
    snprintf(text, sizeof text, "%.16g", value);
    return std::string(text);
}
329
330inline
331Json::Value
332write_sync_variable_as_json(const SyncRecordVariable* var,
333 dsm_time_t tt,
334 const double* rec)
335{
336 Json::Value variable;
337 Json::Value values(Json::arrayValue);
338
339 // Unfortunately the JSON spec does not support NAN, and so
340 // the data values are written as strings. NANs have a
341 // string form like 'nan' which strtod() can reliably
342 // convert back to a double. Likewise for infinity (inf*),
343 // but those are not as likely to be seen in nidas data.
344 size_t varoffset = var->getSyncRecOffset();
345 int irate = (int)ceil(var->getSampleRate());
346 int vlen = var->getLength();
347
348 values.resize(vlen*irate);
349 for (int i = 0; i < vlen*irate; ++i)
350 {
351 values[i] = double_to_string(rec[varoffset+i]);
352 }
353 variable["name"] = var->getName();
354 variable["values"] = values;
355
356 size_t lagoffset = var->getLagOffset();
357 // int deltatUsec = (int)rint(USECS_PER_SEC / var->getSampleRate());
358 dsm_time_t vtime = tt;
359 if (!std::isnan(rec[lagoffset]))
360 vtime += (int) rec[lagoffset];
361 variable["time"] = Json::UInt64(vtime);
362 variable["lagoffset"] = double_to_string(rec[lagoffset]);
363 return variable;
364}
365
366inline
367void
368write_sync_record_as_json(std::ostream& json, dsm_time_t tt,
369 const double* record, int nvalues,
370 std::vector<const SyncRecordVariable*>& vars)
371{
372 Json::Value root;
373 root["time"] = Json::UInt64(tt);
374 root["numValues"] = nvalues;
375 Json::Value& data = root["data"];
376 std::vector<const SyncRecordVariable*>::const_iterator it;
377 for (it = vars.begin(); it != vars.end(); ++it)
378 {
379 Json::Value variable = write_sync_variable_as_json(*it, tt, record);
380 data.append(variable);
381 }
382 json << root;
383}
384#endif
385
386}}} // namespace nidas namespace dynld namespace raf
387
388#endif
A channel for Input or Output of data.
Definition IOChannel.h:65
Pure virtual interface of a client of Samples.
Definition SampleClient.h:38
Interface to a data sample.
Definition Sample.h:190
Class describing a sampled variable.
Definition Variable.h:47
An implementation of a SampleInput.
Definition SampleInputStream.h:173
Definition SyncRecordReader.h:46
SyncRecHeaderException(const std::string &expect, const std::string &got)
Definition SyncRecordReader.h:48
SyncRecHeaderException(const SyncRecHeaderException &x)
Copy constructor.
Definition SyncRecordReader.h:62
SyncRecHeaderException(const std::string &msg)
Definition SyncRecordReader.h:54
SyncRecordReader handles sync samples and provides an interface to access Variables and read sync rec...
Definition SyncRecordReader.h:86
std::string flightName
Definition SyncRecordReader.h:228
std::list< SampleTag * > sampleTags
Definition SyncRecordReader.h:216
SampleInputStream * inputStream
Definition SyncRecordReader.h:199
size_t getNumValues() const
Get number of data values in a sync record.
Definition SyncRecordReader.h:134
const std::string & getSoftwareVersion() const
Definition SyncRecordReader.h:110
nidas::util::Cond _qcond
Definition SyncRecordReader.h:238
const SyncRecordVariable * getVariable(const std::string &name) const
Get a pointer to a SyncRecordVariable, searching by name.
Definition SyncRecordReader.cc:641
std::string softwareVersion
Definition SyncRecordReader.h:230
size_t read(dsm_time_t *tt, double *ptr, size_t len)
Read a sync record.
Definition SyncRecordReader.cc:567
const std::string & getTailNumber() const
Definition SyncRecordReader.h:106
SyncRecordReader(const SyncRecordReader &)
No copying.
std::list< const SyncRecordVariable * > variables
Definition SyncRecordReader.h:218
int getLagOffset(const nidas::core::Variable *var)
Definition SyncRecordReader.cc:728
void scanHeader(const Sample *samp)
Definition SyncRecordReader.cc:207
std::string _sampleStreamConfigName
Definition SyncRecordReader.h:244
const std::string & getConfigName()
After creating a SyncRecordReader on a socket, this method returns the config name from the SampleInp...
Definition SyncRecordReader.h:185
const std::string & getProjectName() const
Definition SyncRecordReader.h:104
std::map< std::string, const SyncRecordVariable * > variableMap
Definition SyncRecordReader.h:220
SyncServer * syncServer
Definition SyncRecordReader.h:200
std::string projectName
Definition SyncRecordReader.h:224
SyncRecHeaderException * headException
Definition SyncRecordReader.h:214
const std::string & textHeader()
Definition SyncRecordReader.h:150
bool _read_sync_server
When true, explicitly read from the SyncServer, if given.
Definition SyncRecordReader.h:205
bool _debug
Definition SyncRecordReader.h:234
const Sample * nextSample()
Definition SyncRecordReader.cc:652
SyncRecordReader & operator=(const SyncRecordReader &)
No assignment.
SyncRecordReader(IOChannel *iochan)
Constructor of a SyncRecordReader to a connected IOChannel.
Definition SyncRecordReader.cc:64
size_t numDataValues
Definition SyncRecordReader.h:222
time_t startTime
Definition SyncRecordReader.h:232
int getSyncRecOffset(const nidas::core::Variable *var)
Definition SyncRecordReader.cc:758
std::string _header
Definition SyncRecordReader.h:236
time_t getStartTime() const
Get UNIX time of the start time of data in the SyncRecords.
Definition SyncRecordReader.h:115
virtual ~SyncRecordReader()
Definition SyncRecordReader.cc:166
void readKeyedQuotedValues(std::istringstream &header)
Definition SyncRecordReader.cc:541
std::deque< const Sample * > _syncRecords
Place to stash sample records received as a SampleClient.
Definition SyncRecordReader.h:242
bool _eoq
Definition SyncRecordReader.h:239
void endOfStream()
Signal the end of the sample stream, meaning EOF is reached once the queue is empty.
Definition SyncRecordReader.cc:716
virtual bool receive(const Sample *s)
Method called to pass a sample to this client.
Definition SyncRecordReader.cc:682
std::string getQuotedString(std::istringstream &str)
Definition SyncRecordReader.cc:532
void init()
Definition SyncRecordReader.cc:112
const std::list< const SyncRecordVariable * > getVariables()
Get the list of variables in a sync record.
Definition SyncRecordReader.cc:632
const std::string & getFlightName() const
Definition SyncRecordReader.h:108
std::string aircraftName
Definition SyncRecordReader.h:226
virtual void flush()
Ask that this SampleClient send out any buffered Samples that it may be holding.
Definition SyncRecordReader.cc:707
A Variable associated with a SyncRecord.
Definition SyncRecordVariable.h:39
Definition SyncServer.h:58
A wrapper class for a Posix condition variable.
Definition ThreadSupport.h:258
Definition Exception.h:35
Exception(const std::string &type, const std::string &n, const std::string &m)
Definition Exception.h:41
Sample * getSample(sampleType type, unsigned int len)
A convenience method for getting a sample of an enumerated type from a pool.
Definition Sample.cc:70
long long dsm_time_t
Posix time in microseconds, the number of non-leap microseconds since 1970 Jan 1 00:00 UTC.
Definition Sample.h:62
Root namespace for the NCAR In-Situ Data Acquisition Software.
Definition A2DConverter.h:31
int len
Definition sing.cc:948