nidas  v1.2-1520
NetcdfRPCChannel.h
Go to the documentation of this file.
1 // -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2 // vim: set shiftwidth=4 softtabstop=4 expandtab:
3 /*
4  ********************************************************************
5  ** NIDAS: NCAR In-situ Data Acquisition Software
6  **
7  ** 2006, Copyright University Corporation for Atmospheric Research
8  **
9  ** This program is free software; you can redistribute it and/or modify
10  ** it under the terms of the GNU General Public License as published by
11  ** the Free Software Foundation; either version 2 of the License, or
12  ** (at your option) any later version.
13  **
14  ** This program is distributed in the hope that it will be useful,
15  ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16  ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  ** GNU General Public License for more details.
18  **
19  ** The LICENSE.txt file accompanying this software contains
20  ** a copy of the GNU General Public License. If it is not found,
21  ** write to the Free Software Foundation, Inc.,
22  ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23  **
24  ********************************************************************
25 */
26 
27 #include <nidas/Config.h>
28 
29 #ifdef HAVE_LIBNC_SERVER_RPC
30 
31 #ifndef NIDAS_DYNLD_ISFF_NETCDFRPCCHANNEL_H
32 #define NIDAS_DYNLD_ISFF_NETCDFRPCCHANNEL_H
33 
34 #include <nidas/core/IOChannel.h>
35 #include <nidas/core/SampleTag.h>
36 #include <nidas/core/Parameter.h>
37 
38 #include <nc_server_rpc.h>
39 
40 #include <string>
41 #include <iostream>
42 #include <vector>
43 
44 namespace nidas { namespace dynld { namespace isff {
45 
46 using namespace nidas::core;
47 
48 class NcVarGroupFloat;
49 
54 class NetcdfRPCChannel: public IOChannel {
55 
56 public:
57 
61  NetcdfRPCChannel();
62 
66  ~NetcdfRPCChannel();
67 
71  NetcdfRPCChannel* clone() const { return new NetcdfRPCChannel(*this); }
72 
76  void requestConnection(IOChannelRequester* rqstr)
78 
82  IOChannel* connect() throw(nidas::util::IOException);
83 
84  void setNonBlocking(bool val __attribute__ ((unused))) throw(nidas::util::IOException)
85  {
86  // ignore for now.
87  }
88 
89  bool isNonBlocking() const throw(nidas::util::IOException)
90  {
91  return true;
92  }
93 
94  virtual bool isNewFile() const { return false; }
95 
99  size_t read(void*, size_t) throw (nidas::util::IOException)
100  {
101  throw nidas::util::IOException(getName(),"read","not supported");
102  }
103 
107  size_t write(const void*, size_t) throw (nidas::util::IOException)
108  {
109  throw nidas::util::IOException(getName(),"default write","not supported");
110  }
111 
115  size_t write(const struct iovec*, int) throw (nidas::util::IOException)
116  {
117  throw nidas::util::IOException(getName(),"default write","not supported");
118  }
119 
123  void write(const Sample*) throw (nidas::util::IOException);
124 
128  void write(datarec_float*) throw (nidas::util::IOException);
129 
130  void close() throw (nidas::util::IOException);
131 
132  int getFd() const
133  {
134  return -1;
135  }
136 
137  const std::string& getName() const
138  {
139  return _name;
140  }
141 
142  void setName(const std::string& val);
143 
144  const std::string& getServer() const { return _server; }
145 
146  void setServer(const std::string& val);
147 
148  const std::string& getFileNameFormat() const { return _fileNameFormat; }
149 
150  void setFileNameFormat(const std::string& val);
151 
152  const std::string& getDirectory() const { return _directory; }
153 
154  void setDirectory(const std::string& val);
155 
156  const std::string& getCDLFileName() const { return _cdlFileName; }
157 
158  void setCDLFileName(const std::string& val) { _cdlFileName = val; }
159 
160  void setFillValue(float val) { _fillValue = val; }
161 
162  float getFillValue() const { return _fillValue; }
163 
168  void setTimeInterval(int val)
169  {
170  _timeInterval = val;
171  }
172 
173  int getTimeInterval() const
174  {
175  return _timeInterval;
176  }
177 
181  int getFileLength() const { return _fileLength; }
182 
183  void setFileLength(int val) { _fileLength = val; }
184 
189  void checkError() throw(nidas::util::IOException);
190 
191  void setRPCTimeout(int secs);
192 
193  int getRPCTimeout() const;
194 
198  void setRPCBatchPeriod(int val);
199 
200  int getRPCBatchPeriod() const;
201 
202  void fromDOMElement(const xercesc::DOMElement* node)
203  throw(nidas::util::InvalidParameterException);
204 
209  void addSampleTag(const SampleTag*);
210 
211  std::list<const SampleTag*> getSampleTags() const
212  {
213  return _constSampleTags;
214  }
215 
216  void writeGlobalAttr(const std::string& name, const std::string& value)
218 
219  void writeGlobalAttr(const std::string& name, int value)
221 
222 private:
223 
224  friend class NcVarGroupFloat;
225 
226  CLIENT* getRPCClient() { return _clnt; }
227 
228  int getConnectionId() const { return _connectionId; }
229 
230  struct timeval& getRPCWriteTimeoutVal();
231 
232  struct timeval& getRPCOtherTimeoutVal();
233 
234  struct timeval& getRPCBatchTimeoutVal();
235 
236 protected:
237 
241  NetcdfRPCChannel(const NetcdfRPCChannel&);
242 
243  void writeHistory(const std::string&) throw (nidas::util::IOException);
244 
245  void nonBatchWrite(datarec_float*) throw (nidas::util::IOException);
246 
247  NcVarGroupFloat* getNcVarGroupFloat(
248  const std::vector<ParameterT<int> >& dims,
249  const SampleTag* stag);
250 
251 private:
252 
253  std::string _name;
254 
255  std::string _server;
256 
260  std::string _fileNameFormat;
261 
262  std::string _directory;
263 
264  std::string _cdlFileName;
265 
266  float _fillValue;
267 
268  int _fileLength;
269 
270  CLIENT* _clnt;
271 
275  int _connectionId;
276 
277  int _rpcBatchPeriod;
278 
279  struct timeval _rpcWriteTimeout;
280 
281  struct timeval _rpcOtherTimeout;
282 
283  struct timeval _rpcBatchTimeout;
284 
285  int _ntry;
286 
287  static const int NTRY = 10;
288 
289  time_t _lastNonBatchWrite;
290 
291  std::map<dsm_sample_id_t,NcVarGroupFloat*> _groupById;
292 
293  std::map<dsm_sample_id_t,int> _stationIndexById;
294 
295  std::list<NcVarGroupFloat*> _groups;
296 
297  std::list<SampleTag*> _sampleTags;
298 
299  std::list<const SampleTag*> _constSampleTags;
300 
304  int _timeInterval;
305 
307  NetcdfRPCChannel& operator=(const NetcdfRPCChannel&);
308 
309 };
310 
311 class NcVarGroupFloat {
312 public:
313  NcVarGroupFloat(const std::vector<ParameterT<int> >& dims,
314  const SampleTag* stag,float fillValue);
315 
316  ~NcVarGroupFloat();
317 
318  const std::vector<const Variable*>& getVariables() const
319  {
320  return _sampleTag.getVariables();
321  }
322 
323  const std::vector<ParameterT<int> >& getDimensions()
324  {
325  return _dimensions;
326  }
327 
331  double getInterval() const { return _interval; }
332 
333 protected:
334 
335  friend class NetcdfRPCChannel;
336 
337  void connect(NetcdfRPCChannel* conn,float fillValue)
339 
340  void write(NetcdfRPCChannel* conn,const Sample* samp,
341  int stationNumber) throw(nidas::util::IOException);
342 
343 private:
344 
345  std::vector<ParameterT<int> > _dimensions;
346 
347  SampleTag _sampleTag;
348 
349  datarec_float _rec;
350 
351  int _weightsIndex;
352 
353  float _fillValue;
354 
360  double _interval;
361 
362 private:
363 
364  NcVarGroupFloat(const NcVarGroupFloat&); // prevent copying
365 
366  NcVarGroupFloat& operator =(const NcVarGroupFloat&);// prevent assignment
367 
368 };
369 
370 }}} // namespace nidas namespace dynld namespace isff
371 
372 #endif
373 #endif // HAVE_LIBNC_SERVER_RPC
static int __attribute__((__unused__)) cksum_test(int
unsigned int dsm_sample_id_t
Definition: Sample.h:63
A channel for Input or Output of data.
Definition: IOChannel.h:64
Interface to a data sample.
Definition: Sample.h:189
A typed Parameter, with data of type T.
Definition: Parameter.h:117
Definition: IOException.h:37
Class describing a group of variables that are sampled and handled together.
Definition: SampleTag.h:87
Interface for an object that requests connections to Inputs or Outputs.
Definition: IOChannel.h:54