nidas v1.2.3
SampleInputStream.h
Go to the documentation of this file.
1// -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2// vim: set shiftwidth=4 softtabstop=4 expandtab:
3/*
4 ********************************************************************
5 ** NIDAS: NCAR In-situ Data Acquisition Software
6 **
7 ** 2005, Copyright University Corporation for Atmospheric Research
8 **
9 ** This program is free software; you can redistribute it and/or modify
10 ** it under the terms of the GNU General Public License as published by
11 ** the Free Software Foundation; either version 2 of the License, or
12 ** (at your option) any later version.
13 **
14 ** This program is distributed in the hope that it will be useful,
15 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 ** GNU General Public License for more details.
18 **
19 ** The LICENSE.txt file accompanying this software contains
20 ** a copy of the GNU General Public License. If it is not found,
21 ** write to the Free Software Foundation, Inc.,
22 ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 **
24 ********************************************************************
25*/
26
27#ifndef NIDAS_DYNLD_SAMPLEINPUTSTREAM_H
28#define NIDAS_DYNLD_SAMPLEINPUTSTREAM_H
29
34#include <nidas/core/Sample.h>
36#include <nidas/core/NidasApp.h>
37#include <nidas/util/UTime.h>
39
40namespace nidas {
41
42namespace core {
43class DSMService;
44class DSMConfig;
45class SampleTag;
46class SampleStats;
47class SampleClient;
48class SampleSource;
49class Sample;
50class IOChannel;
51class IOStream;
52}
53
54namespace dynld {
55
60struct BlockStats {
61
63
68 BlockStats(bool goodblock=true, size_t startblock=0);
69
77 bool
78 addGoodSample(nidas::core::Sample* samp, long long offset);
79
89 void
90 addBadSample(long long offset, unsigned int nbadbytes);
91
96 void
97 startBadBlock(long long offset);
98
105 void
106 endBadBlock(nidas::core::Sample* samp, long long offset);
107
108 inline size_t
109 blockEnd() const
110 {
111 return block_start + nbytes;
112 }
113
123
127 bool good;
128
132 size_t nsamples;
133
134 /*
135 * File position of start of this block.
136 */
138
142 size_t nbytes;
143
149};
150
173{
174public:
176
181 SampleInputStream(bool raw=false);
182
191 SampleInputStream(nidas::core::IOChannel* iochannel,bool raw=false);
192
197
198 virtual ~SampleInputStream();
199
200 std::string getName() const;
201
202 int getFd() const;
203
207 virtual void setIOChannel(nidas::core::IOChannel* val);
208
214 void readInputHeader();
215
219 bool parseInputHeader();
220
222 {
223 return _inputHeader;
224 }
225
230
232 {
233 return _original;
234 }
235
243
247 void setNonBlocking(bool val);
248
252 bool isNonBlocking() const;
253
258
259 // void init() throw();
260
261 void setKeepStats(bool val)
262 {
264 }
265
269 void addSampleTag(const nidas::core::SampleTag* tag) throw()
270 {
271 return _source.addSampleTag(tag);
272 }
273
275 {
277 }
278
283
288
292 std::list<const nidas::core::SampleTag*> getSampleTags() const
293 {
294 return _source.getSampleTags();
295 }
296
304
309 {
310 _source.addSampleClient(client);
311 }
312
314 {
316 }
317
323 {
324 _source.addSampleClientForTag(client,tag);
325 }
326
331
332 int getClientCount() const throw()
333 {
334 return _source.getClientCount();
335 }
336
341 void flush() throw();
342
343 const nidas::core::SampleStats& getSampleStats() const
344 {
345 return _source.getSampleStats();
346 }
347
363 bool readSamples();
364
373 void search(const nidas::util::UTime& tt);
374
384
385
390 void distribute(const nidas::core::Sample* s) throw()
391 {
392 return _source.distribute(s);
393 }
394
395 size_t getBadSamples() const { return _badSamples; }
396
400 void close();
401
406 {
407 _bsf = bsf;
408 }
409
413 void setFilterBadSamples(bool val)
414 {
416 }
417
421 void setMinDsmId(int val)
422 {
423 _bsf.setMinDsmId(val);
424 }
425
429 void setMaxDsmId(int val)
430 {
431 _bsf.setMaxDsmId(val);
432 }
433
437 void setMinSampleLength(unsigned int val)
438 {
440 }
441
445 void setMaxSampleLength(unsigned int val)
446 {
448 }
449
454 {
456 }
457
462 {
464 }
465
469 void fromDOMElement(const xercesc::DOMElement* node);
470
471 void setExpectHeader(bool val) { _expectHeader = val; }
472
473 bool getExpectHeader() const { return _expectHeader; }
474
475protected:
476
481
483
485
486private:
487
495
504 nidas::core::Sample* nextSample(bool keepreading,
505 bool searching=false,
506 dsm_time_t search_time=LONG_LONG_MIN);
507
508 void checkUnexpectedEOF();
509
516 {
517 public:
518 ReadResult(size_t ilen=0, bool inewinput=false, bool ieof=false):
519 len(ilen), newinput(inewinput), eof(ieof)
520 {}
521 size_t len;
523 bool eof;
524 };
525
527 readBlock(bool keepreading, char* &ptr, size_t& lentoread);
528
530 read(bool keepreading, char* ptr, size_t lentoread);
531
532 void
534
536 handleEOF(bool keepreading);
537
538 void closeBlocks();
539
544
549
551
552 /* mutable */ const nidas::core::DSMConfig* _dsm;
553
555
557
559
561
562 char* _hptr;
563
569
576
582
586 char* _dptr;
587
595
600
606
611
613
615
617
618 // XXX This is no longer used and is deprecated. Someday it will be
619 // removed from here and from the constructor signatures. XXX
620 bool _raw;
621
627 std::string _last_name;
628
629 nidas::util::EOFException _eofx;
630 bool _ateof;
631
636
641
642};
643
644}} // namespace nidas namespace dynld
645
646#endif
Implement rules for filtering bad samples.
Definition BadSampleFilter.h:23
void setMaxSampleLength(unsigned int val)
The default maximum sample length is 32768.
Definition BadSampleFilter.cc:63
void setMinSampleLength(unsigned int val)
The default minimum sample length is 1.
Definition BadSampleFilter.cc:55
void setMaxSampleTime(const nidas::util::UTime &val)
Set the maximum sample time for a valid sample.
Definition BadSampleFilter.cc:79
void setMinSampleTime(const nidas::util::UTime &val)
Set the minimum sample time for a valid sample.
Definition BadSampleFilter.cc:71
void setMinDsmId(unsigned int val)
The default minimum DSM ID for a valid sample is 1.
Definition BadSampleFilter.cc:39
void setMaxDsmId(unsigned int val)
The default maximum DSM ID for a valid sample is 1024.
Definition BadSampleFilter.cc:47
void setFilterBadSamples(bool val)
Enable filtering of bad samples.
Definition BadSampleFilter.cc:32
Class that should include all that is configurable about a DSM.
Definition DSMConfig.h:55
Base class for a service, as built from a <service> XML tag.
Definition DSMService.h:48
A channel for Input or Output of data.
Definition IOChannel.h:65
A base class for buffering data.
Definition IOStream.h:41
Pure virtual interface of a client of Samples.
Definition SampleClient.h:38
The gcc built-in atomic operations are not supported on arm, and one must use -march=i686 for them to ...
Definition Sample.h:107
Definition SampleInputHeader.h:37
Interface of an input SampleSource.
Definition SampleInput.h:48
A source of samples.
Definition SampleSourceSupport.h:47
void removeSampleClient(SampleClient *c)
Remove a SampleClient from this SampleSource This will also remove a SampleClient if it has been adde...
Definition SampleSourceSupport.cc:88
SampleSource * getRawSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition SampleSourceSupport.h:60
void removeSampleClientForTag(SampleClient *c, const SampleTag *)
Remove a SampleClient for a given SampleTag from this SampleSource.
Definition SampleSourceSupport.cc:121
void distribute(const Sample *s)
Distribute a sample to my clients, calling the receive() method of each client, passing the const poi...
Definition SampleSourceSupport.cc:141
void addSampleClientForTag(SampleClient *c, const SampleTag *)
Add a SampleClient to this SampleSource.
Definition SampleSourceSupport.cc:101
void addSampleClient(SampleClient *c)
Add a SampleClient to this SampleSource.
Definition SampleSourceSupport.cc:80
SampleSource * getProcessedSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition SampleSourceSupport.h:66
void setKeepStats(bool val)
Definition SampleSourceSupport.h:154
SampleTagIterator getSampleTagIterator() const
Definition SampleSourceSupport.cc:75
int getClientCount() const
How many SampleClients are currently in my list.
Definition SampleSourceSupport.cc:136
void removeSampleTag(const SampleTag *tag)
Definition SampleSourceSupport.cc:67
const SampleStats & getSampleStats() const
Definition SampleSourceSupport.h:149
void addSampleTag(const SampleTag *tag)
Add a SampleTag to this SampleSource.
Definition SampleSourceSupport.cc:60
std::list< const SampleTag * > getSampleTags() const
What SampleTags am I a SampleSource for?
Definition SampleSourceSupport.cc:54
Pure virtual interface for a source of Samples.
Definition SampleSource.h:48
A source of samples.
Definition SampleStats.h:41
Class for iterating over the SampleTags of a Project, Site, DSMConfig, or a SampleSource.
Definition NidsIterators.h:218
Class describing a group of variables that are sampled and handled together.
Definition SampleTag.h:88
Interface to a data sample.
Definition Sample.h:190
An implementation of a SampleInput.
Definition SampleInputStream.h:173
void setMinSampleTime(const nidas::util::UTime &val)
See BadSampleFilter.
Definition SampleInputStream.h:453
bool _ateof
Definition SampleInputStream.h:630
void search(const nidas::util::UTime &tt)
Search forward until a sample header is read whose time is greater than or equal to tt.
Definition SampleInputStream.cc:1056
void addSampleClient(nidas::core::SampleClient *client)
Implementation of SampleSource::addSampleClient().
Definition SampleInputStream.h:308
nidas::core::Sample * handleEOF(bool keepreading)
When EOF has been caught, then either we need to return the last pending sample, or else we need to t...
Definition SampleInputStream.cc:720
void setFilterBadSamples(bool val)
See BadSampleFilter.
Definition SampleInputStream.h:413
bool _raw
Definition SampleInputStream.h:620
nidas::core::Sample * nextSample()
Unpack the next sample from the InputStream.
Definition SampleInputStream.cc:697
nidas::core::Sample * _sampPending
The currently pending sample.
Definition SampleInputStream.h:575
void setMinSampleLength(unsigned int val)
See BadSampleFilter.
Definition SampleInputStream.h:437
void distribute(const nidas::core::Sample *s)
Distribute a sample to my clients.
Definition SampleInputStream.h:390
bool getExpectHeader() const
Definition SampleInputStream.h:473
void removeSampleTag(const nidas::core::SampleTag *tag)
Definition SampleInputStream.h:274
size_t _goodSamples
Number of good samples in the stream so far.
Definition SampleInputStream.h:610
std::string getName() const
Definition SampleInputStream.cc:300
size_t _headerToRead
Definition SampleInputStream.h:560
void addSampleTag(const nidas::core::SampleTag *tag)
Implementation of SampleInput::addSampleTag().
Definition SampleInputStream.h:269
bool _skipSample
This is set if the data for the current sample header should be read but skipped, because the sample ...
Definition SampleInputStream.h:594
const nidas::core::DSMConfig * _dsm
Definition SampleInputStream.h:552
nidas::core::SampleInput * connected(nidas::core::IOChannel *iochan)
Implementation of IOChannelRequester::connected.
Definition SampleInputStream.cc:329
void closeBlocks()
Definition SampleInputStream.cc:353
nidas::core::Sample * sampleFromHeader()
Check the current header for validity and generate a sample for it.
Definition SampleInputStream.cc:926
nidas::core::SampleTagIterator getSampleTagIterator() const
Implementation of SampleSource::getSampleTagIterator().
Definition SampleInputStream.h:300
int getClientCount() const
How many SampleClients are currently in my list.
Definition SampleInputStream.h:332
void setKeepStats(bool val)
Definition SampleInputStream.h:261
size_t _dataToRead
How many bytes left to read from the stream into the data portion of samp.
Definition SampleInputStream.h:581
int getFd() const
Definition SampleInputStream.cc:305
void setNonBlocking(bool val)
Definition SampleInputStream.cc:289
void addSampleClientForTag(nidas::core::SampleClient *client, const nidas::core::SampleTag *tag)
Add a Client for a given SampleTag.
Definition SampleInputStream.h:322
void fromDOMElement(const xercesc::DOMElement *node)
Definition SampleInputStream.cc:1065
bool _inputHeaderParsed
Definition SampleInputStream.h:556
nidas::core::SampleInputHeader _inputHeader
Definition SampleInputStream.h:612
nidas::util::EOFException _eofx
Definition SampleInputStream.h:629
const nidas::core::SampleStats & getSampleStats() const
Definition SampleInputStream.h:343
char * _hptr
Definition SampleInputStream.h:562
virtual ~SampleInputStream()
Definition SampleInputStream.cc:262
SampleInputStream * _original
Definition SampleInputStream.h:616
void readInputHeader()
Read archive information at beginning of input stream or file.
Definition SampleInputStream.cc:403
nidas::core::BadSampleFilter _bsf
Definition SampleInputStream.h:614
BlockStats _block
Information about the current block of samples, good or bad.
Definition SampleInputStream.h:599
std::list< const nidas::core::SampleTag * > getSampleTags() const
Get the output SampleTags.
Definition SampleInputStream.h:292
void setMaxSampleTime(const nidas::util::UTime &val)
See BadSampleFilter.
Definition SampleInputStream.h:461
virtual void setIOChannel(nidas::core::IOChannel *val)
Set the IOChannel for this SampleInputStream.
Definition SampleInputStream.cc:272
const nidas::core::DSMConfig * getDSMConfig() const
What DSM am I connected to? May be NULL if it cannot be determined.
Definition SampleInputStream.cc:398
ReadResult readBlock(bool keepreading, char *&ptr, size_t &lentoread)
Read a block into memory, updating the given block pointer and length counter accordingly.
Definition SampleInputStream.cc:675
void setBadSampleFilter(const nidas::core::BadSampleFilter &bsf)
Replace the bad sample filter rules for this stream with bsf.
Definition SampleInputStream.h:405
std::string _last_name
Keep the current name of the input stream so it can be referenced even after the input stream has a...
Definition SampleInputStream.h:627
bool isNonBlocking() const
Definition SampleInputStream.cc:294
nidas::core::Sample * _samp
Will be non-null if we have previously read part of a sample from the stream.
Definition SampleInputStream.h:568
void checkUnexpectedEOF()
Definition SampleInputStream.cc:559
nidas::core::IOChannel * _iochan
Definition SampleInputStream.h:482
void setExpectHeader(bool val)
Definition SampleInputStream.h:471
nidas::core::DSMService * _service
Service that has requested my input.
Definition SampleInputStream.h:548
char * _dptr
Pointer into the data portion of samp where we will read next.
Definition SampleInputStream.h:586
virtual SampleInputStream * clone(nidas::core::IOChannel *iochannel)
Create a clone, with a new, connected IOChannel.
Definition SampleInputStream.cc:257
nidas::core::SampleHeader _sheader
Definition SampleInputStream.h:558
nidas::core::SampleSource * getRawSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition SampleInputStream.h:279
size_t _badSamples
Number of bad samples in the stream so far, which is to say number of bytes checked which did not con...
Definition SampleInputStream.h:605
void removeSampleClientForTag(nidas::core::SampleClient *client, const nidas::core::SampleTag *tag)
Remove a SampleClient for a given SampleTag from this SampleSource.
Definition SampleInputStream.h:327
virtual nidas::core::SampleInput * getOriginal() const
Definition SampleInputStream.h:231
nidas::core::Sample * readSample()
Read the next sample from the InputStream.
Definition SampleInputStream.cc:1048
void setMaxSampleLength(unsigned int val)
See BadSampleFilter.
Definition SampleInputStream.h:445
void close()
Definition SampleInputStream.cc:386
void flush()
Implementation of SampleSource::flush(), unpacks and distributes any samples currently in the read bu...
Definition SampleInputStream.cc:311
nidas::core::SampleSource * getProcessedSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition SampleInputStream.h:284
void handleNewInput()
Whenever we are at the start of a new input, we need to start over with parsing the input header.
Definition SampleInputStream.cc:496
void removeSampleClient(nidas::core::SampleClient *client)
Remove a SampleClient from this SampleSource.
Definition SampleInputStream.h:313
nidas::core::SampleSourceSupport _source
Definition SampleInputStream.h:484
const nidas::core::SampleInputHeader & getInputHeader() const
Definition SampleInputStream.h:221
bool readSamples()
Read a buffer of data, serialize the data into samples, and distribute() samples to the receive() met...
Definition SampleInputStream.cc:518
ReadResult read(bool keepreading, char *ptr, size_t lentoread)
All reads of the iostream go through here.
Definition SampleInputStream.cc:598
bool parseInputHeader()
Definition SampleInputStream.cc:427
void setMaxDsmId(int val)
See BadSampleFilter.
Definition SampleInputStream.h:429
nidas::core::dsm_time_t dsm_time_t
Definition SampleInputStream.h:175
bool _expectHeader
Definition SampleInputStream.h:554
SampleInputStream(bool raw=false)
Constructor.
Definition SampleInputStream.cc:193
nidas::core::IOStream * _iostream
Definition SampleInputStream.h:550
void setMinDsmId(int val)
See BadSampleFilter.
Definition SampleInputStream.h:421
size_t getBadSamples() const
Definition SampleInputStream.h:395
void requestConnection(nidas::core::DSMService *)
Definition SampleInputStream.cc:323
A class for parsing, formatting and doing operations on time, based on Unix time conventions,...
Definition UTime.h:95
Sample * getSample(sampleType type, unsigned int len)
A convenience method for getting a sample of an enumerated type from a pool.
Definition Sample.cc:70
long long dsm_time_t
Posix time in microseconds, the number of non-leap microseconds since 1970 Jan 1 00:00 UTC.
Definition Sample.h:62
Root namespace for the NCAR In-Situ Data Acquisition Software.
Definition A2DConverter.h:31
Keep track of statistics for a contiguous block of good or bad samples in a stream.
Definition SampleInputStream.h:60
BlockStats(bool goodblock=true, size_t startblock=0)
Initialize a block as good or bad and give it an offset.
Definition SampleInputStream.cc:69
nidas::core::dsm_time_t dsm_time_t
Definition SampleInputStream.h:62
size_t nbytes
Size in bytes of this block.
Definition SampleInputStream.h:142
size_t block_start
Definition SampleInputStream.h:137
void endBadBlock(nidas::core::Sample *samp, long long offset)
Mark the end of a bad block by assigning the end_time and setting nbytes according to the current off...
Definition SampleInputStream.cc:128
size_t blockEnd() const
Definition SampleInputStream.h:109
bool good
True if this is a block of good samples.
Definition SampleInputStream.h:127
void addBadSample(long long offset, unsigned int nbadbytes)
An alternative to calling startBadBlock()/endBadBlock(), it adds nbadbytes to a bad block,...
Definition SampleInputStream.cc:101
bool addGoodSample(nidas::core::Sample *samp, long long offset)
On a good sample, reset this block to a good block if not already, and update the last good sample an...
Definition SampleInputStream.cc:81
dsm_time_t start_time
Sample times bounding this block.
Definition SampleInputStream.h:121
size_t nsamples
Number of good samples in a row.
Definition SampleInputStream.h:132
unsigned int last_good_sample_size
Size of the last good sample in a good sample block.
Definition SampleInputStream.h:148
void startBadBlock(long long offset)
Start a bad block.
Definition SampleInputStream.cc:115
dsm_time_t end_time
Definition SampleInputStream.h:122
Tuple for all the possible results of iostream reads.
Definition SampleInputStream.h:516
bool eof
Definition SampleInputStream.h:523
size_t len
Definition SampleInputStream.h:521
bool newinput
Definition SampleInputStream.h:522
ReadResult(size_t ilen=0, bool inewinput=false, bool ieof=false)
Definition SampleInputStream.h:518