41 #ifndef NIDAS_UTIL_MCSOCKET_H
42 #define NIDAS_UTIL_MCSOCKET_H
44 #include <nidas/Config.h>
54 #include <sys/select.h>
61 namespace nidas {
namespace util {
150 template <
class SocketT>
204 template <
class SocketT>
213 template<
class SocketTT>
408 virtual
void close() throw(IOException);
462 template<
class SocketT>
470 static int check() throw();
509 int run() throw(Exception);
530 McSocketListener& operator=(const McSocketListener&);
553 template<class SocketTT>
556 template<
class SocketT>
570 int run() throw(Exception);
601 namespace nidas {
namespace util {
603 template<
class SocketT>
606 _newsocket(0),_newpktinfo(),
607 _socketOffered(false),_offerErrno(0),
608 _multicaster(0),_multicaster_mutex()
612 template<
class SocketT>
614 _mcastAddr(x._mcastAddr),_iface(x._iface),
615 _requestType(x._requestType), _connectCond(),
616 _newsocket(0),_newpktinfo(),
617 _socketOffered(false),_offerErrno(0),
618 _multicaster(0),_multicaster_mutex()
622 template<
class SocketT>
626 _mcastAddr = rhs._mcastAddr;
628 _requestType = rhs._requestType;
630 _socketOffered =
false;
636 template<
class SocketT>
640 template<
class SocketT>
644 std::list<Inet4NetworkInterface> ifcs = tmpsock.
getInterfaces();
649 template<
class SocketT>
652 if (getRequestType() < 0)
653 throw IOException(_mcastAddr.
toString(),
"listen",
654 "request number has not been set");
656 _socketOffered =
false;
660 catch (
const Exception& e) {
661 throw IOException(_mcastAddr.
toString(),
"accept",e.
what());
668 template<
class SocketT>
673 while(!_socketOffered) _connectCond.
wait();
674 SocketT* socket = _newsocket;
675 pktinfo = _newpktinfo;
678 VLOG((
"accept offerErrno=%d", _offerErrno));
679 if (!socket)
throw IOException(
"McSocket",
"accept",_offerErrno);
683 template<
class SocketT>
686 if (getRequestType() < 0)
687 throw IOException(_mcastAddr.
toString(),
"listen",
688 "request number has not been set");
690 _socketOffered =
false;
694 _multicaster_mutex.
lock();
698 _multicaster->
start();
700 catch(
const Exception& e) {
701 throw IOException(
"McSocket",
"request",e.
what());
704 _multicaster_mutex.
unlock();
710 template<
class SocketT>
715 while(!_socketOffered) _connectCond.
wait();
716 SocketT* socket = _newsocket;
717 pktinfo = _newpktinfo;
720 VLOG((
"connect offerErrno=%d",_offerErrno));
721 if (!socket)
throw IOException(
"McSocket",
"connect",_offerErrno);
725 template<
class SocketT>
732 _multicaster_mutex.
lock();
735 VLOG((
"Mcsocket::offer creating/starting joiner"));
742 catch(
const Exception& e) {
745 VLOG((
"Mcsocket::offer joiner started"));
747 VLOG((
"Mcsocket::offer doing connectCond.lock"));
748 _multicaster_mutex.
unlock();
755 template<
class SocketT>
760 if (socket) connected(socket,pktinfo);
763 _socketOffered =
true;
766 _newpktinfo = pktinfo;
775 template<
class SocketT>
781 _socketOffered =
true;
792 template<
class SocketT>
795 VLOG((
"Mcsocket::close"));
796 _multicaster_mutex.
lock();
798 _multicaster_mutex.
unlock();
805 catch (
const Exception& e) {
806 VLOG((
"Mcsocket::close exception: %s", e.
what()));
807 throw IOException(_mcastAddr.
toString(),
"close", e.
what());
809 VLOG((
"Mcsocket::close done, this=0x%x",
this));
812 template<
class SocketTT>
814 Thread(
"McSocketMulticaster"),
815 _mcsocket(mcsock),_serverSocket(0),_datagramSocket(0),_requestSocket(0),
832 template<
class SocketT>
836 _serverSocket->
close();
837 delete _serverSocket;
839 if (_datagramSocket) {
840 _datagramSocket->
close();
841 delete _datagramSocket;
843 if (_requestSocket) {
844 _requestSocket->
close();
845 delete _requestSocket;
849 template<
class SocketT>
852 _mcsocketMutex.
lock();
858 catch(
const Exception& e) {
866 std::vector<Inet4NetworkInterface>& ifaces);
868 template<
class SocketT>
873 int sockfd = (_serverSocket ? _serverSocket->
getFd() :
874 _datagramSocket->
getFd());
880 fds.events = POLLIN | POLLRDHUP;
891 struct timespec timeout;
897 pthread_sigmask(SIG_BLOCK,NULL,&sigmask);
899 sigdelset(&sigmask,SIGUSR1);
902 _mcsocket->getInet4McastSocketAddress();
905 std::vector<Inet4NetworkInterface> ifaces;
909 VLOG((
"") << mcsockaddr.
toString() <<
" is a multicast address.");
911 if (_mcsocket->getInterface().getAddress() ==
Inet4Address(INADDR_ANY))
919 <<
" is not a multicast address, using a datagram request socket.");
933 for (
int numCasts=0; !isInterrupted() ; numCasts++) {
936 if (requestmsock && ifaces.size() > 0) {
939 _requestSocket->
send(dgram);
942 _requestSocket->
send(dgram);
944 if (!(numCasts % 300)) {
945 ILOG((
"") <<
"sent " << numCasts <<
" dgrams" <<
950 ", #mcifaces=" << ifaces.size());
953 catch(
const IOException& e)
956 WLOG((
"McSocketMulticaster: ")
958 <<
": " << e.
what());
962 _requestSocket->
close();
963 delete _requestSocket;
974 res = ::ppoll(&fds,1,&timeout,&sigmask);
978 if (fds.revents & POLLERR) {
979 _mcsocketMutex.
lock();
980 if (_mcsocket) _mcsocket->offer(errno);
982 if (_serverSocket) _serverSocket->
close();
983 if (_datagramSocket) _datagramSocket->
close();
984 _requestSocket->
close();
985 throw IOException(
"McSocket",
"ppoll",errno);
989 if (fds.revents & (POLLHUP | POLLRDHUP))
991 if (fds.revents & POLLHUP)
993 WLOG((
"%s POLLHUP",
"McSocket"));
998 assert(sockfd >= 0 && sockfd < FD_SETSIZE);
999 FD_SET(sockfd, &fdset);
1000 res = ::pselect(sockfd+1,&fdset,0,0,&timeout,&sigmask);
1003 if (res == 0)
continue;
1004 if (errno == EINTR)
break;
1005 _mcsocketMutex.
lock();
1006 if (_mcsocket) _mcsocket->offer(errno);
1008 if (_serverSocket) _serverSocket->
close();
1009 if (_datagramSocket) _datagramSocket->
close();
1010 _requestSocket->
close();
1011 throw IOException(
"McSocket",
"poll/select",errno);
1014 if (_serverSocket) {
1016 DLOG((
"accepted socket connection from ") <<
1019 _serverSocket->
close();
1034 _mcsocketMutex.
lock();
1035 if (_mcsocket) _mcsocket->offer((SocketT*)socket,pktinfo);
1041 if (numCasts < 3)
continue;
1046 _datagramSocket->
receive(dgram,pktinfo,MSG_PEEK);
1054 _mcsocketMutex.
lock();
1055 if (_mcsocket) _mcsocket->offer((SocketT*)_datagramSocket,pktinfo);
1056 _datagramSocket = 0;
1059 _requestSocket->
close();
1063 VLOG((
"McSocketMulticaster break"));
1064 _mcsocketMutex.
lock();
1065 if (_mcsocket) _mcsocket->offer(EINTR);
1067 if (_serverSocket) _serverSocket->
close();
1068 if (_datagramSocket) _datagramSocket->
close();
1069 _requestSocket->
close();
1070 VLOG((
"McSocketMulticaster run method exiting"));
McSocketMulticaster(McSocket< SocketTT > *mcsocket)
Definition: McSocket.h:813
void signal()
Unblock at least one thread waiting on the condition variable.
Definition: ThreadSupport.h:310
static const int magicVal
Magic value that should be found at the beginning of all received McSocketDatagrams.
Definition: McSocket.h:143
Class for listening on McSocket requests on a specific multicast address and UDP port number...
Definition: McSocket.h:460
McSocket()
Create a McSocket for requesting or accepting multicast requests for a socket connection.
Definition: McSocket.h:604
SocketT * connect(Inet4PacketInfoX &)
Do a request(), and then wait until a TCP connection is established, or a UDP datagram is received ba...
Definition: McSocket.h:711
void listen()
Register with a McSocketListener to listen on the multicast address.
Definition: McSocket.h:650
A datagram socket to be used for multicasts.
Definition: Socket.h:1241
McSocketDatagram & operator=(const McSocketDatagram &rhs)
Assignment operator.
Definition: McSocket.cc:82
Mutex _multicaster_mutex
Definition: McSocket.h:447
void send(const DatagramPacketBase &packet, int flags=0)
Definition: Socket.h:1058
bool isMultiCastAddress() const
Is this address a multicast address? Multicast addresses are in the range 224.0.0.1 to 239.255.255.255, their first four bits are 1110=0xe.
Definition: Inet4Address.h:138
Thread * _multicaster
The thread we start which multicasts for connections.
Definition: McSocket.h:445
void interrupt()
Interrupt this thread.
Definition: McSocket.h:850
void setRemoteSocketAddress(const Inet4SocketAddress &val)
Definition: Inet4PacketInfo.h:104
void lock()
Lock the mutex associated with the condition variable.
Definition: ThreadSupport.h:290
int getMagic() const
Definition: McSocket.h:116
virtual ~McSocket()
Definition: McSocket.h:637
SocketAddress & getSocketAddress() const
Definition: DatagramPacket.h:100
McSocketDatagram(int requestType=-1)
Definition: McSocket.cc:64
void request()
Start issuing requests for a connection by multicasting McSocketDatagrams.
Definition: McSocket.h:684
int getFd() const
Definition: Socket.h:1031
#define err(format, arg...)
Definition: ck_lams.cc:55
void receive(DatagramPacketBase &packet)
Read a packet from the DatagramSocket.
Definition: Socket.h:1038
std::string toString() const
Java style toString: returns "inet:hostname:port".
Definition: Inet4SocketAddress.cc:84
A socket for sending or receiving datagrams, either unicast, broadcast or multicast.
Definition: Socket.h:912
Datagram that is multicast by a host when it wants a service.
Definition: McSocket.h:101
int getRequesterListenPort() const
What port is the requester listening on for the connection back?
Definition: McSocket.h:127
int getFd() const
Definition: Socket.h:788
void setSocketType(int val)
Definition: McSocket.h:137
void setRequesterListenPort(int val)
Definition: McSocket.h:129
Inet4PacketInfoX _newpktinfo
Definition: McSocket.h:436
Inet4SocketAddress _mcastAddr
Definition: McSocket.h:426
#define ILOG(MSG)
Definition: Logger.h:305
unsigned short _listenPort
Socket port that the remote host is listening on.
Definition: McSocket.h:81
int _requestType
Definition: McSocket.h:430
A IP version 4 socket address, containing a host address, and a port number.
Definition: Inet4SocketAddress.h:41
int _offerErrno
Definition: McSocket.h:440
virtual void setLocalAddress(const Inet4Address &val)
Definition: Inet4PacketInfo.h:56
SocketT * _newsocket
Definition: McSocket.h:434
void listMulticastInterfaces(MulticastSocket *requestmsock, std::vector< Inet4NetworkInterface > &ifaces)
Definition: McSocket.cc:489
void close()
Definition: Socket.h:958
int _requestType
An integer which identifies the type of the request.
Definition: McSocket.h:75
const Inet4SocketAddress & getInet4McastSocketAddress() const
Get the multicast address for listening to requests.
Definition: McSocket.h:323
Cond _connectCond
Definition: McSocket.h:432
virtual std::string toString() const =0
Java style toString.
virtual int getFamily() const =0
Get the family of this SocketAddress, one of the values from /usr/include/sys/socket.h: AF_UNIX, AF_INET, AF_INET6, etc.
virtual void start()
Start the thread running, meaning execute the run method in a separate thread.
Definition: Thread.cc:406
int getLocalPort() const
Get local port number of this socket.
Definition: Socket.h:1107
McSocketData()
Constructor.
Definition: McSocket.h:91
SocketT * accept(Inet4PacketInfoX &)
Like ServerSocket::accept(), this method will return a connected socket.
Definition: McSocket.h:669
Definition: McSocket.h:63
Definition: Inet4PacketInfo.h:92
int run()
The method which will run in its own thread.
Definition: McSocket.h:869
struct McSocketData mcdata
Definition: McSocket.h:146
void log(va_alist)
Build a message from a printf-format string and variable args, and log the message.
Definition: Logger.cc:444
const SocketAddress & getLocalSocketAddress() const
Fetch the local address that this socket is bound to.
Definition: Socket.h:834
A stream (TCP) socket that is used to listen for connections.
Definition: Socket.h:755
void blockSignal(int)
Block a signal in this thread.
Definition: Thread.cc:211
friend class McSocketMulticaster
Definition: McSocket.h:214
void setMagic(int val)
Definition: McSocket.h:118
int getMcSocketType(McSocket< Socket > *)
Definition: McSocket.h:153
bool _socketOffered
Definition: McSocket.h:438
int getRequestType() const
Definition: McSocket.h:120
#define LOG_ERR
Definition: Logger.h:207
Definition: Exception.h:35
void setInterface(Inet4NetworkInterface iaddr)
Set a specific interface for the multicasts.
Definition: McSocket.h:309
Thread which is started by McSocket to multicast requests for connections.
Definition: McSocket.h:554
static Logger * getInstance()
Retrieve the current Logger singleton instance.
Definition: Logger.cc:293
Socket * accept()
Accept connection, return a connected Socket instance.
Definition: Socket.h:808
const SocketAddress & getLocalSocketAddress() const
Get local address of this socket.
Definition: Socket.h:1099
std::list< Inet4NetworkInterface > getInterfaces() const
Return all network interfaces on this system.
Definition: McSocket.h:641
Definition: Inet4NetworkInterface.h:35
virtual ~McSocketMulticaster()
Definition: McSocket.h:833
A McSocket provides a way to establish a TCP stream socket connection, or a pair of UDP datagram sock...
Definition: McSocket.h:151
void lock()
Lock the Mutex.
Definition: ThreadSupport.h:206
static void accept(McSocket< Socket > *sock)
How a McSocket<Socket> registers with a McSocketListener.
Definition: McSocket.cc:95
Inet4NetworkInterface _iface
Definition: McSocket.h:428
int getLocalPort() const
Definition: Socket.h:839
McSocket< SocketT > & operator=(const McSocket< SocketT > &rhs)
Assignment operator.
Definition: McSocket.h:623
A DatagramPacket with a specific structure of data.
Definition: DatagramPacket.h:178
virtual int getLength() const
Get the value for the current number of bytes in data.
Definition: DatagramPacket.h:135
#define VLOG(MSG)
Definition: Logger.h:307
const SocketAddress & getRemoteSocketAddress() const
Get remote address of this socket.
Definition: Socket.h:677
Definition: IOException.h:37
void wait()
Wait on the condition variable.
Definition: ThreadSupport.cc:306
Inet4NetworkInterface getInterface() const
Definition: McSocket.h:313
Inet4Address getInet4Address() const
Return the IP address portion.
Definition: Inet4SocketAddress.h:91
void joinMulticaster()
Definition: McSocket.h:726
void offer(SocketT *sock, const Inet4PacketInfoX &pktinfo)
How a McSocketListener passes back a connected TCP socket or DatagramSocket.
Definition: McSocket.h:756
int getPort() const
Return the port number.
Definition: Inet4SocketAddress.h:81
void close()
close the socket.
Definition: Socket.cc:1549
A stream (TCP) socket.
Definition: Socket.h:430
int getSocketType() const
What socket type does the requester want to establish? SOCK_STREAM or SOCK_DGRAM. ...
Definition: McSocket.h:135
virtual const char * what() const
Definition: Exception.h:98
A wrapper class for a Posix condition variable.
Definition: ThreadSupport.h:245
std::list< Inet4NetworkInterface > getInterfaces() const
Return the IP addresses of all my network interfaces.
Definition: Socket.h:1385
virtual void setDestinationAddress(const Inet4Address &val)
Definition: Inet4PacketInfo.h:65
static void close(McSocket< Socket > *sock)
Remove the given McSocket from the list being served by this listener.
Definition: McSocket.cc:151
short _socketType
Either SOCK_STREAM=1, or SOCK_DGRAM=2.
Definition: McSocket.h:86
virtual void interrupt()
Interrupt this thread.
Definition: Thread.cc:600
In certain situations one needs to "join oneself", which would be a deadlock.
Definition: Thread.h:573
#define NSECS_PER_SEC
Definition: types.h:98
void setSocketAddress(const SocketAddress &val)
Definition: DatagramPacket.h:102
virtual void connected(SocketT *, const Inet4PacketInfoX &)
Virtual method that is called when a socket connection is established.
Definition: McSocket.h:385
const SocketAddress & getLocalSocketAddress() const
Get local address of this socket.
Definition: Socket.h:691
#define WLOG(MSG)
Definition: Logger.h:303
#define DLOG(MSG)
Definition: Logger.h:306
virtual struct sockaddr * getConstSockAddrPtr() const =0
void unlock()
Unlock the mutex associated with the condition variable.
Definition: ThreadSupport.h:299
#define PLOG(MSG)
Definition: Logger.h:302
void setInterface(Inet4Address maddr, const Inet4NetworkInterface &iface)
Set the interface for a given multicast address.
Definition: Socket.h:1355
void setRequestType(int val)
Set the request type value.
Definition: McSocket.h:340
A C++ wrapper for a POSIX mutex.
Definition: ThreadSupport.h:154
int getRequestType() const
Get the request type number.
Definition: McSocket.h:345
void setInet4McastSocketAddress(const Inet4SocketAddress &val)
Set the multicast address for listening to requests.
Definition: McSocket.h:332
virtual void close()
Unregister this McSocket from the multicasting and listening threads.
Definition: McSocket.h:793
virtual bool isRunning() const
Is this thread running?
Definition: Thread.h:266
void setRequestType(int val)
Definition: McSocket.h:122
Support for IP version 4 host address.
Definition: Inet4Address.h:46
int magic
Magic value that should be found at the beginning of all received datagrams.
Definition: McSocket.h:68
void unlock()
Unlock the Mutex.
Definition: ThreadSupport.h:218