diff options
Diffstat (limited to 'util/multi')
-rw-r--r-- | util/multi/Makefile | 63 | ||||
-rw-r--r-- | util/multi/bootscript.rcS | 122 | ||||
-rwxr-xr-x | util/multi/gem5-multi.sh | 275 | ||||
-rw-r--r-- | util/multi/tcp_server.cc | 463 | ||||
-rw-r--r-- | util/multi/tcp_server.hh | 254 |
5 files changed, 1177 insertions, 0 deletions
diff --git a/util/multi/Makefile b/util/multi/Makefile new file mode 100644 index 000000000..a58dd3307 --- /dev/null +++ b/util/multi/Makefile @@ -0,0 +1,63 @@ +# +# Copyright (c) 2015 ARM Limited +# All rights reserved +# +# The license below extends only to copyright in the software and shall +# not be construed as granting a license to any other intellectual +# property including but not limited to intellectual property relating +# to a hardware implementation of the functionality of the software +# licensed hereunder. You may use the software subject to the license +# terms below provided that you ensure that this notice is replicated +# unmodified and in its entirety in all distributions of the software, +# modified or unmodified, in source code or in binary form. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer; +# redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution; +# neither the name of the copyright holders nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Authors: Gabor Dozsa + +CXX= g++ + +DEBUG= -DDEBUG + +M5_ARCH?= ARM +M5_DIR?= ../.. + +vpath % $(M5_DIR)/build/$(M5_ARCH)/dev + +INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext +CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS) + +default: tcp_server + +clean: + @rm -f tcp_server *.o *.d *~ + +tcp_server: tcp_server.o multi_packet.o + $(CXX) $(LFLAGS) -o $@ $^ + +%.o: %.cc + @echo '$(CXX) $(CCFLAGS) -c $(notdir $<) -o $@' + @$(CXX) $(CCFLAGS) -c $< -o $@ + +-include *.d diff --git a/util/multi/bootscript.rcS b/util/multi/bootscript.rcS new file mode 100644 index 000000000..95736f4b7 --- /dev/null +++ b/util/multi/bootscript.rcS @@ -0,0 +1,122 @@ +#!/bin/bash + + +# +# Copyright (c) 2015 ARM Limited +# All rights reserved +# +# The license below extends only to copyright in the software and shall +# not be construed as granting a license to any other intellectual +# property including but not limited to intellectual property relating +# to a hardware implementation of the functionality of the software +# licensed hereunder. You may use the software subject to the license +# terms below provided that you ensure that this notice is replicated +# unmodified and in its entirety in all distributions of the software, +# modified or unmodified, in source code or in binary form. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer; +# redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution; +# neither the name of the copyright holders nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Authors: Gabor Dozsa +# +# +# This is an example boot script to use for muti gem5 runs. The important +# task here is to extract the rank and size information from the kernel +# boot args and use those to configure MAC/IP addresses and hostname. +# Then we can kick off our (parallel) workload ... +# +# You are expected to costumize this scipt for your needs (e.g. change +# the command at the end of the scipt to run your tests/workloads. + +source /root/.bashrc +echo "bootscript.rcS is running" + +m='GEM5\_RANK=([0-9]+) GEM5\_SIZE=([0-9]+)' +if [[ $(cat /proc/cmdline) =~ $m ]] +then + MY_RANK=${BASH_REMATCH[1]} + MY_SIZE=${BASH_REMATCH[2]} +else + echo "(E) GEM5_RANK/GEM5_SIZE was not defined in bootargs, exiting ..." + /sbin/m5 abort +fi + +/bin/hostname node${MY_RANK} + +# Keep MAC address assignment simple for now ... +(($MY_RANK>97)) && { echo "(E) Rank must be less than 98"; /sbin/m5 abort; } +((MY_ADDR=MY_RANK+2)) +if (($MY_ADDR<10)) +then + MY_ADDR_PADDED=0${MY_ADDR} +else + MY_ADDR_PADDED=${MY_ADDR} +fi + +/sbin/ifconfig eth0 hw ether 00:90:00:00:00:${MY_ADDR_PADDED} +/sbin/ifconfig eth0 192.168.0.${MY_ADDR} netmask 255.255.255.0 up + +/sbin/ifconfig -a + +# Prepare host lists for mpirun +MY_MPI_HOSTS="192.168.0.2" +for ((i=1; i<MY_SIZE; i++)) +do + MY_MPI_HOSTS+=",192.168.0.$((i+2))" +done + +# Check that Ethernet links work, then take a checkpoint +if [ "$MY_RANK" == "0" ] +then + OLDIFS=$IFS + IFS="," + for i in $MY_MPI_HOSTS + do + ping -c 1 $i || { echo "ping $i failed, exiting ..."; exit -1; } + ssh $i hostname || { echo "ssh $i failed, exiting ..."; exit -1; } + done + IFS=$OLDIFS + /sbin/m5 checkpoint +fi + +# -------------------------------------------- +# ------ Start your tests below ... --------- +# -------------------------------------------- + +if [ "$MY_RANK" == "0" ] +then + echo "MPI test" + #mpirun -H 192.168.0.3,192.168.0.2 hostname + cd /benchmarks + mpirun -H $MY_MPI_HOSTS lulesh/lulesh2.0-mpi -s 5 +else + # This is to avoid other (rank!=0) gem5 processes exiting + # before the test (started by rank 0) completes. When rank 0 completes the + # test it will exit and that will trigger a notification to all the peer + # gem5 peocesses to stop the simulation. + echo "sleep forever..." + while /bin/true + do + sleep 5 + done +fi diff --git a/util/multi/gem5-multi.sh b/util/multi/gem5-multi.sh new file mode 100755 index 000000000..4b4937c90 --- /dev/null +++ b/util/multi/gem5-multi.sh @@ -0,0 +1,275 @@ +#! /bin/bash + +# +# Copyright (c) 2015 ARM Limited +# All rights reserved +# +# The license below extends only to copyright in the software and shall +# not be construed as granting a license to any other intellectual +# property including but not limited to intellectual property relating +# to a hardware implementation of the functionality of the software +# licensed hereunder. You may use the software subject to the license +# terms below provided that you ensure that this notice is replicated +# unmodified and in its entirety in all distributions of the software, +# modified or unmodified, in source code or in binary form. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer; +# redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution; +# neither the name of the copyright holders nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Authors: Gabor Dozsa + + +# This is a wrapper script to run a multi gem5 simulations. +# See the usage_func() below for hints on how to use it. Also, +# there are some examples in the util/multi directory (e.g. +# see util/multi/test-2nodes-AArch64.sh) +# +# +# Allocated hosts/cores are assumed to be listed in the LSB_MCPU_HOSTS +# environment variable (which is what LSF does by default). +# E.g. LSB_MCPU_HOSTS=\"hname1 2 hname2 4\" means we have altogether 6 slots +# allocated to launch the gem5 processes, 2 of them are on host hname1 +# and 4 of them are on host hname2. +# If LSB_MCPU_HOSTS environment variable is not defined then we launch all +# processes on the localhost. +# +# Each gem5 process are passed in a unique rank ID [0..N-1] via the kernel +# boot params. The total number of gem5 processes is also passed in. +# These values can be used in the boot script to configure the MAC/IP +# addresses - among other things (see util/multi/bootscript.rcS). +# +# Each gem5 process will create an m5out.$GEM5_RANK directory for +# the usual output files. Furthermore, there will be a separate log file +# for each ssh session (we use ssh to start gem5 processes) and one for +# the server. These are called log.$GEM5_RANK and log.server. +# + + +# print help +usage_func () +{ + echo "Usage:$0 [-debug] [-n nnodes] [-s server] [-p port] gem5_exe gem5_args" + echo " -debug : debug mode (start gem5 in gdb)" + echo " nnodes : number of gem5 processes" + echo " server : message server executable" + echo " port : message server listen port" + echo " gem5_exe : gem5 executable (full path required)" + echo " gem5_args: usual gem5 arguments ( m5 options, config script options)" + echo "Note: if no LSF slots allocation is found all proceses are launched on the localhost." +} + + +# Process (optional) command line options + +while true +do + case "x$1" in + x-n|x-nodes) + NNODES=$2 + shift 2 + ;; + x-s|x-server) + TCP_SERVER=$2 + shift 2 + ;; + x-p|x-port) + SERVER_PORT=$2 + shift 2 + ;; + x-debug) + GEM5_DEBUG="-debug" + shift 1 + ;; + *) + break + ;; + esac +done + +# The remaining command line args must be the usual gem5 command +(($# < 2)) && { usage_func; exit -1; } +GEM5_EXE=$1 +shift +GEM5_ARGS="$*" + +# Default values to use (in case they are not defined as command line options) +DEFAULT_TCP_SERVER=$(dirname $0)/../../util/multi/tcp_server +DEFAULT_SERVER_PORT=2200 + +[ -z "$TCP_SERVER" ] && TCP_SERVER=$DEFAULT_TCP_SERVER +[ -z "$SERVER_PORT" ] && SERVER_PORT=$DEFAULT_SERVER_PORT +[ -z "$NNODES" ] && NNODES=2 + + +# Check if all the executables we need exist +[ -x "$TCP_SERVER" ] || { echo "Executable ${TCP_SERVER} not found"; exit 1; } +[ -x "$GEM5_EXE" ] || { echo "Executable ${GEM5_EXE} not found"; exit 1; } + + +declare -a SSH_PIDS +declare -a HOSTS +declare -a NCORES + +# Find out which cluster hosts/slots are allocated or +# use localhost if there is no LSF allocation. +# We assume that allocated slots are listed in the LSB_MCPU_HOSTS +# environment variable in the form: +# host1 nslots1 host2 nslots2 ... +# (This is what LSF does by default.) +NH=0 +[ "x$LSB_MCPU_HOSTS" != "x" ] || LSB_MCPU_HOSTS="localhost $NNODES" +host="" +for hc in $LSB_MCPU_HOSTS +do + if [ "x$host" == "x" ] + then + host=$hc + HOSTS+=($hc) + else + NCORES+=($hc) + ((NH+=hc)) + host="" + fi +done +((NNODES==NH)) || { echo "(E) Number of cluster slots ($NH) and gem5 instances ($N) differ"; exit -1; } +#echo "hosts: ${HOSTS[@]}" +#echo "hosts: ${NCORES[@]}" +#echo ${#HOSTS[@]} + + +# function to clean up and abort if something goes wrong +abort_func () +{ + echo + echo "KILLED $(date)" + # (try to) kill all gem5 processes on all hosts + bname=$(basename $GEM5_EXE) + killall -q $bname + for h in ${HOSTS[@]} + do + ssh $h killall -q $bname + done + sleep 3 + # kill the message server and the watchdog + [ "x$SERVER_PID" != "x" ] && kill $SERVER_PID 2>/dev/null + [ "x$WATCHDOG_PID" != "x" ] && kill $WATCHDOG_PID 2>/dev/null + exit -1 +} + + +# We need a watchdog to trigger full clean up if a gem5 process dies +watchdog_func () +{ + while true + do + sleep 30 + ((NDEAD=0)) + for p in ${SSH_PIDS[*]} + do + kill -0 $p 2>/dev/null || ((NDEAD+=1)) + done + kill -0 $SERVER_PID || ((NDEAD+=1)) + if ((NDEAD>0)) + then + # we may be in the middle of an orderly termination, + # give it some time to complete before reporting abort + sleep 60 + echo -n "(I) (some) gem5 process(es) exited" + abort_func + fi + done +} + +# This function launches the gem5 processes. We use it only to allow launching +# gem5 processes under gdb control (in the foreground) for debugging +start_func () +{ + local N=$1 + local HOST=$2 + local ENV_ARGS=$3 + shift 3 + if [ "x$GEM5_DEBUG" != "x" ] + then + gdb --args "$@" + else + ssh $HOST $ENV_ARGS "$@" &>log.$N & + fi +} + + +# Trigger full clean up in case we are being killed by external signal +trap 'abort_func' INT TERM + +# env args to be passed explicitly to gem5 processes started via ssh +ENV_ARGS="LD_LIBRARY_PATH=$LD_LIBRARY_PATH M5_PATH=$M5_PATH" + +# launch the mesage server and check if it has started okay +$TCP_SERVER $GEM5_DEBUG $NNODES $SERVER_PORT &>log.server & +SERVER_PID=$! +sleep 2 +kill -0 $SERVER_PID || { echo "Failed to start message server"; exit -1; } + +# Now launch all the gem5 processes with ssh. +echo "START $(date)" +n=0 +for ((i=0; i < ${#HOSTS[@]}; i++)) +do + h=${HOSTS[$i]} + for ((j=0; j < ${NCORES[i]}; j++)) + do + echo "starting gem5 on $h ..." + start_func $n $h "$ENV_ARGS" $GEM5_EXE -d $(pwd)/m5out.$n $GEM5_ARGS \ + --multi \ + --multi-rank=$n \ + --multi-server-name=${HOSTS[0]} \ + --multi-server-port=$SERVER_PORT \ + --testsys-toplevel-LinuxArmSystem.boot_osflags="\"GEM5_RANK=$n GEM5_SIZE=$NNODES\"" + SSH_PIDS[$n]=$! + ((n+=1)) + done +done + +[ "x$GEM5_DEBUG" == "x" ] || { kill $SERVER_PID; echo "DEBUG exit"; exit -1; } + +# start watchdog to trigger complete abort (after a grace period) if any +# gem5 process dies +watchdog_func & +WATCHDOG_PID=$! + +# wait for exit statuses +((NFAIL=0)) +for p in ${SSH_PIDS[*]} +do + wait $p || ((NFAIL+=1)) +done +wait $SERVER_PID || ((NFAIL+=1)) + +# all done, let's terminate the watchdog +kill $WATCHDOG_PID 2>/dev/null + +if ((NFAIL==0)) +then + echo "EXIT $(date)" +else + echo "ABORT $(date)" +fi diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc new file mode 100644 index 000000000..1ec4303d3 --- /dev/null +++ b/util/multi/tcp_server.cc @@ -0,0 +1,463 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Copyright (c) 2008 The Regents of The University of Michigan + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + + +/* @file + * Message server implementation using TCP stream sockets for parallel gem5 + * runs. + */ +#include <arpa/inet.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <cstdio> +#include <cstdlib> + +#include "tcp_server.hh" + +using namespace std; + +// Some basic macros for information and error reporting. +#define PRINTF(...) fprintf(stderr, __VA_ARGS__) + +#ifdef DEBUG +static bool debugSetup = true; +static bool debugPeriodic = false; +static bool debugSync = true; +static bool debugPkt = false; +#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__) +#else +#define DPRINTF(v,...) +#endif + +#define inform(...) do { PRINTF("info: "); \ + PRINTF(__VA_ARGS__); } while(0) + +#define panic(...) do { PRINTF("panic: "); \ + PRINTF(__VA_ARGS__); \ + PRINTF("\n[%s:%s], line %d\n", \ + __FUNCTION__, __FILE__, __LINE__); \ + exit(-1); } while(0) + +TCPServer *TCPServer::instance = nullptr; + +TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle) +{ + MultiHeaderPkt::clearAddress(address); +} + +unsigned +TCPServer::Channel::recvRaw(void *buf, unsigned size) const +{ + ssize_t n; + // This is a blocking receive. + n = recv(fd, buf, size, MSG_WAITALL); + + if (n < 0) + panic("read() failed:%s", strerror(errno)); + else if (n > 0 && n < size) + // the recv() call should wait for the full message + panic("read() failed"); + + return n; +} + +void +TCPServer::Channel::sendRaw(const void *buf, unsigned size) const +{ + ssize_t n; + n = send(fd, buf, size, MSG_NOSIGNAL); + if (n < 0) + panic("write() failed:%s", strerror(errno)); + else if (n != size) + panic("write() failed"); +} + +void TCPServer::Channel::updateAddress(const AddressType &new_address) +{ + // check if the known address has changed (e.g. the client reconfigured + // its Ethernet NIC) + if (MultiHeaderPkt::isAddressEqual(address, new_address)) + return; + + // So we have to update the address. Note that we always + // store the same address as key in the map but the ordering + // may change so we need to erase and re-insert it again. + auto info = TCPServer::instance->addressMap.find(&address); + if (info != TCPServer::instance->addressMap.end()) { + TCPServer::instance->addressMap.erase(info); + } + + MultiHeaderPkt::copyAddress(address, new_address); + TCPServer::instance->addressMap[&address] = this; +} + +void +TCPServer::Channel::headerPktIn() +{ + ssize_t n; + Header hdr_pkt; + + n = recvRaw(&hdr_pkt, sizeof(hdr_pkt)); + + if (n == 0) { + // EOF - nothing to do here, we will handle this as a POLLRDHUP event + // in the main loop. + return; + } + + if (hdr_pkt.msgType == MsgType::dataDescriptor) { + updateAddress(hdr_pkt.srcAddress); + TCPServer::instance->xferData(hdr_pkt, *this); + } else { + processCmd(hdr_pkt.msgType, hdr_pkt.sendTick); + } +} + +void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick) +{ + switch (cmd) { + case MsgType::cmdAtomicSyncReq: + DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank); + assert(state == SyncState::idle); + state = SyncState::atomic; + TCPServer::instance->syncTryComplete(SyncState::atomic, + MsgType::cmdAtomicSyncAck); + break; + case MsgType::cmdPeriodicSyncReq: + DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick); + // sanity check + if (TCPServer::instance->periodicSyncTick() == 0) { + TCPServer::instance->periodicSyncTick(send_tick); + } else if ( TCPServer::instance->periodicSyncTick() != send_tick) { + panic("Out of order periodic sync request - rank:%d " + "(send_tick:%ld ongoing:%ld)", rank, send_tick, + TCPServer::instance->periodicSyncTick()); + } + switch (state) { + case SyncState::idle: + state = SyncState::periodic; + TCPServer::instance->syncTryComplete(SyncState::periodic, + MsgType::cmdPeriodicSyncAck); + break; + case SyncState::asyncCkpt: + // An async ckpt request has already been sent to this client and + // that will interrupt this periodic sync. We can simply drop this + // message. + break; + default: + panic("Unexpected state for periodic sync request (rank:%d)", + rank); + break; + } + break; + case MsgType::cmdCkptSyncReq: + DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank); + switch (state) { + case SyncState::idle: + TCPServer::instance->ckptPropagate(*this); + // we fall through here to complete #clients==1 case + case SyncState::asyncCkpt: + state = SyncState::ckpt; + TCPServer::instance->syncTryComplete(SyncState::ckpt, + MsgType::cmdCkptSyncAck); + break; + default: + panic("Unexpected state for ckpt sync request (rank:%d)", rank); + break; + } + break; + default: + panic("Unexpected header packet (rank:%d)",rank); + break; + } +} + +TCPServer::TCPServer(unsigned clients_num, + unsigned listen_port, + int timeout_in_sec) +{ + assert(instance == nullptr); + construct(clients_num, listen_port, timeout_in_sec); + instance = this; +} + +TCPServer::~TCPServer() +{ + for (auto &c : clientsPollFd) + close(c.fd); +} + +void +TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec) +{ + int listen_sock, new_sock, ret; + unsigned client_len; + struct sockaddr_in server_addr, client_addr; + struct pollfd new_pollfd; + Channel new_channel; + + DPRINTF(debugSetup, "Start listening on port %u ...\n", port); + + listen_sock = socket(AF_INET, SOCK_STREAM, 0); + if (listen_sock < 0) + panic("socket() failed:%s", strerror(errno)); + + bzero(&server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port); + if (bind(listen_sock, (struct sockaddr *) &server_addr, + sizeof(server_addr)) < 0) + panic("bind() failed:%s", strerror(errno)); + listen(listen_sock, 10); + + clientsPollFd.reserve(clients_num); + clientsChannel.reserve(clients_num); + + new_pollfd.events = POLLIN | POLLRDHUP; + new_pollfd.revents = 0; + while (clientsPollFd.size() < clients_num) { + new_pollfd.fd = listen_sock; + ret = poll(&new_pollfd, 1, timeout_in_sec*1000); + if (ret == 0) + panic("Timeout while waiting for clients to connect"); + assert(ret == 1 && new_pollfd.revents == POLLIN); + client_len = sizeof(client_addr); + new_sock = accept(listen_sock, + (struct sockaddr *) &client_addr, + &client_len); + if (new_sock < 0) + panic("accept() failed:%s", strerror(errno)); + new_pollfd.fd = new_sock; + new_pollfd.revents = 0; + clientsPollFd.push_back(new_pollfd); + new_channel.fd = new_sock; + new_channel.isAlive = true; + new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank)); + clientsChannel.push_back(new_channel); + + DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n", + client_addr.sin_addr.s_addr, client_addr.sin_port, + new_channel.rank); + } + ret = close(listen_sock); + assert(ret == 0); + + DPRINTF(debugSetup, "Setup complete\n"); +} + +void +TCPServer::run() +{ + int nfd; + unsigned num_active_clients = clientsPollFd.size(); + + DPRINTF(debugSetup, "Entering run() loop\n"); + while (num_active_clients == clientsPollFd.size()) { + nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1); + if (nfd == -1) + panic("poll() failed:%s", strerror(errno)); + + for (unsigned i = 0, n = 0; + i < clientsPollFd.size() && (signed)n < nfd; + i++) { + struct pollfd &pfd = clientsPollFd[i]; + if (pfd.revents) { + if (pfd.revents & POLLERR) + panic("poll() returned POLLERR"); + if (pfd.revents & POLLIN) { + clientsChannel[i].headerPktIn(); + } + if (pfd.revents & POLLRDHUP) { + // One gem5 process exited or aborted. Either way, we + // assume the full simulation should stop now (either + // because m5 exit was called or a serious error + // occurred.) So we quit the run loop here and close all + // sockets to notify the remaining peer gem5 processes. + pfd.events = 0; + clientsChannel[i].isAlive = false; + num_active_clients--; + DPRINTF(debugSetup, "POLLRDHUP event"); + } + n++; + if ((signed)n == nfd) + break; + } + } + } + DPRINTF(debugSetup, "Exiting run() loop\n"); +} + +void +TCPServer::xferData(const Header &hdr_pkt, const Channel &src) +{ + unsigned n; + assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer)); + n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength); + + if (n == 0) + panic("recvRaw() failed"); + DPRINTF(debugPkt, "Incoming data packet (from rank %d) " + "src:0x%02x%02x%02x%02x%02x%02x " + "dst:0x%02x%02x%02x%02x%02x%02x\n", + src.rank, + hdr_pkt.srcAddress[0], + hdr_pkt.srcAddress[1], + hdr_pkt.srcAddress[2], + hdr_pkt.srcAddress[3], + hdr_pkt.srcAddress[4], + hdr_pkt.srcAddress[5], + hdr_pkt.dstAddress[0], + hdr_pkt.dstAddress[1], + hdr_pkt.dstAddress[2], + hdr_pkt.dstAddress[3], + hdr_pkt.dstAddress[4], + hdr_pkt.dstAddress[5]); + // Now try to figure out the destination client(s). + auto dst_info = addressMap.find(&hdr_pkt.dstAddress); + + // First handle the multicast/broadcast or unknonw destination case. These + // all trigger a broadcast of the packet to all clients. + if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false || + dst_info == addressMap.end()) { + unsigned n = 0; + for (auto const &c: clientsChannel) { + if (c.isAlive && &c!=&src) { + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength); + n++; + } + } + if (n == 0) { + inform("Broadcast/multicast packet dropped\n"); + } + } else { + // It is a unicast address with a known destination + Channel *dst = dst_info->second; + if (dst->isAlive) { + dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength); + DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank); + } else { + inform("Unicast packet dropped (destination exited)\n"); + } + } +} + +void +TCPServer::syncTryComplete(SyncState st, MsgType ack) +{ + // Check if the barrieris complete. If so then notify all the clients. + for (auto &c : clientsChannel) { + if (c.isAlive && (c.state != st)) { + // sync not complete yet, stop here + return; + } + } + // Sync complete, send out the acks + MultiHeaderPkt::Header hdr_pkt; + hdr_pkt.msgType = ack; + for (auto &c : clientsChannel) { + if (c.isAlive) { + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.state = SyncState::idle; + } + } + // Reset periodic send tick + _periodicSyncTick = 0; + DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync, + "Sync COMPLETE\n"); +} + +void +TCPServer::ckptPropagate(Channel &ch) +{ + // Channel ch got a ckpt request that needs to be propagated to the other + // clients + MultiHeaderPkt::Header hdr_pkt; + hdr_pkt.msgType = MsgType::cmdCkptSyncReq; + for (auto &c : clientsChannel) { + if (c.isAlive && (&c != &ch)) { + switch (c.state) { + case SyncState::idle: + case SyncState::periodic: + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.state = SyncState::asyncCkpt; + break; + default: + panic("Unexpected state for ckpt sync request propagation " + "(rank:%d)\n",c.rank); + break; + } + } + } +} + +int main(int argc, char *argv[]) +{ + TCPServer *server; + int clients_num = -1, listen_port = -1; + int first_arg = 1, timeout_in_sec = 60; + + if (argc > 1 && string(argv[1]).compare("-debug") == 0) { + timeout_in_sec = -1; + first_arg++; + argc--; + } + + if (argc != 3) + panic("We need two command line args (number of clients and tcp listen" + " port"); + + clients_num = atoi(argv[first_arg]); + listen_port = atoi(argv[first_arg + 1]); + + server = new TCPServer(clients_num, listen_port, timeout_in_sec); + + server->run(); + + delete server; + + return 0; +} diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh new file mode 100644 index 000000000..c5f2a9ce8 --- /dev/null +++ b/util/multi/tcp_server.hh @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Copyright (c) 2008 The Regents of The University of Michigan + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * Message server using TCP stream sockets for parallel gem5 runs. + * + * For a high level description about multi gem5 see comments in + * header files src/dev/multi_iface.hh and src/dev/tcp_iface.hh. + * + * This file implements the central message server process for multi gem5. + * The server is responsible the following tasks. + * 1. Establishing a TCP socket connection for each gem5 process (clients). + * + * 2. Process data messages coming in from clients. The server checks + * the MAC addresses in the header message and transfers the message + * to the target(s) client(s). + * + * 3. Processing synchronisation related control messages. Synchronisation + * is performed as follows. The server waits for a 'barrier enter' message + * from all the clients. When the last such control message arrives, the + * server sends out a 'barrier leave' control message to all the clients. + * + * 4. Triggers complete termination in case a client exits. A client may + * exit either by calling 'm5 exit' pseudo instruction or due to a fatal + * error. In either case, we assume that the entire multi simulation needs to + * terminate. The server triggers full termination by tearing down the + * open TCP sockets. + * + * The TCPServer class is instantiated as a singleton object. + * + * The server can be built independently from the rest of gem5 (and it is + * architecture agnostic). See the Makefile in the same directory. + * + */ + +#include <poll.h> + +#include <map> +#include <vector> + +#include "dev/etherpkt.hh" +#include "dev/multi_packet.hh" + +/** + * The maximum length of an Ethernet packet (allowing Jumbo frames). + */ +#define MAX_ETH_PACKET_LENGTH 9014 + +class TCPServer +{ + public: + typedef MultiHeaderPkt::AddressType AddressType; + typedef MultiHeaderPkt::Header Header; + typedef MultiHeaderPkt::MsgType MsgType; + + private: + + enum + class SyncState { periodic, ckpt, asyncCkpt, atomic, idle }; + /** + * The Channel class encapsulates all the information about a client + * and its current status. + */ + class Channel + { + private: + /** + * The MAC address of the client. + */ + AddressType address; + + /** + * Update the client MAC address. It is called every time a new data + * packet is to come in. + */ + void updateAddress(const AddressType &new_addr); + /** + * Process an incoming command message. + */ + void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick); + + + public: + /** + * TCP stream socket. + */ + int fd; + /** + * Is client connected? + */ + bool isAlive; + /** + * Current state of the channel wrt. multi synchronisation. + */ + SyncState state; + /** + * Multi rank of the client + */ + unsigned rank; + + public: + Channel(); + ~Channel () {} + + + /** + * Receive and process the next incoming header packet. + */ + void headerPktIn(); + /** + * Send raw data to the connected client. + * + * @param data The data to send. + * @param size Size of the data (in bytes). + */ + void sendRaw(const void *data, unsigned size) const; + /** + * Receive raw data from the connected client. + * + * @param buf The buffer to store the incoming data into. + * @param size Size of data to receive (in bytes). + * @return In case of success, it returns size. Zero is returned + * if the socket is already closed by the client. + */ + unsigned recvRaw(void *buf, unsigned size) const; + }; + + /** + * The array of socket descriptors needed by the poll() system call. + */ + std::vector<struct pollfd> clientsPollFd; + /** + * Array holding all clients info. + */ + std::vector<Channel> clientsChannel; + + + /** + * We use a map to select the target client based on the destination + * MAC address. + */ + struct AddressCompare + { + bool operator()(const AddressType *a1, const AddressType *a2) + { + return MultiHeaderPkt::isAddressLess(*a1, *a2); + } + }; + std::map<const AddressType *, Channel *, AddressCompare> addressMap; + + /** + * As we dealt with only one message at a time, we can allocate and re-use + * a single packet buffer (to hold any incoming data packet). + */ + uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH]; + /** + * Send tick of the current periodic sync. It is used for sanity check. + */ + Tick _periodicSyncTick; + /** + * The singleton server object. + */ + static TCPServer *instance; + + /** + * Set up the socket connections to all the clients. + * + * @param listen_port The port we are listening on for new client + * connection requests. + * @param nclients The number of clients to connect to. + * @param timeout Timeout in sec to complete the setup phase + * (i.e. all gem5 establish socket connections) + */ + void construct(unsigned listen_port, unsigned nclients, int timeout); + /** + * Transfer the header and the follow up data packet to the target(s) + * clients. + * + * @param hdr The header message structure. + * @param ch The source channel for the message. + */ + void xferData(const Header &hdr, const Channel &ch); + /** + * Check if the current round of a synchronisation is completed and notify + * the clients if it is so. + * + * @param st The state all channels should have if sync is complete. + * @param ack The type of ack message to send out if the sync is compete. + */ + void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack); + /** + * Broadcast a request for checkpoint sync. + * + * @param ch The source channel of the checkpoint sync request. + */ + void ckptPropagate(Channel &ch); + /** + * Setter for current periodic send tick. + */ + void periodicSyncTick(Tick t) { _periodicSyncTick = t; } + /** + * Getter for current periodic send tick. + */ + Tick periodicSyncTick() { return _periodicSyncTick; } + + public: + + TCPServer(unsigned clients_num, unsigned listen_port, int timeout_in_sec); + ~TCPServer(); + + /** + * The main server loop that waits for and processes incoming messages. + */ + void run(); +}; |