nidas  v1.2-1520
UDPSampleOutput.h
Go to the documentation of this file.
1 // -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2 // vim: set shiftwidth=4 softtabstop=4 expandtab:
3 /*
4  ********************************************************************
5  ** NIDAS: NCAR In-situ Data Acquistion Software
6  **
7  ** 2009, Copyright University Corporation for Atmospheric Research
8  **
9  ** This program is free software; you can redistribute it and/or modify
10  ** it under the terms of the GNU General Public License as published by
11  ** the Free Software Foundation; either version 2 of the License, or
12  ** (at your option) any later version.
13  **
14  ** This program is distributed in the hope that it will be useful,
15  ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16  ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  ** GNU General Public License for more details.
18  **
19  ** The LICENSE.txt file accompanying this software contains
20  ** a copy of the GNU General Public License. If it is not found,
21  ** write to the Free Software Foundation, Inc.,
22  ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23  **
24  ********************************************************************
25 */
26 
27 #ifndef NIDAS_DYNLD_UDPSAMPLEOUTPUT_H
28 #define NIDAS_DYNLD_UDPSAMPLEOUTPUT_H
29 
32 #include <nidas/util/Thread.h>
33 
34 #include <poll.h>
35 
36 namespace nidas {
37 
38 namespace util {
39 class Socket;
40 class ServerSocket;
41 }
42 
43 namespace core {
44 class IOChannel;
45 class MultipleUDPSockets;
46 }
47 
48 namespace dynld {
53 {
54 public:
55 
57 
59 
63  void flush() throw() {}
64 
65  void allocateBuffer(size_t len);
66 
68 
69  bool receive(const nidas::core::Sample *s) throw();
70 
71  size_t write(const struct iovec* iov,int iovcnt) throw (nidas::util::IOException);
72 
73  // void init() throw();
74 
75  void close() throw(nidas::util::IOException);
76 
80  long long getNumOutputBytes() const { return _nbytesOut; }
81 
82  void addNumOutputBytes(int val) { _nbytesOut += val; }
83 
84  void fromDOMElement(const xercesc::DOMElement* node)
86 
87 protected:
88 
94 
100 
101 private:
107  xercesc::DOMDocument* getProjectDOM() throw(xercesc::DOMException);
108 
109  void releaseProjectDOM();
110 
115  class VariableListWorker: public nidas::util::Thread
116  {
117  public:
122  nidas::util::Socket* sock,bool keepOpen);
124  int run() throw(nidas::util::Exception);
125  void interrupt();
126  private:
129  bool _keepOpen;
134  };
135 
140  {
141  public:
143 
145 
146  int run() throw(nidas::util::Exception);
147 
151  void addConnection(nidas::util::Socket*,unsigned short udpport);
152 
153  void removeConnection(nidas::util::Socket*,unsigned short udpport);
154 
160  unsigned short udpport);
161  private:
162  void updatePollfds();
164  std::list<std::pair<nidas::util::Socket*,unsigned short> > _pendingSockets;
165  std::list<std::pair<nidas::util::Socket*,unsigned short> > _pendingRemoveSockets;
166  std::vector<std::pair<nidas::util::Socket*,unsigned short> > _sockets;
167  std::map<nidas::util::Inet4SocketAddress,nidas::core::ConnectionInfo> _destinations;
168  bool _changed;
170  struct pollfd* _fds;
171  int _nfds;
176  };
177 
183  {
184  public:
186  int xmlPortNumber,ConnectionMonitor* monitor);
188  int run() throw(nidas::util::Exception);
189  void interrupt();
190  private:
191  void checkWorkers() throw();
192  void fireWorkers() throw();
196  std::list<VariableListWorker*> _workers;
202  };
203 
205 
206  xercesc::DOMDocument* _doc;
207 
209 
211 
213 
215 
216  unsigned short _xmlPortNumber;
217 
218  unsigned short _multicastOutPort;
219 
221 
223 
224  long long _nbytesOut;
225 
227  char *_buffer;
228 
230  char* _head;
231 
233  char* _tail;
234 
238  size_t _buflen;
239 
243  char* _eob;
244 
249 
254 
255 private:
256 
259 
262 };
263 
274 {
275  unsigned int magic; // should be MAGIC
276  unsigned short int xmlTcpPort;
277  unsigned short int dataMulticastPort;
278 
279  // a concatenated array of null terminated strings
280  // 1. multicast address to listen on for data: e.g. "239.0.0.10"
281  // 2. hostname which is providing UDP data feed.
282  // 3-N: names of DSM where data was sampled.
283  char strings[0];
284 
285  static const unsigned int MAGIC;
286 };
287 
288 
299 {
300  unsigned int magic; // should be InitialUDPDataRequestReply::MAGIC
301  unsigned short int clientUdpPort;
302 };
303 
304 }}
305 
306 #endif
xercesc::DOMDocument * _doc
Definition: UDPSampleOutput.h:206
bool _changed
Definition: UDPSampleOutput.h:168
int _nfds
Definition: UDPSampleOutput.h:171
void close()
Definition: UDPSampleOutput.cc:208
void releaseProjectDOM()
Definition: UDPSampleOutput.cc:366
Structure sent back to client from the UDP feed server, in big-endian order, indicating what TCP port...
Definition: UDPSampleOutput.h:273
std::vector< std::pair< nidas::util::Socket *, unsigned short > > _sockets
Definition: UDPSampleOutput.h:166
XMLSocketListener(UDPSampleOutput *output, int xmlPortNumber, ConnectionMonitor *monitor)
Definition: UDPSampleOutput.cc:578
Thread that waits for a connection on a tcp socket, starting a VariableListWorker on each connection...
Definition: UDPSampleOutput.h:182
unsigned short _xmlPortNumber
Definition: UDPSampleOutput.h:216
XMLSocketListener & operator=(const XMLSocketListener &)
No assignment.
void addNumOutputBytes(int val)
Definition: UDPSampleOutput.h:82
unsigned short int dataMulticastPort
Definition: UDPSampleOutput.h:277
void fromDOMElement(const xercesc::DOMElement *node)
Initialize myself from a xercesc::DOMElement.
Definition: UDPSampleOutput.cc:371
size_t _buflen
The actual buffer size.
Definition: UDPSampleOutput.h:238
nidas::util::Mutex _docLock
Definition: UDPSampleOutput.h:210
char * _tail
where we remove bytes from the buffer
Definition: UDPSampleOutput.h:233
std::list< VariableListWorker * > _workers
Definition: UDPSampleOutput.h:196
long long getNumOutputBytes() const
Total number of bytes written with this IOStream.
Definition: UDPSampleOutput.h:80
void flush()
Implementation of SampleClient::flush().
Definition: UDPSampleOutput.h:63
Interface of an output stream of samples.
Definition: SampleOutput.h:47
long long dsm_time_t
Posix time in microseconds, the number of non-leap microseconds since 1970 Jan 1 00:00 UTC...
Definition: Sample.h:61
nidas::util::Mutex _listenerLock
Definition: UDPSampleOutput.h:214
XMLSocketListener * _listener
Definition: UDPSampleOutput.h:220
ConnectionMonitor(nidas::core::MultipleUDPSockets *msock)
Definition: UDPSampleOutput.cc:403
bool _keepOpen
Definition: UDPSampleOutput.h:129
UDPSampleOutput * _output
Definition: UDPSampleOutput.h:127
~ConnectionMonitor()
Definition: UDPSampleOutput.cc:413
std::list< std::pair< nidas::util::Socket *, unsigned short > > _pendingSockets
Definition: UDPSampleOutput.h:164
size_t write(const struct iovec *iov, int iovcnt)
Definition: UDPSampleOutput.cc:269
long long _nbytesOut
Definition: UDPSampleOutput.h:224
Implementation of portions of SampleOutput.
Definition: SampleOutput.h:147
bool _projectChanged
Definition: UDPSampleOutput.h:208
Extra information associated with an IOChannel concerning the connection.
Definition: ConnectionInfo.h:38
unsigned int magic
Definition: UDPSampleOutput.h:275
UDPSampleOutput * clone(nidas::core::IOChannel *iochannel)
This SampleOutput does not support cloning.
Definition: UDPSampleOutput.cc:76
A stream (TCP) socket that is used to listen for connections.
Definition: Socket.h:755
ConnectionMonitor & operator=(const ConnectionMonitor &)
No assignment.
nidas::util::ServerSocket * _sock
Definition: UDPSampleOutput.h:194
int run()
The method which will run in its own thread.
Definition: UDPSampleOutput.cc:553
Worker thread that is run when a connection comes in, sending XML over a socket.
Definition: UDPSampleOutput.h:115
void interrupt()
Interrupt this thread.
Definition: UDPSampleOutput.cc:719
A channel for Input or Output of data.
Definition: IOChannel.h:64
~XMLSocketListener()
Definition: UDPSampleOutput.cc:589
UDPSampleOutput & operator=(const UDPSampleOutput &)
No assignment.
Definition: MultipleUDPSockets.h:37
Interface of an output stream of samples.
Definition: UDPSampleOutput.h:52
Definition: Exception.h:35
A C++ wrapper for a POSIX rwlock.
Definition: ThreadSupport.h:358
ConnectionMonitor * _monitor
Definition: UDPSampleOutput.h:195
unsigned short int clientUdpPort
Definition: UDPSampleOutput.h:301
Definition: Thread.h:80
nidas::util::Mutex _sockLock
Definition: UDPSampleOutput.h:169
static const unsigned int MAGIC
Definition: UDPSampleOutput.h:285
struct pollfd * _fds
Definition: UDPSampleOutput.h:170
int run()
The method which will run in its own thread.
Definition: UDPSampleOutput.cc:596
nidas::core::SampleOutput * connected(nidas::core::IOChannel *)
Implementation of IOChannelRequester::connected().
Definition: UDPSampleOutput.cc:122
UDPSampleOutput * _output
Definition: UDPSampleOutput.h:193
UDPSampleOutput()
Definition: UDPSampleOutput.cc:49
nidas::core::MultipleUDPSockets * _msock
Definition: UDPSampleOutput.h:163
ConnectionMonitor * _monitor
Definition: UDPSampleOutput.h:222
int len
Definition: sing.cc:934
void fireWorkers()
Definition: UDPSampleOutput.cc:702
Interface to a data sample.
Definition: Sample.h:189
nidas::core::MultipleUDPSockets * _mochan
Definition: UDPSampleOutput.h:204
void allocateBuffer(size_t len)
Definition: UDPSampleOutput.cc:99
char strings[0]
Definition: UDPSampleOutput.h:283
void addDestination(const nidas::core::ConnectionInfo &, unsigned short udpport)
A UDP request packet has arrived.
Definition: UDPSampleOutput.cc:437
Definition: IOException.h:37
nidas::util::RWLock _docRWLock
Definition: UDPSampleOutput.h:212
nidas::util::Socket * _sock
Definition: UDPSampleOutput.h:128
char * _head
where we insert bytes into the buffer
Definition: UDPSampleOutput.h:230
void addConnection(nidas::util::Socket *, unsigned short udpport)
A TCP connection has been made.
Definition: UDPSampleOutput.cc:463
unsigned short _multicastOutPort
Definition: UDPSampleOutput.h:218
A stream (TCP) socket.
Definition: Socket.h:430
char * _eob
One past end of buffer.
Definition: UDPSampleOutput.h:243
Structure which the client must send back to server on the TCP port.
Definition: UDPSampleOutput.h:298
bool receive(const nidas::core::Sample *s)
Method called to pass a sample to this client.
Definition: UDPSampleOutput.cc:221
int _xmlPortNumber
Definition: UDPSampleOutput.h:197
std::map< nidas::util::Inet4SocketAddress, nidas::core::ConnectionInfo > _destinations
Definition: UDPSampleOutput.h:167
~UDPSampleOutput()
Definition: UDPSampleOutput.cc:84
char * _buffer
data buffer
Definition: UDPSampleOutput.h:227
unsigned int magic
Definition: UDPSampleOutput.h:300
void removeConnection(nidas::util::Socket *, unsigned short udpport)
Definition: UDPSampleOutput.cc:492
int _maxUsecs
Maximum number of microseconds between physical writes.
Definition: UDPSampleOutput.h:253
A C++ wrapper for a POSIX mutex.
Definition: ThreadSupport.h:154
void checkWorkers()
Definition: UDPSampleOutput.cc:682
xercesc::DOMDocument * getProjectDOM()
Get a pointer to the current project DOM.
Definition: UDPSampleOutput.cc:338
unsigned short int xmlTcpPort
Definition: UDPSampleOutput.h:276
std::list< std::pair< nidas::util::Socket *, unsigned short > > _pendingRemoveSockets
Definition: UDPSampleOutput.h:165
Thread that waits for connections to die.
Definition: UDPSampleOutput.h:139
void updatePollfds()
Definition: UDPSampleOutput.cc:516
Definition: InvalidParameterException.h:35
nidas::core::dsm_time_t _lastWrite
Time of last physical write.
Definition: UDPSampleOutput.h:248