diff options
author | Gabor Dozsa <gabor.dozsa@arm.com> | 2016-01-07 16:33:47 -0600 |
---|---|---|
committer | Gabor Dozsa <gabor.dozsa@arm.com> | 2016-01-07 16:33:47 -0600 |
commit | 5dec4e07b89786aa67ce64aadeeb14c81b3977b3 (patch) | |
tree | 44535119ad1f458cbe2a26b56c8c8377a25fe0ff /util | |
parent | e67749426054d8ddb7f11b53a89741d4808f3acb (diff) | |
download | gem5-5dec4e07b89786aa67ce64aadeeb14c81b3977b3.tar.xz |
dev: Distributed Ethernet link for distributed gem5 simulations
Distributed gem5 (abbreviated dist-gem5) is the result of the
convergence effort between multi-gem5 and pd-gem5 (from Univ. of
Wisconsin). It relies on the base multi-gem5 infrastructure for packet
forwarding, synchronisation and checkpointing but combines those with
the elaborate network switch model from pd-gem5.
--HG--
rename : src/dev/net/multi_etherlink.cc => src/dev/net/dist_etherlink.cc
rename : src/dev/net/multi_etherlink.hh => src/dev/net/dist_etherlink.hh
rename : src/dev/net/multi_iface.cc => src/dev/net/dist_iface.cc
rename : src/dev/net/multi_iface.hh => src/dev/net/dist_iface.hh
rename : src/dev/net/multi_packet.hh => src/dev/net/dist_packet.hh
Diffstat (limited to 'util')
-rw-r--r-- | util/multi/Makefile | 63 | ||||
-rw-r--r-- | util/multi/tcp_server.cc | 463 | ||||
-rw-r--r-- | util/multi/tcp_server.hh | 254 |
3 files changed, 0 insertions, 780 deletions
diff --git a/util/multi/Makefile b/util/multi/Makefile deleted file mode 100644 index a58dd3307..000000000 --- a/util/multi/Makefile +++ /dev/null @@ -1,63 +0,0 @@ -# -# Copyright (c) 2015 ARM Limited -# All rights reserved -# -# The license below extends only to copyright in the software and shall -# not be construed as granting a license to any other intellectual -# property including but not limited to intellectual property relating -# to a hardware implementation of the functionality of the software -# licensed hereunder. You may use the software subject to the license -# terms below provided that you ensure that this notice is replicated -# unmodified and in its entirety in all distributions of the software, -# modified or unmodified, in source code or in binary form. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer; -# redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution; -# neither the name of the copyright holders nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# -# Authors: Gabor Dozsa - -CXX= g++ - -DEBUG= -DDEBUG - -M5_ARCH?= ARM -M5_DIR?= ../.. - -vpath % $(M5_DIR)/build/$(M5_ARCH)/dev - -INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext -CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS) - -default: tcp_server - -clean: - @rm -f tcp_server *.o *.d *~ - -tcp_server: tcp_server.o multi_packet.o - $(CXX) $(LFLAGS) -o $@ $^ - -%.o: %.cc - @echo '$(CXX) $(CCFLAGS) -c $(notdir $<) -o $@' - @$(CXX) $(CCFLAGS) -c $< -o $@ - --include *.d diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc deleted file mode 100644 index 1ec4303d3..000000000 --- a/util/multi/tcp_server.cc +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Copyright (c) 2015 ARM Limited - * All rights reserved - * - * The license below extends only to copyright in the software and shall - * not be construed as granting a license to any other intellectual - * property including but not limited to intellectual property relating - * to a hardware implementation of the functionality of the software - * licensed hereunder. You may use the software subject to the license - * terms below provided that you ensure that this notice is replicated - * unmodified and in its entirety in all distributions of the software, - * modified or unmodified, in source code or in binary form. - * - * Copyright (c) 2008 The Regents of The University of Michigan - * All rights reserved. 
- * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer; - * redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution; - * neither the name of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Authors: Gabor Dozsa - */ - - -/* @file - * Message server implementation using TCP stream sockets for parallel gem5 - * runs. - */ -#include <arpa/inet.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <unistd.h> - -#include <cstdio> -#include <cstdlib> - -#include "tcp_server.hh" - -using namespace std; - -// Some basic macros for information and error reporting. -#define PRINTF(...) 
fprintf(stderr, __VA_ARGS__) - -#ifdef DEBUG -static bool debugSetup = true; -static bool debugPeriodic = false; -static bool debugSync = true; -static bool debugPkt = false; -#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__) -#else -#define DPRINTF(v,...) -#endif - -#define inform(...) do { PRINTF("info: "); \ - PRINTF(__VA_ARGS__); } while(0) - -#define panic(...) do { PRINTF("panic: "); \ - PRINTF(__VA_ARGS__); \ - PRINTF("\n[%s:%s], line %d\n", \ - __FUNCTION__, __FILE__, __LINE__); \ - exit(-1); } while(0) - -TCPServer *TCPServer::instance = nullptr; - -TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle) -{ - MultiHeaderPkt::clearAddress(address); -} - -unsigned -TCPServer::Channel::recvRaw(void *buf, unsigned size) const -{ - ssize_t n; - // This is a blocking receive. - n = recv(fd, buf, size, MSG_WAITALL); - - if (n < 0) - panic("read() failed:%s", strerror(errno)); - else if (n > 0 && n < size) - // the recv() call should wait for the full message - panic("read() failed"); - - return n; -} - -void -TCPServer::Channel::sendRaw(const void *buf, unsigned size) const -{ - ssize_t n; - n = send(fd, buf, size, MSG_NOSIGNAL); - if (n < 0) - panic("write() failed:%s", strerror(errno)); - else if (n != size) - panic("write() failed"); -} - -void TCPServer::Channel::updateAddress(const AddressType &new_address) -{ - // check if the known address has changed (e.g. the client reconfigured - // its Ethernet NIC) - if (MultiHeaderPkt::isAddressEqual(address, new_address)) - return; - - // So we have to update the address. Note that we always - // store the same address as key in the map but the ordering - // may change so we need to erase and re-insert it again. 
- auto info = TCPServer::instance->addressMap.find(&address); - if (info != TCPServer::instance->addressMap.end()) { - TCPServer::instance->addressMap.erase(info); - } - - MultiHeaderPkt::copyAddress(address, new_address); - TCPServer::instance->addressMap[&address] = this; -} - -void -TCPServer::Channel::headerPktIn() -{ - ssize_t n; - Header hdr_pkt; - - n = recvRaw(&hdr_pkt, sizeof(hdr_pkt)); - - if (n == 0) { - // EOF - nothing to do here, we will handle this as a POLLRDHUP event - // in the main loop. - return; - } - - if (hdr_pkt.msgType == MsgType::dataDescriptor) { - updateAddress(hdr_pkt.srcAddress); - TCPServer::instance->xferData(hdr_pkt, *this); - } else { - processCmd(hdr_pkt.msgType, hdr_pkt.sendTick); - } -} - -void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick) -{ - switch (cmd) { - case MsgType::cmdAtomicSyncReq: - DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank); - assert(state == SyncState::idle); - state = SyncState::atomic; - TCPServer::instance->syncTryComplete(SyncState::atomic, - MsgType::cmdAtomicSyncAck); - break; - case MsgType::cmdPeriodicSyncReq: - DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick); - // sanity check - if (TCPServer::instance->periodicSyncTick() == 0) { - TCPServer::instance->periodicSyncTick(send_tick); - } else if ( TCPServer::instance->periodicSyncTick() != send_tick) { - panic("Out of order periodic sync request - rank:%d " - "(send_tick:%ld ongoing:%ld)", rank, send_tick, - TCPServer::instance->periodicSyncTick()); - } - switch (state) { - case SyncState::idle: - state = SyncState::periodic; - TCPServer::instance->syncTryComplete(SyncState::periodic, - MsgType::cmdPeriodicSyncAck); - break; - case SyncState::asyncCkpt: - // An async ckpt request has already been sent to this client and - // that will interrupt this periodic sync. We can simply drop this - // message. 
- break; - default: - panic("Unexpected state for periodic sync request (rank:%d)", - rank); - break; - } - break; - case MsgType::cmdCkptSyncReq: - DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank); - switch (state) { - case SyncState::idle: - TCPServer::instance->ckptPropagate(*this); - // we fall through here to complete #clients==1 case - case SyncState::asyncCkpt: - state = SyncState::ckpt; - TCPServer::instance->syncTryComplete(SyncState::ckpt, - MsgType::cmdCkptSyncAck); - break; - default: - panic("Unexpected state for ckpt sync request (rank:%d)", rank); - break; - } - break; - default: - panic("Unexpected header packet (rank:%d)",rank); - break; - } -} - -TCPServer::TCPServer(unsigned clients_num, - unsigned listen_port, - int timeout_in_sec) -{ - assert(instance == nullptr); - construct(clients_num, listen_port, timeout_in_sec); - instance = this; -} - -TCPServer::~TCPServer() -{ - for (auto &c : clientsPollFd) - close(c.fd); -} - -void -TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec) -{ - int listen_sock, new_sock, ret; - unsigned client_len; - struct sockaddr_in server_addr, client_addr; - struct pollfd new_pollfd; - Channel new_channel; - - DPRINTF(debugSetup, "Start listening on port %u ...\n", port); - - listen_sock = socket(AF_INET, SOCK_STREAM, 0); - if (listen_sock < 0) - panic("socket() failed:%s", strerror(errno)); - - bzero(&server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_addr.s_addr = INADDR_ANY; - server_addr.sin_port = htons(port); - if (bind(listen_sock, (struct sockaddr *) &server_addr, - sizeof(server_addr)) < 0) - panic("bind() failed:%s", strerror(errno)); - listen(listen_sock, 10); - - clientsPollFd.reserve(clients_num); - clientsChannel.reserve(clients_num); - - new_pollfd.events = POLLIN | POLLRDHUP; - new_pollfd.revents = 0; - while (clientsPollFd.size() < clients_num) { - new_pollfd.fd = listen_sock; - ret = poll(&new_pollfd, 1, timeout_in_sec*1000); - 
if (ret == 0) - panic("Timeout while waiting for clients to connect"); - assert(ret == 1 && new_pollfd.revents == POLLIN); - client_len = sizeof(client_addr); - new_sock = accept(listen_sock, - (struct sockaddr *) &client_addr, - &client_len); - if (new_sock < 0) - panic("accept() failed:%s", strerror(errno)); - new_pollfd.fd = new_sock; - new_pollfd.revents = 0; - clientsPollFd.push_back(new_pollfd); - new_channel.fd = new_sock; - new_channel.isAlive = true; - new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank)); - clientsChannel.push_back(new_channel); - - DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n", - client_addr.sin_addr.s_addr, client_addr.sin_port, - new_channel.rank); - } - ret = close(listen_sock); - assert(ret == 0); - - DPRINTF(debugSetup, "Setup complete\n"); -} - -void -TCPServer::run() -{ - int nfd; - unsigned num_active_clients = clientsPollFd.size(); - - DPRINTF(debugSetup, "Entering run() loop\n"); - while (num_active_clients == clientsPollFd.size()) { - nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1); - if (nfd == -1) - panic("poll() failed:%s", strerror(errno)); - - for (unsigned i = 0, n = 0; - i < clientsPollFd.size() && (signed)n < nfd; - i++) { - struct pollfd &pfd = clientsPollFd[i]; - if (pfd.revents) { - if (pfd.revents & POLLERR) - panic("poll() returned POLLERR"); - if (pfd.revents & POLLIN) { - clientsChannel[i].headerPktIn(); - } - if (pfd.revents & POLLRDHUP) { - // One gem5 process exited or aborted. Either way, we - // assume the full simulation should stop now (either - // because m5 exit was called or a serious error - // occurred.) So we quit the run loop here and close all - // sockets to notify the remaining peer gem5 processes. 
- pfd.events = 0; - clientsChannel[i].isAlive = false; - num_active_clients--; - DPRINTF(debugSetup, "POLLRDHUP event"); - } - n++; - if ((signed)n == nfd) - break; - } - } - } - DPRINTF(debugSetup, "Exiting run() loop\n"); -} - -void -TCPServer::xferData(const Header &hdr_pkt, const Channel &src) -{ - unsigned n; - assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer)); - n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength); - - if (n == 0) - panic("recvRaw() failed"); - DPRINTF(debugPkt, "Incoming data packet (from rank %d) " - "src:0x%02x%02x%02x%02x%02x%02x " - "dst:0x%02x%02x%02x%02x%02x%02x\n", - src.rank, - hdr_pkt.srcAddress[0], - hdr_pkt.srcAddress[1], - hdr_pkt.srcAddress[2], - hdr_pkt.srcAddress[3], - hdr_pkt.srcAddress[4], - hdr_pkt.srcAddress[5], - hdr_pkt.dstAddress[0], - hdr_pkt.dstAddress[1], - hdr_pkt.dstAddress[2], - hdr_pkt.dstAddress[3], - hdr_pkt.dstAddress[4], - hdr_pkt.dstAddress[5]); - // Now try to figure out the destination client(s). - auto dst_info = addressMap.find(&hdr_pkt.dstAddress); - - // First handle the multicast/broadcast or unknonw destination case. These - // all trigger a broadcast of the packet to all clients. 
- if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false || - dst_info == addressMap.end()) { - unsigned n = 0; - for (auto const &c: clientsChannel) { - if (c.isAlive && &c!=&src) { - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength); - n++; - } - } - if (n == 0) { - inform("Broadcast/multicast packet dropped\n"); - } - } else { - // It is a unicast address with a known destination - Channel *dst = dst_info->second; - if (dst->isAlive) { - dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength); - DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank); - } else { - inform("Unicast packet dropped (destination exited)\n"); - } - } -} - -void -TCPServer::syncTryComplete(SyncState st, MsgType ack) -{ - // Check if the barrieris complete. If so then notify all the clients. - for (auto &c : clientsChannel) { - if (c.isAlive && (c.state != st)) { - // sync not complete yet, stop here - return; - } - } - // Sync complete, send out the acks - MultiHeaderPkt::Header hdr_pkt; - hdr_pkt.msgType = ack; - for (auto &c : clientsChannel) { - if (c.isAlive) { - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.state = SyncState::idle; - } - } - // Reset periodic send tick - _periodicSyncTick = 0; - DPRINTF(st == SyncState::periodic ? 
debugPeriodic : debugSync, - "Sync COMPLETE\n"); -} - -void -TCPServer::ckptPropagate(Channel &ch) -{ - // Channel ch got a ckpt request that needs to be propagated to the other - // clients - MultiHeaderPkt::Header hdr_pkt; - hdr_pkt.msgType = MsgType::cmdCkptSyncReq; - for (auto &c : clientsChannel) { - if (c.isAlive && (&c != &ch)) { - switch (c.state) { - case SyncState::idle: - case SyncState::periodic: - c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); - c.state = SyncState::asyncCkpt; - break; - default: - panic("Unexpected state for ckpt sync request propagation " - "(rank:%d)\n",c.rank); - break; - } - } - } -} - -int main(int argc, char *argv[]) -{ - TCPServer *server; - int clients_num = -1, listen_port = -1; - int first_arg = 1, timeout_in_sec = 60; - - if (argc > 1 && string(argv[1]).compare("-debug") == 0) { - timeout_in_sec = -1; - first_arg++; - argc--; - } - - if (argc != 3) - panic("We need two command line args (number of clients and tcp listen" - " port"); - - clients_num = atoi(argv[first_arg]); - listen_port = atoi(argv[first_arg + 1]); - - server = new TCPServer(clients_num, listen_port, timeout_in_sec); - - server->run(); - - delete server; - - return 0; -} diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh deleted file mode 100644 index c5f2a9ce8..000000000 --- a/util/multi/tcp_server.hh +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Copyright (c) 2015 ARM Limited - * All rights reserved - * - * The license below extends only to copyright in the software and shall - * not be construed as granting a license to any other intellectual - * property including but not limited to intellectual property relating - * to a hardware implementation of the functionality of the software - * licensed hereunder. 
You may use the software subject to the license - * terms below provided that you ensure that this notice is replicated - * unmodified and in its entirety in all distributions of the software, - * modified or unmodified, in source code or in binary form. - * - * Copyright (c) 2008 The Regents of The University of Michigan - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer; - * redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution; - * neither the name of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * Authors: Gabor Dozsa - */ - -/* @file - * Message server using TCP stream sockets for parallel gem5 runs. 
- * - * For a high level description about multi gem5 see comments in - * header files src/dev/multi_iface.hh and src/dev/tcp_iface.hh. - * - * This file implements the central message server process for multi gem5. - * The server is responsible the following tasks. - * 1. Establishing a TCP socket connection for each gem5 process (clients). - * - * 2. Process data messages coming in from clients. The server checks - * the MAC addresses in the header message and transfers the message - * to the target(s) client(s). - * - * 3. Processing synchronisation related control messages. Synchronisation - * is performed as follows. The server waits for a 'barrier enter' message - * from all the clients. When the last such control message arrives, the - * server sends out a 'barrier leave' control message to all the clients. - * - * 4. Triggers complete termination in case a client exits. A client may - * exit either by calling 'm5 exit' pseudo instruction or due to a fatal - * error. In either case, we assume that the entire multi simulation needs to - * terminate. The server triggers full termination by tearing down the - * open TCP sockets. - * - * The TCPServer class is instantiated as a singleton object. - * - * The server can be built independently from the rest of gem5 (and it is - * architecture agnostic). See the Makefile in the same directory. - * - */ - -#include <poll.h> - -#include <map> -#include <vector> - -#include "dev/etherpkt.hh" -#include "dev/multi_packet.hh" - -/** - * The maximum length of an Ethernet packet (allowing Jumbo frames). - */ -#define MAX_ETH_PACKET_LENGTH 9014 - -class TCPServer -{ - public: - typedef MultiHeaderPkt::AddressType AddressType; - typedef MultiHeaderPkt::Header Header; - typedef MultiHeaderPkt::MsgType MsgType; - - private: - - enum - class SyncState { periodic, ckpt, asyncCkpt, atomic, idle }; - /** - * The Channel class encapsulates all the information about a client - * and its current status. 
- */ - class Channel - { - private: - /** - * The MAC address of the client. - */ - AddressType address; - - /** - * Update the client MAC address. It is called every time a new data - * packet is to come in. - */ - void updateAddress(const AddressType &new_addr); - /** - * Process an incoming command message. - */ - void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick); - - - public: - /** - * TCP stream socket. - */ - int fd; - /** - * Is client connected? - */ - bool isAlive; - /** - * Current state of the channel wrt. multi synchronisation. - */ - SyncState state; - /** - * Multi rank of the client - */ - unsigned rank; - - public: - Channel(); - ~Channel () {} - - - /** - * Receive and process the next incoming header packet. - */ - void headerPktIn(); - /** - * Send raw data to the connected client. - * - * @param data The data to send. - * @param size Size of the data (in bytes). - */ - void sendRaw(const void *data, unsigned size) const; - /** - * Receive raw data from the connected client. - * - * @param buf The buffer to store the incoming data into. - * @param size Size of data to receive (in bytes). - * @return In case of success, it returns size. Zero is returned - * if the socket is already closed by the client. - */ - unsigned recvRaw(void *buf, unsigned size) const; - }; - - /** - * The array of socket descriptors needed by the poll() system call. - */ - std::vector<struct pollfd> clientsPollFd; - /** - * Array holding all clients info. - */ - std::vector<Channel> clientsChannel; - - - /** - * We use a map to select the target client based on the destination - * MAC address. 
- */ - struct AddressCompare - { - bool operator()(const AddressType *a1, const AddressType *a2) - { - return MultiHeaderPkt::isAddressLess(*a1, *a2); - } - }; - std::map<const AddressType *, Channel *, AddressCompare> addressMap; - - /** - * As we dealt with only one message at a time, we can allocate and re-use - * a single packet buffer (to hold any incoming data packet). - */ - uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH]; - /** - * Send tick of the current periodic sync. It is used for sanity check. - */ - Tick _periodicSyncTick; - /** - * The singleton server object. - */ - static TCPServer *instance; - - /** - * Set up the socket connections to all the clients. - * - * @param listen_port The port we are listening on for new client - * connection requests. - * @param nclients The number of clients to connect to. - * @param timeout Timeout in sec to complete the setup phase - * (i.e. all gem5 establish socket connections) - */ - void construct(unsigned listen_port, unsigned nclients, int timeout); - /** - * Transfer the header and the follow up data packet to the target(s) - * clients. - * - * @param hdr The header message structure. - * @param ch The source channel for the message. - */ - void xferData(const Header &hdr, const Channel &ch); - /** - * Check if the current round of a synchronisation is completed and notify - * the clients if it is so. - * - * @param st The state all channels should have if sync is complete. - * @param ack The type of ack message to send out if the sync is compete. - */ - void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack); - /** - * Broadcast a request for checkpoint sync. - * - * @param ch The source channel of the checkpoint sync request. - */ - void ckptPropagate(Channel &ch); - /** - * Setter for current periodic send tick. - */ - void periodicSyncTick(Tick t) { _periodicSyncTick = t; } - /** - * Getter for current periodic send tick. 
- */ - Tick periodicSyncTick() { return _periodicSyncTick; } - - public: - - TCPServer(unsigned clients_num, unsigned listen_port, int timeout_in_sec); - ~TCPServer(); - - /** - * The main server loop that waits for and processes incoming messages. - */ - void run(); -}; |