diff options
Diffstat (limited to 'src/dev/net/tcp_iface.cc')
-rw-r--r-- | src/dev/net/tcp_iface.cc | 265 |
1 files changed, 225 insertions, 40 deletions
diff --git a/src/dev/net/tcp_iface.cc b/src/dev/net/tcp_iface.cc index 035ec8fd0..38fc7aef2 100644 --- a/src/dev/net/tcp_iface.cc +++ b/src/dev/net/tcp_iface.cc @@ -35,25 +35,35 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Authors: Gabor Dozsa + * Mohammad Alian */ /* @file - * TCP stream socket based interface class implementation for multi gem5 runs. + * TCP stream socket based interface class implementation for dist-gem5 runs. */ #include "dev/net/tcp_iface.hh" #include <arpa/inet.h> #include <netdb.h> +#include <netinet/tcp.h> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include <cerrno> #include <cstring> +#include <vector> #include "base/types.hh" -#include "debug/MultiEthernet.hh" +#include "debug/DistEthernet.hh" +#include "debug/DistEthernetCmd.hh" +#include "sim/sim_exit.hh" + +#if defined(__FreeBSD__) +#include <netinet/in.h> + +#endif // MSG_NOSIGNAL does not exists on OS X #if defined(__APPLE__) || defined(__MACH__) @@ -64,42 +74,181 @@ using namespace std; +std::vector<std::pair<TCPIface::NodeInfo, int> > TCPIface::nodes; vector<int> TCPIface::sockRegistry; +int TCPIface::fdStatic = -1; +bool TCPIface::anyListening = false; TCPIface::TCPIface(string server_name, unsigned server_port, - unsigned multi_rank, Tick sync_start, Tick sync_repeat, - EventManager *em) : - MultiIface(multi_rank, sync_start, sync_repeat, em) + unsigned dist_rank, unsigned dist_size, + Tick sync_start, Tick sync_repeat, + EventManager *em, bool is_switch, int num_nodes) : + DistIface(dist_rank, dist_size, sync_start, sync_repeat, em, + is_switch, num_nodes), serverName(server_name), + serverPort(server_port), isSwitch(is_switch), listening(false) { - struct addrinfo addr_hint, *addr_results; + if (is_switch && isMaster) { + while (!listen(serverPort)) { + DPRINTF(DistEthernet, "TCPIface(listen): Can't bind port %d\n", + serverPort); + serverPort++; + } + inform("tcp_iface listening on port %d", serverPort); + // Now accept the first connection requests from each compute node and + // store the node info. The compute nodes will then wait for ack + // messages. Ack messages will be sent by initTransport() in the + // appropriate order to make sure that every compute node is always + // connected to the same switch port. + NodeInfo ni; + for (int i = 0; i < size; i++) { + accept(); + DPRINTF(DistEthernet, "First connection, waiting for link info\n"); + if (!recvTCP(sock, &ni, sizeof(ni))) + panic("Failed to receive link info"); + nodes.push_back(make_pair(ni, sock)); + } + } +} + +bool +TCPIface::listen(int port) +{ + if (listening) + panic("Socket already listening!"); + + struct sockaddr_in sockaddr; int ret; - string port_str = to_string(server_port); + fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + panic_if(fdStatic < 0, "socket() failed: %s", strerror(errno)); - sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - panic_if(sock < 0, "socket() failed: %s", strerror(errno)); + sockaddr.sin_family = PF_INET; + sockaddr.sin_addr.s_addr = INADDR_ANY; + sockaddr.sin_port = htons(port); + // finally clear sin_zero + memset(&sockaddr.sin_zero, 0, sizeof(sockaddr.sin_zero)); + ret = ::bind(fdStatic, (struct sockaddr *)&sockaddr, sizeof (sockaddr)); - bzero(&addr_hint, sizeof(addr_hint)); - addr_hint.ai_family = AF_INET; - addr_hint.ai_socktype = SOCK_STREAM; - addr_hint.ai_protocol = IPPROTO_TCP; + if (ret != 0) { + if (ret == -1 && errno != EADDRINUSE) + panic("ListenSocket(listen): bind() failed!"); + return false; + } - ret = getaddrinfo(server_name.c_str(), port_str.c_str(), - &addr_hint, &addr_results); - panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno)); + if (::listen(fdStatic, 24) == -1) { + if (errno != EADDRINUSE) + panic("ListenSocket(listen): listen() failed!"); - DPRINTF(MultiEthernet, "Connecting to %s:%u\n", - server_name.c_str(), port_str.c_str()); + return false; + } - ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr), - addr_results->ai_addrlen); - panic_if(ret < 0, "connect() failed: %s", strerror(errno)); + listening = true; + anyListening = true; + return true; +} - freeaddrinfo(addr_results); - // add our socket to the static registry +void +TCPIface::establishConnection() +{ + static unsigned cur_rank = 0; + static unsigned cur_id = 0; + NodeInfo ni; + + if (isSwitch) { + if (cur_id == 0) { // first connection accepted in the ctor already + auto const &iface0 = + find_if(nodes.begin(), nodes.end(), + [](const pair<NodeInfo, int> &cn) -> bool { + return cn.first.rank == cur_rank; + }); + assert(iface0 != nodes.end()); + assert(iface0->first.distIfaceId == 0); + sock = iface0->second; + ni = iface0->first; + } else { // additional connections from the same compute node + accept(); + DPRINTF(DistEthernet, "Next connection, waiting for link info\n"); + if (!recvTCP(sock, &ni, sizeof(ni))) + panic("Failed to receive link info"); + assert(ni.rank == cur_rank); + assert(ni.distIfaceId == cur_id); + } + inform("Link okay (iface:%d -> (node:%d, iface:%d))", + distIfaceId, ni.rank, ni.distIfaceId); + if (ni.distIfaceId < ni.distIfaceNum - 1) { + cur_id++; + } else { + cur_rank++; + cur_id = 0; + } + // send ack + ni.distIfaceId = distIfaceId; + ni.distIfaceNum = distIfaceNum; + sendTCP(sock, &ni, sizeof(ni)); + } else { // this is not a switch + connect(); + // send link info + ni.rank = rank; + ni.distIfaceId = distIfaceId; + ni.distIfaceNum = distIfaceNum; + sendTCP(sock, &ni, sizeof(ni)); + DPRINTF(DistEthernet, "Connected, waiting for ack (distIfaceId:%d\n", + distIfaceId); + if (!recvTCP(sock, &ni, sizeof(ni))) + panic("Failed to receive ack"); + assert(ni.rank == rank); + inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId, + ni.distIfaceId); + } sockRegistry.push_back(sock); - // let the server know who we are - sendTCP(sock, &multi_rank, sizeof(multi_rank)); +} + +void +TCPIface::accept() +{ + struct sockaddr_in sockaddr; + socklen_t slen = sizeof (sockaddr); + sock = ::accept(fdStatic, (struct sockaddr *)&sockaddr, &slen); + if (sock != -1) { + int i = 1; + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i, + sizeof(i)) < 0) + warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!"); + } +} + +void +TCPIface::connect() +{ + struct addrinfo addr_hint, *addr_results; + int ret; + + string port_str = to_string(serverPort); + + sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + panic_if(sock < 0, "socket() failed: %s", strerror(errno)); + + int fl = 1; + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0) + warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!"); + + bzero(&addr_hint, sizeof(addr_hint)); + addr_hint.ai_family = AF_INET; + addr_hint.ai_socktype = SOCK_STREAM; + addr_hint.ai_protocol = IPPROTO_TCP; + + ret = getaddrinfo(serverName.c_str(), port_str.c_str(), + &addr_hint, &addr_results); + panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno)); + + DPRINTF(DistEthernet, "Connecting to %s:%s\n", + serverName.c_str(), port_str.c_str()); + + ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr), + addr_results->ai_addrlen); + panic_if(ret < 0, "connect() failed: %s", strerror(errno)); + + freeaddrinfo(addr_results); } TCPIface::~TCPIface() @@ -111,12 +260,20 @@ TCPIface::~TCPIface() } void -TCPIface::sendTCP(int sock, void *buf, unsigned length) +TCPIface::sendTCP(int sock, const void *buf, unsigned length) { ssize_t ret; ret = ::send(sock, buf, length, MSG_NOSIGNAL); - panic_if(ret < 0, "send() failed: %s", strerror(errno)); + if (ret < 0) { + if (errno == ECONNRESET || errno == EPIPE) { + inform("send(): %s", strerror(errno)); + exit_message("info", 0, "Message server closed connection, " + "simulation is exiting"); + } else { + panic("send() failed: %s", strerror(errno)); + } + } panic_if(ret != length, "send() failed"); } @@ -140,19 +297,47 @@ TCPIface::recvTCP(int sock, void *buf, unsigned length) } void -TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick) +TCPIface::sendPacket(const Header &header, const EthPacketPtr &packet) +{ + sendTCP(sock, &header, sizeof(header)); + sendTCP(sock, packet->data, packet->length); +} + +void +TCPIface::sendCmd(const Header &header) { - /* - * Barrier is simply implemented by point-to-point messages to the server - * for now. This method is called by only one TCPIface object. - * The server will send back an 'ack' message when it gets the - * sync request from all clients. - */ - MultiHeaderPkt::Header header_pkt; - header_pkt.msgType = sync_req; - header_pkt.sendTick = sync_tick; - - for (auto s : sockRegistry) - sendTCP(s, (void *)&header_pkt, sizeof(header_pkt)); + DPRINTF(DistEthernetCmd, "TCPIface::sendCmd() type: %d\n", + static_cast<int>(header.msgType)); + // Global commands (i.e. sync request) are always sent by the master + // DistIface. The transfer method is simply implemented as point-to-point + // messages for now + for (auto s: sockRegistry) + sendTCP(s, (void*)&header, sizeof(header)); } +bool +TCPIface::recvHeader(Header &header) +{ + bool ret = recvTCP(sock, &header, sizeof(header)); + DPRINTF(DistEthernetCmd, "TCPIface::recvHeader() type: %d ret: %d\n", + static_cast<int>(header.msgType), ret); + return ret; +} + +void +TCPIface::recvPacket(const Header &header, EthPacketPtr &packet) +{ + packet = make_shared<EthPacketData>(header.dataPacketLength); + bool ret = recvTCP(sock, packet->data, header.dataPacketLength); + panic_if(!ret, "Error while reading socket"); + packet->length = header.dataPacketLength; +} + +void +TCPIface::initTransport() +{ + // We cannot setup the conections in the constructor because the number + // of dist interfaces (per process) is unknown until the (simobject) init + // phase. That information is necessary for global connection ordering. + establishConnection(); +} |