nidas  v1.2-1520
SampleInputStream.h
Go to the documentation of this file.
1 // -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2 // vim: set shiftwidth=4 softtabstop=4 expandtab:
3 /*
4  ********************************************************************
5  ** NIDAS: NCAR In-situ Data Acquisition Software
6  **
7  ** 2005, Copyright University Corporation for Atmospheric Research
8  **
9  ** This program is free software; you can redistribute it and/or modify
10  ** it under the terms of the GNU General Public License as published by
11  ** the Free Software Foundation; either version 2 of the License, or
12  ** (at your option) any later version.
13  **
14  ** This program is distributed in the hope that it will be useful,
15  ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16  ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  ** GNU General Public License for more details.
18  **
19  ** The LICENSE.txt file accompanying this software contains
20  ** a copy of the GNU General Public License. If it is not found,
21  ** write to the Free Software Foundation, Inc.,
22  ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23  **
24  ********************************************************************
25 */
26 
27 #ifndef NIDAS_DYNLD_SAMPLEINPUTSTREAM_H
28 #define NIDAS_DYNLD_SAMPLEINPUTSTREAM_H
29 
30 #include <nidas/core/SampleInput.h>
33 #include <nidas/core/SampleStats.h>
34 #include <nidas/core/Sample.h>
36 #include <nidas/core/NidasApp.h>
37 #include <nidas/util/UTime.h>
39 
40 namespace nidas {
41 
42 namespace core {
43 class DSMService;
44 class DSMConfig;
45 class SampleTag;
46 class SampleStats;
47 class SampleClient;
48 class SampleSource;
49 class Sample;
50 class IOChannel;
51 class IOStream;
52 }
53 
54 namespace dynld {
55 
// Keep track of statistics for a contiguous block of good or bad samples
// in a stream.
//
// NOTE: this listing omits some of the original lines of the struct:
// line 62 (the typedef nidas::core::dsm_time_t dsm_time_t) and lines
// 121-122 (the dsm_time_t members start_time and end_time, the sample
// times bounding this block).
60 struct BlockStats {
61 
63 
    // Initialize a block as good or bad and give it an offset.
68  BlockStats(bool goodblock=true, size_t startblock=0);
69 
    // On a good sample, reset this block to a good block if not already,
    // and update the last-good-sample bookkeeping.
    // NOTE(review): the meaning of the bool return value is not visible
    // here -- see the definition in SampleInputStream.cc.
77  bool
78  addGoodSample(nidas::core::Sample* samp, long long offset);
79 
    // An alternative to calling startBadBlock()/endBadBlock(): adds
    // nbadbytes to a bad block.
89  void
90  addBadSample(long long offset, unsigned int nbadbytes);
91 
    // Start a bad block.
96  void
97  startBadBlock(long long offset);
98 
    // Mark the end of a bad block by assigning the end_time and setting
    // nbytes (presumably from the current offset -- confirm in the .cc).
105  void
106  endBadBlock(nidas::core::Sample* samp, long long offset);
107 
    // Position just past the end of this block: the block's starting file
    // position plus its size in bytes.
108  inline size_t
109  blockEnd() const
110  {
111  return block_start + nbytes;
112  }
113 
123 
    // True if this is a block of good samples.
127  bool good;
128 
    // Number of good samples in a row.
132  size_t nsamples;
133 
134  /*
135  * File position of start of this block.
136  */
137  size_t block_start;
138 
    // Size in bytes of this block.
142  size_t nbytes;
143 
    // Size of the last good sample in a good sample block.
148  unsigned int last_good_sample_size;
149 };
150 
173 {
174 public:
176 
181  SampleInputStream(bool raw=false);
182 
191  SampleInputStream(nidas::core::IOChannel* iochannel,bool raw=false);
192 
196  virtual SampleInputStream* clone(nidas::core::IOChannel* iochannel);
197 
198  virtual ~SampleInputStream();
199 
200  std::string getName() const;
201 
202  int getFd() const;
203 
207  virtual void setIOChannel(nidas::core::IOChannel* val);
208 
212  void readInputHeader() throw(nidas::util::IOException);
213 
214  bool parseInputHeader() throw(nidas::util::IOException);
215 
// Read-only access to the _inputHeader member (a
// nidas::core::SampleInputHeader) held by this stream.
216  const nidas::core::SampleInputHeader& getInputHeader() const
217  {
218  return _inputHeader;
219  }
220 
221  void requestConnection(nidas::core::DSMService*) throw(nidas::util::IOException);
222 
// Return the _original pointer as a SampleInput*.
// NOTE(review): presumably _original is the stream this instance was
// cloned from via clone() -- confirm in SampleInputStream.cc.
223  virtual nidas::core::SampleInput* getOriginal() const
224  {
225  return _original;
226  }
227 
235 
236  void setNonBlocking(bool val) throw(nidas::util::IOException);
237 
238  bool isNonBlocking() const throw(nidas::util::IOException);
239 
243  const nidas::core::DSMConfig* getDSMConfig() const;
244 
245  // void init() throw();
246 
// Forward the keep-statistics flag to the embedded SampleSourceSupport
// object (_source).
247  void setKeepStats(bool val)
248  {
249  _source.setKeepStats(val);
250  }
251 
// Implementation of SampleInput::addSampleTag(): adds the SampleTag to
// the embedded SampleSourceSupport object (_source). Declared throw(),
// i.e. it promises not to throw.
255  void addSampleTag(const nidas::core::SampleTag* tag) throw()
256  {
257  return _source.addSampleTag(tag);
258  }
259 
260  void removeSampleTag(const nidas::core::SampleTag* tag) throw()
261  {
263  }
264 
266  {
267  return _source.getRawSampleSource();
268  }
269 
271  {
273  }
274 
// Get the output SampleTags: returns, by value, the list of SampleTag
// pointers reported by _source.
278  std::list<const nidas::core::SampleTag*> getSampleTags() const
279  {
280  return _source.getSampleTags();
281  }
282 
287  {
288  return _source.getSampleTagIterator();
289  }
290 
295  {
296  _source.addSampleClient(client);
297  }
298 
300  {
301  _source.removeSampleClient(client);
302  }
303 
309  {
310  _source.addSampleClientForTag(client,tag);
311  }
312 
314  {
315  _source.removeSampleClientForTag(client,tag);
316  }
317 
// How many SampleClients are currently in my list, as reported by
// _source. Declared throw(), i.e. it promises not to throw.
318  int getClientCount() const throw()
319  {
320  return _source.getClientCount();
321  }
322 
327  void flush() throw();
328 
// Read-only access to the SampleStats object maintained by _source.
329  const nidas::core::SampleStats& getSampleStats() const
330  {
331  return _source.getSampleStats();
332  }
333 
347  bool readSamples() throw(nidas::util::IOException);
348 
355  void search(const nidas::util::UTime& tt) throw(nidas::util::IOException);
356 
363  nidas::core::Sample* readSample() throw(nidas::util::IOException);
364 
365 
// Distribute a sample to my clients by forwarding it to
// _source.distribute(). Declared throw(), i.e. it promises not to throw.
370  void distribute(const nidas::core::Sample* s) throw()
371  {
372  return _source.distribute(s);
373  }
374 
// Return _badSamples, the number of bad samples in the stream so far.
375  size_t getBadSamples() const { return _badSamples; }
376 
377  void close() throw(nidas::util::IOException);
378 
// Replace the bad sample filter rules for this stream with bsf; the
// filter is copied into the _bsf member.
382  void setBadSampleFilter(const nidas::core::BadSampleFilter& bsf)
383  {
384  _bsf = bsf;
385  }
386 
390  void setFilterBadSamples(bool val)
391  {
393  }
394 
// Set the minimum DSM ID for a valid sample on the bad sample filter
// (_bsf). See BadSampleFilter.
// NOTE(review): this wrapper takes int, while the cross-reference for
// BadSampleFilter::setMinDsmId() shows an unsigned int parameter, so a
// negative val would be converted to a large unsigned value -- confirm
// this is intended.
398  void setMinDsmId(int val)
399  {
400  _bsf.setMinDsmId(val);
401  }
402 
// Set the maximum DSM ID for a valid sample on the bad sample filter
// (_bsf). See BadSampleFilter.
// NOTE(review): this wrapper takes int, while the cross-reference for
// BadSampleFilter::setMaxDsmId() shows an unsigned int parameter, so a
// negative val would be converted to a large unsigned value -- confirm
// this is intended.
406  void setMaxDsmId(int val)
407  {
408  _bsf.setMaxDsmId(val);
409  }
410 
414  void setMinSampleLength(unsigned int val)
415  {
417  }
418 
422  void setMaxSampleLength(unsigned int val)
423  {
425  }
426 
431  {
432  _bsf.setMinSampleTime(val);
433  }
434 
439  {
440  _bsf.setMaxSampleTime(val);
441  }
442 
443  void fromDOMElement(const xercesc::DOMElement* node)
445 
// Store val in the _expectHeader flag.
// NOTE(review): presumably controls whether an input header is expected
// at the start of the stream -- confirm in SampleInputStream.cc.
446  void setExpectHeader(bool val) { _expectHeader = val; }
447 
// Return the current value of the _expectHeader flag.
448  bool getExpectHeader() const { return _expectHeader; }
449 
450 protected:
451 
456 
458 
460 
461 private:
462 
469  nidas::core::Sample* nextSample() throw();
470 
477  nidas::core::Sample* nextSample(bool keepreading,
478  bool searching=false,
479  dsm_time_t search_time=LONG_LONG_MIN)
480  throw(nidas::util::IOException);
481 
482  void checkUnexpectedEOF();
483 
// Tuple for all the possible results of iostream reads.
// The constructor defaults describe a read with zero length, no new
// input, and no end of file.
489  struct ReadResult
490  {
491  public:
492  ReadResult(size_t ilen=0, bool inewinput=false, bool ieof=false):
493  len(ilen), newinput(inewinput), eof(ieof)
494  {}
     // NOTE(review): presumably the number of bytes read -- confirm in read().
495  size_t len;
     // NOTE(review): presumably set when the read reached the start of a
     // new input -- confirm in read().
496  bool newinput;
     // NOTE(review): presumably set when end of file was encountered --
     // confirm in read().
497  bool eof;
498  };
499 
500  ReadResult
501  readBlock(bool keepreading, char* &ptr, size_t& lentoread);
502 
503  ReadResult
504  read(bool keepreading, char* ptr, size_t lentoread);
505 
506  void
507  handleNewInput();
508 
510  handleEOF(bool keepreading);
511 
512  void closeBlocks();
513 
518 
522  nidas::core::DSMService* _service;
523 
524  nidas::core::IOStream* _iostream;
525 
526  /* mutable */ const nidas::core::DSMConfig* _dsm;
527 
529 
531 
532  nidas::core::SampleHeader _sheader;
533 
535 
536  char* _hptr;
537 
542  nidas::core::Sample* _samp;
543 
549  nidas::core::Sample* _sampPending;
550 
555  size_t _dataToRead;
556 
560  char* _dptr;
561 
569 
574 
579  size_t _badSamples;
580 
584  size_t _goodSamples;
585 
587 
588  nidas::core::BadSampleFilter _bsf;
589 
591 
592  // XXX This is no longer used and is deprecated. Someday it will be
593  // removed from here and from the constructor signatures. XXX
594  bool _raw;
595 
601  std::string _last_name;
602 
603  nidas::util::EOFException _eofx;
604  bool _ateof;
605 
610 
614  SampleInputStream& operator =(const SampleInputStream& x);
615 
616 };
617 
618 }} // namespace nidas namespace dynld
619 
620 #endif
void removeSampleTag(const nidas::core::SampleTag *tag)
Definition: SampleInputStream.h:260
void closeBlocks()
Definition: SampleInputStream.cc:354
void setKeepStats(bool val)
Definition: SampleSourceSupport.h:154
void setMaxSampleTime(const nidas::util::UTime &val)
Set the maximum sample time for a valid sample.
Definition: BadSampleFilter.cc:80
nidas::core::Sample * readSample()
Read the next sample from the InputStream.
Definition: SampleInputStream.cc:1050
std::string getName() const
Definition: SampleInputStream.cc:300
A base class for buffering data.
Definition: IOStream.h:41
bool _inputHeaderParsed
Definition: SampleInputStream.h:530
Tuple for all the possible results of iostream reads.
Definition: SampleInputStream.h:489
A class for parsing, formatting and doing operations on time, based on Unix time conventions, where leap seconds are ignored, so that there are always 60 seconds in a minute, 3600 seconds in an hour and 86400 seconds in a day.
Definition: UTime.h:76
bool getExpectHeader() const
Definition: SampleInputStream.h:448
void setMaxSampleLength(unsigned int val)
The default maximum sample length is 32768.
Definition: BadSampleFilter.cc:64
bool newinput
Definition: SampleInputStream.h:496
void removeSampleClientForTag(nidas::core::SampleClient *client, const nidas::core::SampleTag *tag)
Remove a SampleClient for a given SampleTag from this SampleSource.
Definition: SampleInputStream.h:313
virtual SampleInputStream * clone(nidas::core::IOChannel *iochannel)
Create a clone, with a new, connected IOChannel.
Definition: SampleInputStream.cc:257
An implementation of a SampleInput.
Definition: SampleInputStream.h:172
nidas::core::SampleSource * getRawSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition: SampleInputStream.h:265
size_t blockEnd() const
Definition: SampleInputStream.h:109
void handleNewInput()
Whenever we are at the start of a new input, we need to start over with parsing the input header...
Definition: SampleInputStream.cc:497
void addSampleClientForTag(nidas::core::SampleClient *client, const nidas::core::SampleTag *tag)
Add a Client for a given SampleTag.
Definition: SampleInputStream.h:308
SampleInputStream(bool raw=false)
Constructor.
Definition: SampleInputStream.cc:193
void setMinSampleLength(unsigned int val)
The default minimum sample length is 1.
Definition: BadSampleFilter.cc:56
size_t _goodSamples
Number of good samples in the stream so far.
Definition: SampleInputStream.h:584
The gcc builtin atomic operations are not supported on arm, and one must use -march=i686 for them to ...
Definition: Sample.h:106
int getFd() const
Definition: SampleInputStream.cc:305
void setMinDsmId(unsigned int val)
The default minimum DSM ID for a valid sample is 1.
Definition: BadSampleFilter.cc:40
void removeSampleTag(const SampleTag *tag)
Definition: SampleSourceSupport.cc:67
nidas::core::Sample * handleEOF(bool keepreading)
When EOF has been caught, then either we need to return the last pending sample, or else we need to t...
Definition: SampleInputStream.cc:721
void setMinSampleTime(const nidas::util::UTime &val)
See BadSampleFilter.
Definition: SampleInputStream.h:430
int getClientCount() const
How many SampleClients are currently in my list.
Definition: SampleInputStream.h:318
long long dsm_time_t
Posix time in microseconds, the number of non-leap microseconds since 1970 Jan 1 00:00 UTC...
Definition: Sample.h:61
nidas::core::SampleInputHeader _inputHeader
Definition: SampleInputStream.h:586
void setMinSampleLength(unsigned int val)
See BadSampleFilter.
Definition: SampleInputStream.h:414
void setMaxDsmId(unsigned int val)
The default maximum DSM ID for a valid sample is 1024.
Definition: BadSampleFilter.cc:48
void checkUnexpectedEOF()
Definition: SampleInputStream.cc:560
Class that should include all that is configurable about a DSM.
Definition: DSMConfig.h:55
Pure virtual interface for a source of Samples.
Definition: SampleSource.h:48
void setExpectHeader(bool val)
Definition: SampleInputStream.h:446
SampleTagIterator getSampleTagIterator() const
Definition: SampleSourceSupport.cc:75
nidas::core::SampleSourceSupport _source
Definition: SampleInputStream.h:459
unsigned int last_good_sample_size
Size of the last good sample in a good sample block.
Definition: SampleInputStream.h:148
Keep track of statistics for a contiguous block of good or bad samples in a stream.
Definition: SampleInputStream.h:60
void addSampleTag(const nidas::core::SampleTag *tag)
Implementation of SampleInput::addSampleTag().
Definition: SampleInputStream.h:255
nidas::core::dsm_time_t dsm_time_t
Definition: SampleInputStream.h:62
std::string _last_name
Keep the current name of the input stream so it can be referenced even after the input stream has adv...
Definition: SampleInputStream.h:601
nidas::core::IOChannel * _iochan
Definition: SampleInputStream.h:457
std::list< const nidas::core::SampleTag * > getSampleTags() const
Get the output SampleTags.
Definition: SampleInputStream.h:278
void setMinDsmId(int val)
See BadSampleFilter.
Definition: SampleInputStream.h:398
void readInputHeader()
Read archive information at beginning of input stream or file.
Definition: SampleInputStream.cc:404
nidas::core::dsm_time_t dsm_time_t
Definition: SampleInputStream.h:175
A source of samples.
Definition: SampleSourceSupport.h:47
Definition: SampleInputHeader.h:36
void removeSampleClient(SampleClient *c)
Remove a SampleClient from this SampleSource This will also remove a SampleClient if it has been adde...
Definition: SampleSourceSupport.cc:88
nidas::util::EOFException _eofx
Definition: SampleInputStream.h:603
Implement rules for filtering bad samples.
Definition: BadSampleFilter.h:22
void removeSampleClientForTag(SampleClient *c, const SampleTag *)
Remove a SampleClient for a given SampleTag from this SampleSource.
Definition: SampleSourceSupport.cc:121
void setFilterBadSamples(bool val)
Enable filtering of bad samples.
Definition: BadSampleFilter.cc:33
void addSampleClient(SampleClient *c)
Add a SampleClient to this SampleSource.
Definition: SampleSourceSupport.cc:80
char * _hptr
Definition: SampleInputStream.h:536
void setFilterBadSamples(bool val)
See BadSampleFilter.
Definition: SampleInputStream.h:390
dsm_time_t start_time
Sample times bounding this block.
Definition: SampleInputStream.h:121
Pure virtual interface of a client of Samples.
Definition: SampleClient.h:38
bool _ateof
Definition: SampleInputStream.h:604
size_t _badSamples
Number of bad samples in the stream so far, which is to say number of bytes checked which did not con...
Definition: SampleInputStream.h:579
const SampleStats & getSampleStats() const
Definition: SampleSourceSupport.h:149
nidas::core::Sample * _samp
Will be non-null if we have previously read part of a sample from the stream.
Definition: SampleInputStream.h:542
bool good
True if this is a block of good samples.
Definition: SampleInputStream.h:127
nidas::core::SampleSource * getProcessedSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition: SampleInputStream.h:270
size_t getBadSamples() const
Definition: SampleInputStream.h:375
void endBadBlock(nidas::core::Sample *samp, long long offset)
Mark the end of a bad block by assigning the end_time and setting nbytes according to the current off...
Definition: SampleInputStream.cc:128
nidas::core::SampleInput * connected(nidas::core::IOChannel *iochan)
Implementation of IOChannelRequester::connected.
Definition: SampleInputStream.cc:330
void fromDOMElement(const xercesc::DOMElement *node)
Initialize myself from a xercesc::DOMElement.
Definition: SampleInputStream.cc:1067
A channel for Input or Output of data.
Definition: IOChannel.h:64
size_t len
Definition: SampleInputStream.h:495
const nidas::core::SampleStats & getSampleStats() const
Definition: SampleInputStream.h:329
ReadResult(size_t ilen=0, bool inewinput=false, bool ieof=false)
Definition: SampleInputStream.h:492
ReadResult read(bool keepreading, char *ptr, size_t lentoread)
All reads of the iostream go through here.
Definition: SampleInputStream.cc:599
void setBadSampleFilter(const nidas::core::BadSampleFilter &bsf)
Replace the bad sample filter rules for this stream with bsf.
Definition: SampleInputStream.h:382
nidas::core::Sample * sampleFromHeader()
Check the current header for validity and generate a sample for it.
Definition: SampleInputStream.cc:928
dsm_time_t end_time
Definition: SampleInputStream.h:122
size_t block_start
Definition: SampleInputStream.h:137
nidas::core::DSMService * _service
Service that has requested my input.
Definition: SampleInputStream.h:522
void addSampleClient(nidas::core::SampleClient *client)
Implementation of SampleSource::addSampleClient().
Definition: SampleInputStream.h:294
void close()
Definition: SampleInputStream.cc:387
size_t nbytes
Size in bytes of this block.
Definition: SampleInputStream.h:142
void setMinSampleTime(const nidas::util::UTime &val)
Set the minimum sample time for a valid sample.
Definition: BadSampleFilter.cc:72
void setMaxSampleTime(const nidas::util::UTime &val)
See BadSampleFilter.
Definition: SampleInputStream.h:438
void search(const nidas::util::UTime &tt)
Search forward until a sample header is read whose time is greater than or equal to tt...
Definition: SampleInputStream.cc:1058
int len
Definition: sing.cc:934
Interface to a data sample.
Definition: Sample.h:189
const nidas::core::SampleInputHeader & getInputHeader() const
Definition: SampleInputStream.h:216
bool isNonBlocking() const
Definition: SampleInputStream.cc:294
void setKeepStats(bool val)
Definition: SampleInputStream.h:247
nidas::core::SampleTagIterator getSampleTagIterator() const
Implementation of SampleSource::getSampleTagIterator().
Definition: SampleInputStream.h:286
char * _dptr
Pointer into the data portion of samp where we will read next.
Definition: SampleInputStream.h:560
nidas::core::IOStream * _iostream
Definition: SampleInputStream.h:524
SampleSource * getRawSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition: SampleSourceSupport.h:60
size_t _dataToRead
How many bytes left to read from the stream into the data portion of samp.
Definition: SampleInputStream.h:555
Definition: IOException.h:37
Class for iterating over the SampleTags of a Project, Site, DSMConfig, or a SampleSource.
Definition: NidsIterators.h:217
void setMaxDsmId(int val)
See BadSampleFilter.
Definition: SampleInputStream.h:406
nidas::core::SampleHeader _sheader
Definition: SampleInputStream.h:532
bool parseInputHeader()
Definition: SampleInputStream.cc:428
void removeSampleClient(nidas::core::SampleClient *client)
Remove a SampleClient from this SampleSource.
Definition: SampleInputStream.h:299
void setNonBlocking(bool val)
Definition: SampleInputStream.cc:289
Class describing a group of variables that are sampled and handled together.
Definition: SampleTag.h:87
void distribute(const Sample *s)
Distribute a sample to my clients, calling the receive() method of each client, passing the const poi...
Definition: SampleSourceSupport.cc:141
void addSampleClientForTag(SampleClient *c, const SampleTag *)
Add a SampleClient to this SampleSource.
Definition: SampleSourceSupport.cc:101
BlockStats _block
Information about the current block of samples, good or bad.
Definition: SampleInputStream.h:573
bool eof
Definition: SampleInputStream.h:497
SampleInputStream * _original
Definition: SampleInputStream.h:590
virtual nidas::core::SampleInput * getOriginal() const
Definition: SampleInputStream.h:223
A source of samples.
Definition: SampleStats.h:41
void flush()
Implementation of SampleSource::flush(), unpacks and distributes any samples currently in the read bu...
Definition: SampleInputStream.cc:311
bool _expectHeader
Definition: SampleInputStream.h:528
bool readSamples()
Read a buffer of data, serialize the data into samples, and distribute() samples to the receive() met...
Definition: SampleInputStream.cc:519
void addBadSample(long long offset, unsigned int nbadbytes)
An alternative to calling startBadBlock()/endBadBlock(), it adds nbadbytes to a bad block...
Definition: SampleInputStream.cc:101
nidas::core::Sample * nextSample()
Unpack the next sample from the InputStream.
Definition: SampleInputStream.cc:698
const nidas::core::DSMConfig * _dsm
Definition: SampleInputStream.h:526
nidas::core::Sample * _sampPending
The currently pending sample.
Definition: SampleInputStream.h:549
nidas::core::BadSampleFilter _bsf
Definition: SampleInputStream.h:588
size_t _headerToRead
Definition: SampleInputStream.h:534
int getClientCount() const
How many SampleClients are currently in my list.
Definition: SampleSourceSupport.cc:136
bool addGoodSample(nidas::core::Sample *samp, long long offset)
On a good sample, reset this block to a good block if not already, and update the last good sample an...
Definition: SampleInputStream.cc:81
std::list< const SampleTag * > getSampleTags() const
What SampleTags am I a SampleSource for?
Definition: SampleSourceSupport.cc:54
bool _raw
Definition: SampleInputStream.h:594
bool _skipSample
This is set if the data for the current sample header should be read but skipped, because the sample ...
Definition: SampleInputStream.h:568
Base class for a service, as built from a &lt;service&gt; XML tag.
Definition: DSMService.h:47
virtual void setIOChannel(nidas::core::IOChannel *val)
Set the IOChannel for this SampleInputStream.
Definition: SampleInputStream.cc:272
const nidas::core::DSMConfig * getDSMConfig() const
What DSM am I connected to? May be NULL if it cannot be determined.
Definition: SampleInputStream.cc:399
SampleSource * getProcessedSampleSource()
Several objects in NIDAS can be both a SampleSource of raw Samples and processed Samples.
Definition: SampleSourceSupport.h:66
BlockStats(bool goodblock=true, size_t startblock=0)
Initialize a block as good or bad and give it an offset.
Definition: SampleInputStream.cc:69
void distribute(const nidas::core::Sample *s)
Distribute a sample to my clients.
Definition: SampleInputStream.h:370
virtual ~SampleInputStream()
Definition: SampleInputStream.cc:262
void setMaxSampleLength(unsigned int val)
See BadSampleFilter.
Definition: SampleInputStream.h:422
size_t nsamples
Number of good samples in a row.
Definition: SampleInputStream.h:132
void requestConnection(nidas::core::DSMService *)
Definition: SampleInputStream.cc:323
ReadResult readBlock(bool keepreading, char *&ptr, size_t &lentoread)
Read a block into memory, updating the given block pointer and length counter accordingly.
Definition: SampleInputStream.cc:676
Definition: InvalidParameterException.h:35
void startBadBlock(long long offset)
Start a bad block.
Definition: SampleInputStream.cc:115
void addSampleTag(const SampleTag *tag)
Add a SampleTag to this SampleSource.
Definition: SampleSourceSupport.cc:60
Interface of an input SampleSource.
Definition: SampleInput.h:47