Skip to content

Commit

Permalink
SINGA-21 Code review
Browse files Browse the repository at this point in the history
review socket.h and socket.cc
  -- remove BasePoll class
  -- rename Socket to SocketInterface
  -- refine functions: Dealer.Connect, Router.Bind
  -- formatting
  • Loading branch information
wangsheng1001 authored and nudles committed Jun 24, 2015
1 parent b0483f2 commit b2d7332
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 148 deletions.
12 changes: 7 additions & 5 deletions include/communication/msg.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#ifndef SINGA_COMMUNICATION_MSG_H_
#define SINGA_COMMUNICATION_MSG_H_

#include <czmq.h>
#include <glog/logging.h>
// TODO(wangwei): make it a compiler argument
#define USE_ZMQ

#include <algorithm>
#include <string>

namespace singa {
#ifdef USE_ZMQ
#include <czmq.h>
#endif

#define USE_ZMQ
namespace singa {

class Msg {
public:
Expand Down Expand Up @@ -60,7 +63,6 @@ class Msg {
src_ = msg->src_;
dst_ = msg->dst_;
}

/**
* Add a frame (a chunck of bytes) into the message
*/
Expand Down
161 changes: 83 additions & 78 deletions include/communication/socket.h
Original file line number Diff line number Diff line change
@@ -1,144 +1,149 @@
#ifndef INCLUDE_COMMUNICATION_SOCKET_H_
#define INCLUDE_COMMUNICATION_SOCKET_H_
#ifndef SINGA_COMMUNICATION_SOCKET_H_
#define SINGA_COMMUNICATION_SOCKET_H_

#include <map>
#include <string>
#include <vector>

#include "communication/msg.h"

#ifdef USE_ZMQ
#include <czmq.h>
#endif

namespace singa {

const std::string kInprocRouterEndpoint="inproc://router";
class Socket{
public:
Socket(){}
virtual ~Socket(){}
const char kInprocRouterEndpoint[] = "inproc://router";

class SocketInterface {
public:
virtual ~SocketInterface() {}
/**
* Send a message to connected socket(s), non-blocking. The message will
* be deallocated after sending, thus should not be used after calling Send();
* @param the message to be sent
* Send a message to connected socket(s), non-blocking. The message
* will be deallocated after sending, thus should not be used after
* calling Send();
*
* @param msg The message to be sent
* @return 1 for success queuing the message for sending, 0 for failure
*/
virtual int Send(Msg** msg)=0;
virtual int Send(Msg** msg) = 0;
/**
* Receive a message from any connected socket.
*
* @return a message pointer if success; nullptr if failure
*/
virtual Msg* Receive()=0;
virtual Msg* Receive() = 0;
/**
* @return Identifier of the implementation dependent socket. E.g., zsock_t*
* for ZeroMQ implementation and rank for MPI implementation.
*/
virtual void* InternalID() const=0;

protected:
int local_id_;
virtual void* InternalID() const = 0;
};

class BasePoller{
class Poller {
public:
Poller();
/**
* Add a socket for polling; Multiple sockets can be polled together by
* adding them into the same poller.
*/
virtual void Add(Socket* socket)=0;
void Add(SocketInterface* socket);
/**
* Poll for all sockets added into this poller.
* @param timeout stop after this number of mseconds
* @return pointer to the socket if it has one message in the receiving
* @param timeout Stop after this number of mseconds
* @return pointer To the socket if it has one message in the receiving
* queue; nullptr if no message in any sockets,
*/
virtual Socket* Wait(int timeout)=0;
};
SocketInterface* Wait(int duration);

#define USE_ZMQ
#include <czmq.h>

#ifdef USE_ZMQ
class Poller: public BasePoller{
public:
Poller();
virtual void Add(Socket* socket);
virtual Socket* Wait(int duration);
protected:
#ifdef USE_ZMQ
zpoller_t *poller_;
std::map<zsock_t*, Socket*> zsock2Socket_;
std::map<zsock_t*, SocketInterface*> zsock2Socket_;
#endif
};

class Dealer : public Socket{
class Dealer : public SocketInterface {
public:
/*
* @param id local dealer ID within a procs if the dealer is from worker or
* @param id Local dealer ID within a procs if the dealer is from worker or
* server thread, starts from 1 (0 is used by the router); or the connected
* remote procs ID for inter-process dealers from the stub thread.
*/
Dealer(int id=-1);
virtual ~Dealer();
Dealer();
explicit Dealer(int id);
~Dealer() override;
/**
* Setup the connection with the router.
*
* @param endpoint identifier of the router. For intra-process
* @param endpoint Identifier of the router. For intra-process
* connection, the endpoint follows the format of ZeroMQ, i.e.,
* starting with "inproc://"; in Singa, since each process has one
* router, hence we can fix the endpoint to be "inproc://router" for
* intra-process. For inter-process, the endpoint follows ZeroMQ's
* format, i.e., IP:port, where IP is the connected process.
* @return 1 connection sets up successfully; 0 otherwise
*/
virtual int Connect(std::string endpoint);
virtual int Send(Msg** msg);
virtual Msg* Receive();
virtual void* InternalID() const{
return dealer_;
}
int Connect(const std::string& endpoint);
int Send(Msg** msg) override;
Msg* Receive() override;
void* InternalID() const override;

protected:
int id_;
zsock_t* dealer_;
zpoller_t* poller_;
int id_ = -1;
#ifdef USE_ZMQ
zsock_t* dealer_ = nullptr;
zpoller_t* poller_ = nullptr;
#endif
};

class Router : public Socket{
class Router : public SocketInterface {
public:
virtual ~Router();
Router();
/**
* Constructor.
*
* There is only one router per procs, hence its local id is 0 and is not set
* explicitly.
*
* @param bufsize buffer at most this number of messages
* @param bufsize Buffer at most this number of messages
*/
explicit Router(int bufsize);
~Router() override;
/**
* Setup the connection with dealers.
*
* It automatically binds to the endpoint for intra-process communication,
* i.e., "inproc://router".
*
* @param endpoint The identifier for the Dealer socket in other process
* to connect. It has the format IP:Port, where IP is the host machine.
* If endpoint is empty, it means that all connections are
* intra-process connection.
* @return number of connected dealers.
*/
Router(int bufsize=100);
/**
* Setup the connection with dealers.
*
* It automatically binds to the endpoint for intra-process communication,
* i.e., "inproc://router".
*
* @param endpoint the identifier for the Dealer socket in other process
* to connect. It has the format IP:Port, where IP is the host machine.
* If endpoint is empty, it means that all connections are
* intra-process connection.
* @return number of connected dealers.
*/
virtual int Bind(std::string endpoint);
/**
int Bind(const std::string& endpoint);
/**
* If the destination socket has not connected yet, buffer this the message.
*/
virtual int Send(Msg** msg);
virtual Msg* Receive();
virtual void* InternalID() const{
return router_;
}
int Send(Msg** msg) override;
Msg* Receive() override;
void* InternalID() const override;

protected:
zsock_t* router_;
zpoller_t* poller_;
int nBufmsg_ = 0;
int bufsize_ = 100;
#ifdef USE_ZMQ
zsock_t* router_ = nullptr;
zpoller_t* poller_ = nullptr;
std::map<int, zframe_t*> id2addr_;
std::map<int, std::vector<zmsg_t*>> bufmsg_;
int nBufmsg_, bufsize_;
#endif
};

#elif USE_MPI
vector<shared_ptr<SafeQueue>> MPIQueues;
#ifdef USE_MPI
// TODO(wangsheng): add intra-process communication using shared queue
std::vector<SafeQueue*> MPIQueues;
#endif
} /* singa */

#endif // INCLUDE_COMMUNICATION_SOCKET_H_
} // namespace singa

#endif // SINGA_COMMUNICATION_SOCKET_H_
Loading

0 comments on commit b2d7332

Please sign in to comment.