nidas v1.2.3
UDPSampleOutput.h
Go to the documentation of this file.
1// -*- mode: C++; indent-tabs-mode: nil; c-basic-offset: 4; tab-width: 4; -*-
2// vim: set shiftwidth=4 softtabstop=4 expandtab:
3/*
4 ********************************************************************
5 ** NIDAS: NCAR In-situ Data Acquisition Software
6 **
7 ** 2009, Copyright University Corporation for Atmospheric Research
8 **
9 ** This program is free software; you can redistribute it and/or modify
10 ** it under the terms of the GNU General Public License as published by
11 ** the Free Software Foundation; either version 2 of the License, or
12 ** (at your option) any later version.
13 **
14 ** This program is distributed in the hope that it will be useful,
15 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 ** GNU General Public License for more details.
18 **
19 ** The LICENSE.txt file accompanying this software contains
20 ** a copy of the GNU General Public License. If it is not found,
21 ** write to the Free Software Foundation, Inc.,
22 ** 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 **
24 ********************************************************************
25*/
26
27#ifndef NIDAS_DYNLD_UDPSAMPLEOUTPUT_H
28#define NIDAS_DYNLD_UDPSAMPLEOUTPUT_H
29
32#include <nidas/util/Thread.h>
33
34#include <poll.h>
35
36namespace nidas {
37
38namespace util {
39class Socket;
40class ServerSocket;
41}
42
43namespace core {
44class IOChannel;
45class MultipleUDPSockets;
46}
47
48namespace dynld {
53{
54public:
55
57
59
63 void flush() throw() {}
64
65 void allocateBuffer(size_t len);
66
68
69 bool receive(const nidas::core::Sample *s) throw();
70
71 size_t write(const struct iovec* iov,int iovcnt);
72
73 // void init() throw();
74
75 void close();
76
80 long long getNumOutputBytes() const { return _nbytesOut; }
81
82 void addNumOutputBytes(int val) { _nbytesOut += val; }
83
84 void fromDOMElement(const xercesc::DOMElement* node);
85
86protected:
87
93
99
100private:
106 xercesc::DOMDocument* getProjectDOM();
107
108 void releaseProjectDOM();
109
134
139 {
140 public:
142
144
145 int run();
146
150 void addConnection(nidas::util::Socket*,unsigned short udpport);
151
152 void removeConnection(nidas::util::Socket*,unsigned short udpport);
153
159 unsigned short udpport);
160 private:
161 void updatePollfds();
163 std::list<std::pair<nidas::util::Socket*,unsigned short> > _pendingSockets;
164 std::list<std::pair<nidas::util::Socket*,unsigned short> > _pendingRemoveSockets;
165 std::vector<std::pair<nidas::util::Socket*,unsigned short> > _sockets;
166 std::map<nidas::util::Inet4SocketAddress,nidas::core::ConnectionInfo> _destinations;
169 struct pollfd* _fds;
170 int _nfds;
175 };
176
182 {
183 public:
185 int xmlPortNumber,ConnectionMonitor* monitor);
187 int run();
188 void interrupt();
189 private:
190 void checkWorkers();
191 void fireWorkers();
195 std::list<VariableListWorker*> _workers;
201 };
202
204
205 xercesc::DOMDocument* _doc;
206
208
210
212
214
215 unsigned short _xmlPortNumber;
216
217 unsigned short _multicastOutPort;
218
220
222
223 long long _nbytesOut;
224
226 char *_buffer;
227
229 char* _head;
230
232 char* _tail;
233
237 size_t _buflen;
238
242 char* _eob;
243
248
253
254private:
255
258
261};
262
273{
274 unsigned int magic; // should be MAGIC
275 unsigned short int xmlTcpPort;
276 unsigned short int dataMulticastPort;
277
278 // a concatenated array of null terminated strings
279 // 1. multicast address to listen on for data: e.g. "239.0.0.10"
280 // 2. hostname which is providing UDP data feed.
281 // 3-N: names of DSM where data was sampled.
282 char strings[0];
283
284 static const unsigned int MAGIC;
285};
286
287
298{
299 unsigned int magic; // should be InitialUDPDataRequestReply::MAGIC
300 unsigned short int clientUdpPort;
301};
302
303}}
304
305#endif
Extra information associated with an IOChannel concerning the connection.
Definition ConnectionInfo.h:39
A channel for Input or Output of data.
Definition IOChannel.h:65
Definition MultipleUDPSockets.h:38
Implementation of portions of SampleOutput.
Definition SampleOutput.h:160
Interface of an output stream of samples.
Definition SampleOutput.h:49
Interface to a data sample.
Definition Sample.h:190
Thread that waits for connections to die.
Definition UDPSampleOutput.h:139
void addDestination(const nidas::core::ConnectionInfo &, unsigned short udpport)
A UDP request packet has arrived.
Definition UDPSampleOutput.cc:436
nidas::core::MultipleUDPSockets * _msock
Definition UDPSampleOutput.h:162
ConnectionMonitor(const ConnectionMonitor &)
No copying.
int _nfds
Definition UDPSampleOutput.h:170
std::list< std::pair< nidas::util::Socket *, unsigned short > > _pendingSockets
Definition UDPSampleOutput.h:163
~ConnectionMonitor()
Definition UDPSampleOutput.cc:412
void addConnection(nidas::util::Socket *, unsigned short udpport)
A TCP connection has been made.
Definition UDPSampleOutput.cc:462
ConnectionMonitor & operator=(const ConnectionMonitor &)
No assignment.
void removeConnection(nidas::util::Socket *, unsigned short udpport)
Definition UDPSampleOutput.cc:491
std::vector< std::pair< nidas::util::Socket *, unsigned short > > _sockets
Definition UDPSampleOutput.h:165
nidas::util::Mutex _sockLock
Definition UDPSampleOutput.h:168
bool _changed
Definition UDPSampleOutput.h:167
void updatePollfds()
Definition UDPSampleOutput.cc:515
std::list< std::pair< nidas::util::Socket *, unsigned short > > _pendingRemoveSockets
Definition UDPSampleOutput.h:164
ConnectionMonitor(nidas::core::MultipleUDPSockets *msock)
Definition UDPSampleOutput.cc:402
std::map< nidas::util::Inet4SocketAddress, nidas::core::ConnectionInfo > _destinations
Definition UDPSampleOutput.h:166
int run()
The method which will run in its own thread.
Definition UDPSampleOutput.cc:552
struct pollfd * _fds
Definition UDPSampleOutput.h:169
Worker thread that is run when a connection comes in, sending XML over a socket.
Definition UDPSampleOutput.h:115
nidas::util::Socket * _sock
Definition UDPSampleOutput.h:127
UDPSampleOutput * _output
Definition UDPSampleOutput.h:126
int run()
The method which will run in its own thread.
Definition UDPSampleOutput.cc:740
VariableListWorker(UDPSampleOutput *output, nidas::util::Socket *sock, bool keepOpen)
Constructor.
Definition UDPSampleOutput.cc:725
VariableListWorker(const VariableListWorker &)
No copying.
~VariableListWorker()
Definition UDPSampleOutput.cc:731
bool _keepOpen
Definition UDPSampleOutput.h:128
void interrupt()
Interrupt this thread.
Definition UDPSampleOutput.cc:734
VariableListWorker & operator=(const VariableListWorker &)
No assignment.
Thread that waits for a connection on a tcp socket, starting a VariableListWorker on each connection.
Definition UDPSampleOutput.h:182
std::list< VariableListWorker * > _workers
Definition UDPSampleOutput.h:195
UDPSampleOutput * _output
Definition UDPSampleOutput.h:192
XMLSocketListener(const XMLSocketListener &)
No copying.
int run()
The method which will run in its own thread.
Definition UDPSampleOutput.cc:595
void interrupt()
Interrupt this thread.
Definition UDPSampleOutput.cc:718
void checkWorkers()
Definition UDPSampleOutput.cc:681
nidas::util::ServerSocket * _sock
Definition UDPSampleOutput.h:193
~XMLSocketListener()
Definition UDPSampleOutput.cc:588
int _xmlPortNumber
Definition UDPSampleOutput.h:196
ConnectionMonitor * _monitor
Definition UDPSampleOutput.h:194
void fireWorkers()
Definition UDPSampleOutput.cc:701
XMLSocketListener(UDPSampleOutput *output, int xmlPortNumber, ConnectionMonitor *monitor)
Definition UDPSampleOutput.cc:577
XMLSocketListener & operator=(const XMLSocketListener &)
No assignment.
Interface of an output stream of samples.
Definition UDPSampleOutput.h:53
size_t _buflen
The actual buffer size.
Definition UDPSampleOutput.h:237
UDPSampleOutput(const UDPSampleOutput &)
No copying.
bool receive(const nidas::core::Sample *s)
Return true if SampleOutputBase considers the sample handled, such as if it is outside the clipping w...
Definition UDPSampleOutput.cc:221
char * _head
where we insert bytes into the buffer
Definition UDPSampleOutput.h:229
int _maxUsecs
Maximum number of microseconds between physical writes.
Definition UDPSampleOutput.h:252
nidas::util::Mutex _listenerLock
Definition UDPSampleOutput.h:213
void allocateBuffer(size_t len)
Definition UDPSampleOutput.cc:99
void addNumOutputBytes(int val)
Definition UDPSampleOutput.h:82
~UDPSampleOutput()
Definition UDPSampleOutput.cc:84
UDPSampleOutput & operator=(const UDPSampleOutput &)
No assignment.
unsigned short _multicastOutPort
Definition UDPSampleOutput.h:217
nidas::core::MultipleUDPSockets * _mochan
Definition UDPSampleOutput.h:203
nidas::core::SampleOutput * connected(nidas::core::IOChannel *)
Implementation of IOChannelRequester::connected().
Definition UDPSampleOutput.cc:122
void releaseProjectDOM()
Definition UDPSampleOutput.cc:366
nidas::util::RWLock _docRWLock
Definition UDPSampleOutput.h:211
void fromDOMElement(const xercesc::DOMElement *node)
Definition UDPSampleOutput.cc:371
long long _nbytesOut
Definition UDPSampleOutput.h:223
unsigned short _xmlPortNumber
Definition UDPSampleOutput.h:215
long long getNumOutputBytes() const
Total number of bytes written with this IOStream.
Definition UDPSampleOutput.h:80
char * _buffer
data buffer
Definition UDPSampleOutput.h:226
UDPSampleOutput()
Definition UDPSampleOutput.cc:49
bool _projectChanged
Definition UDPSampleOutput.h:207
xercesc::DOMDocument * _doc
Definition UDPSampleOutput.h:205
char * _tail
where we remove bytes from the buffer
Definition UDPSampleOutput.h:232
UDPSampleOutput * clone(nidas::core::IOChannel *iochannel)
This SampleOutput does not support cloning.
Definition UDPSampleOutput.cc:76
xercesc::DOMDocument * getProjectDOM()
Get a pointer to the current project DOM.
Definition UDPSampleOutput.cc:338
nidas::util::Mutex _docLock
Definition UDPSampleOutput.h:209
ConnectionMonitor * _monitor
Definition UDPSampleOutput.h:221
void flush()
Implementation of SampleClient::flush().
Definition UDPSampleOutput.h:63
nidas::core::dsm_time_t _lastWrite
Time of last physical write.
Definition UDPSampleOutput.h:247
XMLSocketListener * _listener
Definition UDPSampleOutput.h:219
size_t write(const struct iovec *iov, int iovcnt)
Definition UDPSampleOutput.cc:269
void close()
Definition UDPSampleOutput.cc:208
char * _eob
One past end of buffer.
Definition UDPSampleOutput.h:242
A C++ wrapper for a POSIX mutex.
Definition ThreadSupport.h:161
A C++ wrapper for a POSIX rwlock.
Definition ThreadSupport.h:379
A stream (TCP) socket that is used to listen for connections.
Definition Socket.h:981
A stream (TCP) socket.
Definition Socket.h:573
Definition Thread.h:83
Sample * getSample(sampleType type, unsigned int len)
A convenience method for getting a sample of an enumerated type from a pool.
Definition Sample.cc:70
long long dsm_time_t
Posix time in microseconds, the number of non-leap microseconds since 1970 Jan 1 00:00 UTC.
Definition Sample.h:62
Root namespace for the NCAR In-Situ Data Acquisition Software.
Definition A2DConverter.h:31
int len
Definition sing.cc:948
Structure sent back to client from the UDP feed server, in big-endian order, indicating what TCP port...
Definition UDPSampleOutput.h:273
char strings[0]
Definition UDPSampleOutput.h:282
unsigned int magic
Definition UDPSampleOutput.h:274
unsigned short int xmlTcpPort
Definition UDPSampleOutput.h:275
static const unsigned int MAGIC
Definition UDPSampleOutput.h:284
unsigned short int dataMulticastPort
Definition UDPSampleOutput.h:276
Structure which the client must send back to server on the TCP port.
Definition UDPSampleOutput.h:298
unsigned short int clientUdpPort
Definition UDPSampleOutput.h:300
unsigned int magic
Definition UDPSampleOutput.h:299