27#ifndef NIDAS_UTIL_MCSOCKET_H
28#define NIDAS_UTIL_MCSOCKET_H
30#include <nidas/Config.h>
40#include <sys/select.h>
47namespace nidas {
namespace util {
136template <
class SocketT>
190template <
class SocketT>
199 template<
class SocketTT>
281 McSocket<SocketT>&
operator=(
const McSocket<SocketT>& rhs);
460 template<
class SocketT>
468 static int check() throw();
562template<class SocketTT>
565 template<
class SocketT>
613namespace nidas {
namespace util {
615template<
class SocketT>
618 _newsocket(0),_newpktinfo(),
619 _socketOffered(false),_offerErrno(0),
620 _multicaster(0),_multicaster_mutex()
624template<
class SocketT>
626 _mcastAddr(x._mcastAddr),_iface(x._iface),
627 _requestType(x._requestType), _connectCond(),
628 _newsocket(0),_newpktinfo(),
629 _socketOffered(false),_offerErrno(0),
630 _multicaster(0),_multicaster_mutex()
634template<
class SocketT>
638 _mcastAddr = rhs._mcastAddr;
640 _requestType = rhs._requestType;
642 _socketOffered =
false;
648template<
class SocketT>
652template<
class SocketT>
656 std::list<Inet4NetworkInterface> ifcs = tmpsock.getInterfaces();
661template<
class SocketT>
664 if (getRequestType() < 0)
666 "request number has not been set");
669 _socketOffered =
false;
670 _connectCond.unlock();
675 throw IOException(_mcastAddr.toString(),
"accept", e.what());
682template<
class SocketT>
687 while(!_socketOffered) _connectCond.wait();
688 SocketT* socket = _newsocket;
689 pktinfo = _newpktinfo;
691 _connectCond.unlock();
692 VLOG((
"accept offerErrno=%d", _offerErrno));
693 if (!socket)
throw IOException(
"McSocket",
"accept",_offerErrno);
697template<
class SocketT>
700 if (getRequestType() < 0)
702 "request number has not been set");
705 _socketOffered =
false;
706 _connectCond.unlock();
710 _multicaster_mutex.lock();
712 _multicaster =
new McSocketMulticaster<SocketT>(
this);
714 _multicaster->start();
720 _multicaster_mutex.unlock();
726template<
class SocketT>
731 while(!_socketOffered) _connectCond.wait();
732 SocketT* socket = _newsocket;
733 pktinfo = _newpktinfo;
735 _connectCond.unlock();
736 VLOG((
"connect offerErrno=%d",_offerErrno));
737 if (!socket)
throw IOException(
"McSocket",
"connect",_offerErrno);
741template<
class SocketT>
748 _multicaster_mutex.lock();
751 VLOG((
"Mcsocket::offer creating/starting joiner"));
761 VLOG((
"Mcsocket::offer joiner started"));
763 VLOG((
"Mcsocket::offer doing connectCond.lock"));
764 _multicaster_mutex.unlock();
771template<
class SocketT>
776 if (socket) connected(socket,pktinfo);
779 _socketOffered =
true;
782 _newpktinfo = pktinfo;
783 _connectCond.signal();
784 _connectCond.unlock();
791template<
class SocketT>
797 _socketOffered =
true;
800 _connectCond.signal();
801 _connectCond.unlock();
808template<
class SocketT>
811 VLOG((
"Mcsocket::close"));
812 _multicaster_mutex.lock();
813 if (_multicaster && _multicaster->isRunning()) _multicaster->interrupt();
814 _multicaster_mutex.unlock();
822 VLOG((
"Mcsocket::close exception: %s", e.what()));
823 throw IOException(_mcastAddr.toString(),
"close", e.what());
825 VLOG((
"Mcsocket::close done, this=0x%x",
this));
828template<
class SocketTT>
830 Thread(
"McSocketMulticaster"),
831 _mcsocket(mcsock),_serverSocket(0),_datagramSocket(0),_requestSocket(0),
848template<
class SocketT>
852 _serverSocket->close();
853 delete _serverSocket;
855 if (_datagramSocket) {
856 _datagramSocket->close();
857 delete _datagramSocket;
859 if (_requestSocket) {
860 _requestSocket->close();
861 delete _requestSocket;
865template<
class SocketT>
868 _mcsocketMutex.lock();
870 _mcsocketMutex.unlock();
875 PLOG((
"%s",e.what()));
882 std::vector<Inet4NetworkInterface>& ifaces);
884template<
class SocketT>
889 int sockfd = (_serverSocket ? _serverSocket->getFd() :
890 _datagramSocket->getFd());
896 fds.events = POLLIN | POLLRDHUP;
907 struct timespec timeout;
913 pthread_sigmask(SIG_BLOCK,NULL,&sigmask);
915 sigdelset(&sigmask,SIGUSR1);
918 _mcsocket->getInet4McastSocketAddress();
921 std::vector<Inet4NetworkInterface> ifaces;
923 if (mcaddr.isMultiCastAddress())
925 VLOG((
"") << mcsockaddr.toString() <<
" is a multicast address.");
927 if (_mcsocket->getInterface().getAddress() ==
Inet4Address(INADDR_ANY))
934 VLOG((
"") << mcsockaddr.toString()
935 <<
" is not a multicast address, using a datagram request socket.");
941 dgram.setSocketAddress(mcsockaddr);
942 dgram.setRequestType(_mcsocket->getRequestType());
945 dgram.setRequesterListenPort(_serverSocket->getLocalPort());
947 dgram.setRequesterListenPort(_datagramSocket->getLocalPort());
949 for (
int numCasts=0; !isInterrupted() ; numCasts++) {
952 if (requestmsock && ifaces.size() > 0) {
954 requestmsock->setInterface(mcaddr,iface);
955 _requestSocket->send(dgram);
958 _requestSocket->send(dgram);
960 if (!(numCasts % 300)) {
961 ILOG((
"") <<
"sent " << numCasts <<
" dgrams" <<
962 ", requestType=" << dgram.getRequestType() <<
963 ", port=" << dgram.getRequesterListenPort() <<
964 ", socketType=" << dgram.getSocketType() <<
965 ", len=" << dgram.getLength() <<
966 ", #mcifaces=" << ifaces.size());
972 WLOG((
"McSocketMulticaster: ")
973 << _requestSocket->getLocalSocketAddress().toString()
974 <<
": " << e.what());
978 _requestSocket->close();
979 delete _requestSocket;
990 res = ::ppoll(&fds,1,&timeout,&sigmask);
994 if (fds.revents & POLLERR) {
995 _mcsocketMutex.lock();
996 if (_mcsocket) _mcsocket->offer(errno);
997 _mcsocketMutex.unlock();
998 if (_serverSocket) _serverSocket->close();
999 if (_datagramSocket) _datagramSocket->close();
1000 _requestSocket->close();
1005 if (fds.revents & (POLLHUP | POLLRDHUP))
1007 if (fds.revents & POLLHUP)
1009 WLOG((
"%s POLLHUP",
"McSocket"));
1014 assert(sockfd >= 0 && sockfd < FD_SETSIZE);
1015 FD_SET(sockfd, &fdset);
1016 res = ::pselect(sockfd+1,&fdset,0,0,&timeout,&sigmask);
1019 if (res == 0)
continue;
1020 if (errno == EINTR)
break;
1021 _mcsocketMutex.lock();
1022 if (_mcsocket) _mcsocket->offer(errno);
1023 _mcsocketMutex.unlock();
1024 if (_serverSocket) _serverSocket->close();
1025 if (_datagramSocket) _datagramSocket->close();
1026 _requestSocket->close();
1027 throw IOException(
"McSocket",
"poll/select",errno);
1030 if (_serverSocket) {
1031 Socket* socket = _serverSocket->accept();
1032 DLOG((
"accepted socket connection from ") <<
1034 _serverSocket->getLocalSocketAddress().toString());
1035 _serverSocket->close();
1041 pktinfo.setRemoteSocketAddress(remoteAddr);
1047 pktinfo.setLocalAddress(localAddr.getInet4Address());
1048 pktinfo.setDestinationAddress(localAddr.getInet4Address());
1050 _mcsocketMutex.lock();
1051 if (_mcsocket) _mcsocket->offer((SocketT*)socket,pktinfo);
1052 _mcsocketMutex.unlock();
1057 if (numCasts < 3)
continue;
1062 _datagramSocket->receive(dgram,pktinfo,MSG_PEEK);
1063 if (dgram.getSocketAddress().getFamily() == AF_INET) {
1066 dgram.getSocketAddress().getConstSockAddrPtr());
1067 pktinfo.setRemoteSocketAddress(remoteAddr);
1070 _mcsocketMutex.lock();
1071 if (_mcsocket) _mcsocket->offer((SocketT*)_datagramSocket,pktinfo);
1072 _datagramSocket = 0;
1073 _mcsocketMutex.unlock();
1075 _requestSocket->close();
1079 VLOG((
"McSocketMulticaster break"));
1080 _mcsocketMutex.lock();
1081 if (_mcsocket) _mcsocket->offer(EINTR);
1082 _mcsocketMutex.unlock();
1083 if (_serverSocket) _serverSocket->close();
1084 if (_datagramSocket) _datagramSocket->close();
1085 _requestSocket->close();
1086 VLOG((
"McSocketMulticaster run method exiting"));
Header file for the nidas::util logging facility.
#define err(format, arg...)
Definition ck_lams.cc:55
A wrapper class for a Posix condition variable.
Definition ThreadSupport.h:258
A DatagramPacket with a specific structure of data.
Definition DatagramPacket.h:178
A socket for sending or receiving datagrams, either unicast, broadcast or multicast.
Definition Socket.h:1178
Definition Exception.h:35
Definition IOException.h:37
Support for IP version 4 host address.
Definition Inet4Address.h:46
Definition Inet4NetworkInterface.h:36
Definition Inet4PacketInfo.h:93
An IP version 4 socket address, containing a host address and a port number.
Definition Inet4SocketAddress.h:41
static Logger * getInstance()
Retrieve the current Logger singleton instance.
Definition Logger.cc:318
Datagram that is multicast by a host when it wants a service.
Definition McSocket.h:88
void setRequestType(int val)
Definition McSocket.h:108
struct McSocketData mcdata
Definition McSocket.h:132
int getMagic() const
Definition McSocket.h:102
McSocketDatagram & operator=(const McSocketDatagram &rhs)
Assignment operator.
Definition McSocket.cc:68
void setMagic(int val)
Definition McSocket.h:104
int getRequesterListenPort() const
What port is the requester listening on for the connection back?
Definition McSocket.h:113
void setSocketType(int val)
Definition McSocket.h:123
int getRequestType() const
Definition McSocket.h:106
static const int magicVal
Magic value that should be found at the beginning of all received McSocketDatagrams.
Definition McSocket.h:129
void setRequesterListenPort(int val)
Definition McSocket.h:115
McSocketDatagram(int requestType=-1)
Definition McSocket.cc:50
int getSocketType() const
What socket type does the requester want to establish? SOCK_STREAM or SOCK_DGRAM.
Definition McSocket.h:121
Class for listening for McSocket requests on a specific multicast address and UDP port number.
Definition McSocket.h:459
int run()
Definition McSocket.cc:292
static int check()
Public method to return the number of McSocketListeners that are active.
Definition McSocket.cc:195
void add(McSocket< Socket > *mcsocket)
Definition McSocket.cc:217
std::map< int, McSocket< Socket > * > _tcpMcSockets
Definition McSocket.h:549
static void close(McSocket< Socket > *sock)
Remove the given McSocket from the list being served by this listener.
Definition McSocket.cc:135
void interrupt()
Interrupt this thread.
Definition McSocket.cc:281
Mutex _mcsocket_mutex
Definition McSocket.h:545
DatagramSocket * _readsock
Definition McSocket.h:547
int remove(McSocket< Socket > *mcsocket)
Definition McSocket.cc:241
std::map< int, McSocket< DatagramSocket > * > _udpMcSockets
Definition McSocket.h:551
static void accept(McSocket< Socket > *sock)
How a McSocket<Socket> registers with a McSocketListener.
Definition McSocket.cc:81
Inet4SocketAddress _mcastAddr
Definition McSocket.h:543
Thread which is started by McSocket to multicast requests for connections.
Definition McSocket.h:564
McSocketMulticaster(const McSocketMulticaster &)
No copying.
int run()
Definition McSocket.h:885
McSocketMulticaster(McSocket< SocketTT > *mcsocket)
Definition McSocket.h:829
Mutex _mcsocketMutex
Definition McSocket.h:605
DatagramSocket * _requestSocket
The DatagramSocket that requests are sent on.
Definition McSocket.h:603
void interrupt()
Interrupt this thread.
Definition McSocket.h:866
McSocketMulticaster & operator=(const McSocketMulticaster &)
No assignment.
virtual ~McSocketMulticaster()
Definition McSocket.h:849
McSocket< SocketTT > * _mcsocket
Definition McSocket.h:586
DatagramSocket * _datagramSocket
If multicasting for a UDP connection, then _datagramSocket will point to the UDP socket that is waiti...
Definition McSocket.h:598
ServerSocket * _serverSocket
If multicasting for a TCP connection, then _serverSocket will point to the TCP socket that is listening ...
Definition McSocket.h:592
A McSocket provides a way to establish a TCP stream socket connection, or a pair of UDP datagram sock...
Definition McSocket.h:192
void joinMulticaster()
Definition McSocket.h:742
McSocket< SocketT > & operator=(const McSocket< SocketT > &rhs)
Assignment operator.
Definition McSocket.h:635
int getRequestType() const
Get the request type number.
Definition McSocket.h:333
void offer(SocketT *sock, const Inet4PacketInfoX &pktinfo)
How a McSocketListener passes back a connected TCP socket or DatagramSocket.
Definition McSocket.h:772
void request()
Start issuing requests for a connection by multicasting McSocketDatagrams.
Definition McSocket.h:698
const Inet4SocketAddress & getInet4McastSocketAddress() const
Get the multicast address for listening to requests.
Definition McSocket.h:311
virtual void close()
Unregister this McSocket from the multicasting and listening threads.
Definition McSocket.h:809
std::list< Inet4NetworkInterface > getInterfaces() const
Return all network interfaces on this system.
Definition McSocket.h:653
SocketT * _newsocket
Definition McSocket.h:432
void setInterface(Inet4NetworkInterface iaddr)
Set a specific interface for the multicasts.
Definition McSocket.h:295
Inet4PacketInfoX _newpktinfo
Definition McSocket.h:434
McSocket()
Create a McSocket for requesting or accepting multicast requests for a socket connection.
Definition McSocket.h:616
void setInet4McastSocketAddress(const Inet4SocketAddress &val)
Set the multicast address for listening to requests.
Definition McSocket.h:320
int _requestType
Definition McSocket.h:428
Inet4SocketAddress _mcastAddr
Definition McSocket.h:424
void setRequestType(int val)
Set the request type value.
Definition McSocket.h:328
McSocket(const McSocket< SocketT > &)
Copy constructor.
Definition McSocket.h:625
virtual ~McSocket()
Definition McSocket.h:649
Inet4NetworkInterface getInterface() const
Definition McSocket.h:299
int _offerErrno
Definition McSocket.h:438
bool _socketOffered
Definition McSocket.h:436
Cond _connectCond
Definition McSocket.h:430
Mutex _multicaster_mutex
Definition McSocket.h:445
SocketT * accept(Inet4PacketInfoX &)
Like ServerSocket::accept(), this method will return a connected socket.
Definition McSocket.h:683
virtual void connected(SocketT *, const Inet4PacketInfoX &)
Virtual method that is called when a socket connection is established.
Definition McSocket.h:377
void listen()
Register with a McSocketListener to listen on the multicast address.
Definition McSocket.h:662
void offer(int err)
Definition McSocket.h:792
SocketT * connect(Inet4PacketInfoX &)
Do a request(), and then wait until a TCP connection is established, or a UDP datagram is received ba...
Definition McSocket.h:727
Thread * _multicaster
The thread we start which multicasts for connections.
Definition McSocket.h:443
Inet4NetworkInterface _iface
Definition McSocket.h:426
A datagram socket to be used for multicasts.
Definition Socket.h:1582
A C++ wrapper for a POSIX mutex.
Definition ThreadSupport.h:161
A stream (TCP) socket that is used to listen for connections.
Definition Socket.h:981
A stream (TCP) socket.
Definition Socket.h:573
const SocketAddress & getRemoteSocketAddress() const
Get remote address of this socket.
Definition Socket.h:893
const SocketAddress & getLocalSocketAddress() const
Get local address of this socket.
Definition Socket.h:912
In certain situations one needs to "join oneself", which would be a deadlock.
Definition Thread.h:608
void blockSignal(int)
Block a signal in this thread.
Definition Thread.cc:181
#define WLOG(MSG)
Definition Logger.h:313
#define PLOG(MSG)
Definition Logger.h:312
#define DLOG(MSG)
Definition Logger.h:316
#define VLOG(MSG)
Definition Logger.h:317
#define ILOG(MSG)
Definition Logger.h:315
#define LOG_ERR
Definition Logger.h:216
int getMcSocketType(McSocket< Socket > *)
Definition McSocket.h:139
void listMulticastInterfaces(MulticastSocket *requestmsock, std::vector< Inet4NetworkInterface > &ifaces)
Definition McSocket.cc:472
Root namespace for the NCAR In-Situ Data Acquisition Software.
Definition A2DConverter.h:31
McSocketData()
Constructor.
Definition McSocket.h:77
int magic
Magic value that should be found at the beginning of all received datagrams.
Definition McSocket.h:54
unsigned short _listenPort
Socket port that the remote host is listening on.
Definition McSocket.h:67
short _socketType
Either SOCK_STREAM=1, or SOCK_DGRAM=2.
Definition McSocket.h:72
int _requestType
An integer which identifies the type of the request.
Definition McSocket.h:61
#define NSECS_PER_SEC
Definition types.h:98