summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
authorGabor Dozsa <gabor.dozsa@arm.com>2015-07-15 19:53:50 -0500
committerGabor Dozsa <gabor.dozsa@arm.com>2015-07-15 19:53:50 -0500
commitfc5bf6713f191047e07f33a788d099b2bbd9faf4 (patch)
treea6af111bc06faaf3bd98a6b6ede0af6b6ff0ab2f /util
parent541066091949dc91e07874c262b0b5b740718d01 (diff)
downloadgem5-fc5bf6713f191047e07f33a788d099b2bbd9faf4.tar.xz
dev: add support for multi gem5 runs
Multi gem5 is an extension to gem5 to enable parallel simulation of a distributed system (e.g. simulation of a pool of machines connected by Ethernet links). A multi gem5 run consists of separate gem5 processes running in parallel (potentially on different hosts/slots on a cluster). Each gem5 process executes the simulation of a component of the simulated distributed system (e.g. a multi-core board with an Ethernet NIC). The patch implements the "distributed" Ethernet link device (dev/src/multi_etherlink.[hh,cc]). This device will send/receive (simulated) Ethernet packets to/from peer gem5 processes. The interface to talk to the peer gem5 processes is defined in dev/src/multi_iface.hh and in tcp_iface.hh. There is also a central message server process (util/multi/tcp_server.[hh,cc]) which acts like an Ethernet switch and transfers messages among the gem5 peers. A multi gem5 simulation can be kicked off by the util/multi/gem5-multi.sh wrapper script. Checkpoints are supported by multi-gem5. The checkpoint must be initiated by a single gem5 process. E.g., the gem5 process with rank 0 can take a checkpoint from the bootscript just before it invokes 'mpirun' to launch an MPI test. The message server process will notify all the other peer gem5 processes and make them take a checkpoint, too (after completing a global synchronisation to ensure that there are no inflight messages among gem5).
Diffstat (limited to 'util')
-rw-r--r--util/multi/Makefile63
-rw-r--r--util/multi/bootscript.rcS122
-rwxr-xr-xutil/multi/gem5-multi.sh275
-rw-r--r--util/multi/tcp_server.cc463
-rw-r--r--util/multi/tcp_server.hh254
5 files changed, 1177 insertions, 0 deletions
diff --git a/util/multi/Makefile b/util/multi/Makefile
new file mode 100644
index 000000000..a58dd3307
--- /dev/null
+++ b/util/multi/Makefile
@@ -0,0 +1,63 @@
+#
+# Copyright (c) 2015 ARM Limited
+# All rights reserved
+#
+# The license below extends only to copyright in the software and shall
+# not be construed as granting a license to any other intellectual
+# property including but not limited to intellectual property relating
+# to a hardware implementation of the functionality of the software
+# licensed hereunder. You may use the software subject to the license
+# terms below provided that you ensure that this notice is replicated
+# unmodified and in its entirety in all distributions of the software,
+# modified or unmodified, in source code or in binary form.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Gabor Dozsa
+
+# Compiler used to build the stand-alone message server.
+CXX= g++
+
+# -DDEBUG compiles in the DPRINTF tracing of tcp_server.cc.
+DEBUG= -DDEBUG
+
+# gem5 tree location and build variant; both can be overridden from the
+# environment or the make command line.
+M5_ARCH?= ARM
+M5_DIR?= ../..
+
+# Prerequisites that are not found in this directory are also searched for
+# in the 'dev' directory of the gem5 build tree.
+vpath % $(M5_DIR)/build/$(M5_ARCH)/dev
+
+INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext
+# -MMD makes the compiler emit a .d dependency file next to each object.
+CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS)
+
+# 'default' and 'clean' are commands, not files: keep them working even if a
+# file with one of these names shows up in the directory.
+.PHONY: default clean
+
+default: tcp_server
+
+clean:
+	@rm -f tcp_server *.o *.d *~
+
+tcp_server: tcp_server.o multi_packet.o
+	$(CXX) $(LFLAGS) -o $@ $^
+
+%.o: %.cc
+	@echo '$(CXX) $(CCFLAGS) -c $(notdir $<) -o $@'
+	@$(CXX) $(CCFLAGS) -c $< -o $@
+
+# Pull in the dependency files generated by -MMD (if any exist yet).
+-include *.d
diff --git a/util/multi/bootscript.rcS b/util/multi/bootscript.rcS
new file mode 100644
index 000000000..95736f4b7
--- /dev/null
+++ b/util/multi/bootscript.rcS
@@ -0,0 +1,122 @@
+#!/bin/bash
+
+
+#
+# Copyright (c) 2015 ARM Limited
+# All rights reserved
+#
+# The license below extends only to copyright in the software and shall
+# not be construed as granting a license to any other intellectual
+# property including but not limited to intellectual property relating
+# to a hardware implementation of the functionality of the software
+# licensed hereunder. You may use the software subject to the license
+# terms below provided that you ensure that this notice is replicated
+# unmodified and in its entirety in all distributions of the software,
+# modified or unmodified, in source code or in binary form.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Gabor Dozsa
+#
+#
+# This is an example boot script to use for multi gem5 runs. The important
+# task here is to extract the rank and size information from the kernel
+# boot args and use those to configure MAC/IP addresses and hostname.
+# Then we can kick off our (parallel) workload ...
+#
+# You are expected to customize this script for your needs (e.g. change
+# the command at the end of the script to run your tests/workloads).
+
+source /root/.bashrc
+echo "bootscript.rcS is running"
+
+m='GEM5\_RANK=([0-9]+) GEM5\_SIZE=([0-9]+)'
+if [[ $(cat /proc/cmdline) =~ $m ]]
+then
+ MY_RANK=${BASH_REMATCH[1]}
+ MY_SIZE=${BASH_REMATCH[2]}
+else
+ echo "(E) GEM5_RANK/GEM5_SIZE was not defined in bootargs, exiting ..."
+ /sbin/m5 abort
+fi
+
+# The host name is derived from the rank: node0, node1, ...
+/bin/hostname node${MY_RANK}
+
+# Keep MAC address assignment simple for now: the last MAC octet and the last
+# IP octet are both rank+2, and the value is written with two decimal digits.
+if (($MY_RANK>97))
+then
+    echo "(E) Rank must be less than 98"
+    /sbin/m5 abort
+fi
+MY_ADDR=$((MY_RANK+2))
+
+# Zero-pad single digit values so that the MAC octet always has two digits
+if (($MY_ADDR>=10))
+then
+    MY_ADDR_PADDED=${MY_ADDR}
+else
+    MY_ADDR_PADDED=0${MY_ADDR}
+fi
+
+/sbin/ifconfig eth0 hw ether 00:90:00:00:00:${MY_ADDR_PADDED}
+/sbin/ifconfig eth0 192.168.0.${MY_ADDR} netmask 255.255.255.0 up
+
+# Dump the resulting interface configuration
+/sbin/ifconfig -a
+
+# Prepare host lists for mpirun
+# (comma separated IP addresses 192.168.0.2 ... 192.168.0.<MY_SIZE+1>, i.e.
+# one entry per rank using the same rank+2 rule as the address setup above)
+MY_MPI_HOSTS="192.168.0.2"
+for ((i=1; i<MY_SIZE; i++))
+do
+ MY_MPI_HOSTS+=",192.168.0.$((i+2))"
+done
+
+# Check that Ethernet links work, then take a checkpoint
+# (only rank 0 does this)
+if [ "$MY_RANK" == "0" ]
+then
+ # Temporarily split on ',' so that the loop walks the host list
+ OLDIFS=$IFS
+ IFS=","
+ for i in $MY_MPI_HOSTS
+ do
+ # Every node must answer a ping and accept an ssh login
+ ping -c 1 $i || { echo "ping $i failed, exiting ..."; exit -1; }
+ ssh $i hostname || { echo "ssh $i failed, exiting ..."; exit -1; }
+ done
+ IFS=$OLDIFS
+ /sbin/m5 checkpoint
+fi
+
+# --------------------------------------------
+# ------ Start your tests below ... ---------
+# --------------------------------------------
+
+if [ "$MY_RANK" == "0" ]
+then
+ echo "MPI test"
+ #mpirun -H 192.168.0.3,192.168.0.2 hostname
+ cd /benchmarks
+ mpirun -H $MY_MPI_HOSTS lulesh/lulesh2.0-mpi -s 5
+else
+ # This is to avoid other (rank!=0) gem5 processes exiting
+ # before the test (started by rank 0) completes. When rank 0 completes the
+ # test it will exit and that will trigger a notification to all the peer
+ # gem5 peocesses to stop the simulation.
+ echo "sleep forever..."
+ while /bin/true
+ do
+ sleep 5
+ done
+fi
diff --git a/util/multi/gem5-multi.sh b/util/multi/gem5-multi.sh
new file mode 100755
index 000000000..4b4937c90
--- /dev/null
+++ b/util/multi/gem5-multi.sh
@@ -0,0 +1,275 @@
+#! /bin/bash
+
+#
+# Copyright (c) 2015 ARM Limited
+# All rights reserved
+#
+# The license below extends only to copyright in the software and shall
+# not be construed as granting a license to any other intellectual
+# property including but not limited to intellectual property relating
+# to a hardware implementation of the functionality of the software
+# licensed hereunder. You may use the software subject to the license
+# terms below provided that you ensure that this notice is replicated
+# unmodified and in its entirety in all distributions of the software,
+# modified or unmodified, in source code or in binary form.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met: redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer;
+# redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution;
+# neither the name of the copyright holders nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Authors: Gabor Dozsa
+
+
+# This is a wrapper script to run multi gem5 simulations.
+# See the usage_func() below for hints on how to use it. Also,
+# there are some examples in the util/multi directory (e.g.
+# see util/multi/test-2nodes-AArch64.sh)
+#
+#
+# Allocated hosts/cores are assumed to be listed in the LSB_MCPU_HOSTS
+# environment variable (which is what LSF does by default).
+# E.g. LSB_MCPU_HOSTS=\"hname1 2 hname2 4\" means we have altogether 6 slots
+# allocated to launch the gem5 processes, 2 of them are on host hname1
+# and 4 of them are on host hname2.
+# If LSB_MCPU_HOSTS environment variable is not defined then we launch all
+# processes on the localhost.
+#
+# Each gem5 process is passed a unique rank ID [0..N-1] via the kernel
+# boot params. The total number of gem5 processes is also passed in.
+# These values can be used in the boot script to configure the MAC/IP
+# addresses - among other things (see util/multi/bootscript.rcS).
+#
+# Each gem5 process will create an m5out.$GEM5_RANK directory for
+# the usual output files. Furthermore, there will be a separate log file
+# for each ssh session (we use ssh to start gem5 processes) and one for
+# the server. These are called log.$GEM5_RANK and log.server.
+#
+
+
+# print help
+usage_func ()
+{
+    # Print the command line synopsis to stdout. Takes no arguments; $0 is
+    # the name the script was invoked with.
+    echo "Usage:$0 [-debug] [-n nnodes] [-s server] [-p port] gem5_exe gem5_args"
+    echo " -debug : debug mode (start gem5 in gdb)"
+    echo " nnodes : number of gem5 processes (-n or -nodes)"
+    echo " server : message server executable (-s or -server)"
+    echo " port : message server listen port (-p or -port)"
+    echo " gem5_exe : gem5 executable (full path required)"
+    echo " gem5_args: usual gem5 arguments ( m5 options, config script options)"
+    echo "Note: if no LSF slots allocation is found all processes are launched on the localhost."
+}
+
+
+# Process (optional) command line options
+
+while true
+do
+ case "x$1" in
+ x-n|x-nodes)
+ NNODES=$2
+ shift 2
+ ;;
+ x-s|x-server)
+ TCP_SERVER=$2
+ shift 2
+ ;;
+ x-p|x-port)
+ SERVER_PORT=$2
+ shift 2
+ ;;
+ x-debug)
+ GEM5_DEBUG="-debug"
+ shift 1
+ ;;
+ *)
+ break
+ ;;
+ esac
+done
+
+# Whatever is left on the command line is the gem5 command itself: the
+# executable followed by at least one argument.
+if (($# < 2))
+then
+    usage_func
+    exit -1
+fi
+GEM5_EXE=$1
+shift
+GEM5_ARGS="$*"
+
+# Fallback values for everything that was not given as a command line option
+DEFAULT_TCP_SERVER=$(dirname $0)/../../util/multi/tcp_server
+DEFAULT_SERVER_PORT=2200
+
+TCP_SERVER=${TCP_SERVER:-$DEFAULT_TCP_SERVER}
+SERVER_PORT=${SERVER_PORT:-$DEFAULT_SERVER_PORT}
+NNODES=${NNODES:-2}
+
+
+# Both the message server and gem5 must be existing executables
+if [ ! -x "$TCP_SERVER" ]
+then
+    echo "Executable ${TCP_SERVER} not found"
+    exit 1
+fi
+if [ ! -x "$GEM5_EXE" ]
+then
+    echo "Executable ${GEM5_EXE} not found"
+    exit 1
+fi
+
+
+declare -a SSH_PIDS
+declare -a HOSTS
+declare -a NCORES
+
+# Find out which cluster hosts/slots are allocated or
+# use localhost if there is no LSF allocation.
+# We assume that allocated slots are listed in the LSB_MCPU_HOSTS
+# environment variable in the form:
+# host1 nslots1 host2 nslots2 ...
+# (This is what LSF does by default.)
+NH=0
+[ "x$LSB_MCPU_HOSTS" != "x" ] || LSB_MCPU_HOSTS="localhost $NNODES"
+host=""
+for hc in $LSB_MCPU_HOSTS
+do
+ if [ "x$host" == "x" ]
+ then
+ host=$hc
+ HOSTS+=($hc)
+ else
+ NCORES+=($hc)
+ ((NH+=hc))
+ host=""
+ fi
+done
+((NNODES==NH)) || { echo "(E) Number of cluster slots ($NH) and gem5 instances ($N) differ"; exit -1; }
+#echo "hosts: ${HOSTS[@]}"
+#echo "hosts: ${NCORES[@]}"
+#echo ${#HOSTS[@]}
+
+
+# function to clean up and abort if something goes wrong
+abort_func ()
+{
+    # Tear everything down and leave with a failure status: kill gem5 on the
+    # local host and on every allocated host, then the message server and
+    # the watchdog (if they have been started).
+    echo
+    echo "KILLED $(date)"
+
+    # (try to) kill all gem5 processes on all hosts
+    bname=$(basename $GEM5_EXE)
+    killall -q $bname
+    for h in ${HOSTS[@]}
+    do
+        ssh $h killall -q $bname
+    done
+    sleep 3
+
+    # kill the message server and the watchdog
+    if [ -n "$SERVER_PID" ]
+    then
+        kill $SERVER_PID 2>/dev/null
+    fi
+    if [ -n "$WATCHDOG_PID" ]
+    then
+        kill $WATCHDOG_PID 2>/dev/null
+    fi
+    exit -1
+}
+
+
+# We need a watchdog to trigger full clean up if a gem5 process dies
+watchdog_func ()
+{
+    # Poll every 30 seconds whether all launched processes (SSH_PIDS) and the
+    # message server (SERVER_PID) are still there. Once any of them is gone,
+    # wait another minute and then call abort_func. Never returns normally.
+    while true
+    do
+        sleep 30
+        ((NDEAD=0))
+        for p in ${SSH_PIDS[*]}
+        do
+            # 'kill -0' only tests for existence, no signal is delivered
+            kill -0 $p 2>/dev/null || ((NDEAD+=1))
+        done
+        # Same probe for the server; keep it as quiet as the one above
+        kill -0 $SERVER_PID 2>/dev/null || ((NDEAD+=1))
+        if ((NDEAD>0))
+        then
+            # we may be in the middle of an orderly termination,
+            # give it some time to complete before reporting abort
+            sleep 60
+            echo -n "(I) (some) gem5 process(es) exited"
+            abort_func
+        fi
+    done
+}
+
+# This function launches the gem5 processes. We use it only to allow launching
+# gem5 processes under gdb control (in the foreground) for debugging
+start_func ()
+{
+    # Launch one gem5 process.
+    #   $1     rank, used as suffix of the log file name
+    #   $2     host to run on
+    #   $3     environment assignments put in front of the remote command
+    #   $4...  the gem5 command line
+    local rank=$1
+    local target_host=$2
+    local remote_env=$3
+    shift 3
+    if [ -z "$GEM5_DEBUG" ]
+    then
+        # Normal mode: run via ssh in the background, output goes to log.<rank>
+        ssh $target_host $remote_env "$@" &>log.$rank &
+    else
+        # Debug mode: run under gdb in the foreground (no ssh involved)
+        gdb --args "$@"
+    fi
+}
+
+
+# Trigger full clean up in case we are being killed by external signal
+trap 'abort_func' INT TERM
+
+# env args to be passed explicitly to gem5 processes started via ssh
+ENV_ARGS="LD_LIBRARY_PATH=$LD_LIBRARY_PATH M5_PATH=$M5_PATH"
+
+# launch the mesage server and check if it has started okay
+$TCP_SERVER $GEM5_DEBUG $NNODES $SERVER_PORT &>log.server &
+SERVER_PID=$!
+sleep 2
+kill -0 $SERVER_PID || { echo "Failed to start message server"; exit -1; }
+
+# Now launch all the gem5 processes with ssh.
+# Ranks are handed out consecutively: NCORES[i] processes on HOSTS[i].
+echo "START $(date)"
+n=0
+for ((i=0; i < ${#HOSTS[@]}; i++))
+do
+ h=${HOSTS[$i]}
+ for ((j=0; j < ${NCORES[i]}; j++))
+ do
+ echo "starting gem5 on $h ..."
+ # Each process gets its own output directory (m5out.<rank>), is pointed at
+ # the message server on the first host, and receives its rank and the
+ # total size via the kernel boot flags.
+ start_func $n $h "$ENV_ARGS" $GEM5_EXE -d $(pwd)/m5out.$n $GEM5_ARGS \
+ --multi \
+ --multi-rank=$n \
+ --multi-server-name=${HOSTS[0]} \
+ --multi-server-port=$SERVER_PORT \
+ --testsys-toplevel-LinuxArmSystem.boot_osflags="\"GEM5_RANK=$n GEM5_SIZE=$NNODES\""
+ # pid of the most recent background job, recorded under this rank
+ SSH_PIDS[$n]=$!
+ ((n+=1))
+ done
+done
+
+# In debug mode gem5 ran in the foreground under gdb, so we are done here
+[ "x$GEM5_DEBUG" == "x" ] || { kill $SERVER_PID; echo "DEBUG exit"; exit -1; }
+
+# Run the watchdog in the background: it triggers a complete abort (after a
+# grace period) if any gem5 process dies
+watchdog_func &
+WATCHDOG_PID=$!
+
+# Collect the exit statuses of all launched processes and of the server,
+# counting the ones that failed
+NFAIL=0
+for p in ${SSH_PIDS[*]}
+do
+    if ! wait $p
+    then
+        ((NFAIL+=1))
+    fi
+done
+if ! wait $SERVER_PID
+then
+    ((NFAIL+=1))
+fi
+
+# Everything has terminated, the watchdog is not needed any more
+kill $WATCHDOG_PID 2>/dev/null
+
+if ((NFAIL>0))
+then
+    echo "ABORT $(date)"
+else
+    echo "EXIT $(date)"
+fi
diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc
new file mode 100644
index 000000000..1ec4303d3
--- /dev/null
+++ b/util/multi/tcp_server.cc
@@ -0,0 +1,463 @@
+/*
+ * Copyright (c) 2015 ARM Limited
+ * All rights reserved
+ *
+ * The license below extends only to copyright in the software and shall
+ * not be construed as granting a license to any other intellectual
+ * property including but not limited to intellectual property relating
+ * to a hardware implementation of the functionality of the software
+ * licensed hereunder. You may use the software subject to the license
+ * terms below provided that you ensure that this notice is replicated
+ * unmodified and in its entirety in all distributions of the software,
+ * modified or unmodified, in source code or in binary form.
+ *
+ * Copyright (c) 2008 The Regents of The University of Michigan
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met: redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer;
+ * redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution;
+ * neither the name of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Authors: Gabor Dozsa
+ */
+
+
+/* @file
+ * Message server implementation using TCP stream sockets for parallel gem5
+ * runs.
+ */
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <cstdio>
+#include <cstdlib>
+
+#include "tcp_server.hh"
+
+using namespace std;
+
+// Some basic macros for information and error reporting.
+// All output goes to stderr.
+#define PRINTF(...) fprintf(stderr, __VA_ARGS__)
+
+#ifdef DEBUG
+// Per-category switches for the debug trace messages.
+static bool debugSetup = true;
+static bool debugPeriodic = false;
+static bool debugSync = true;
+static bool debugPkt = false;
+// Print a trace message if the category flag 'v' is set. The do/while(0)
+// wrapper makes the expansion a single statement, so the macro cannot
+// capture an 'else' that follows it.
+#define DPRINTF(v,...) do { if (v) PRINTF(__VA_ARGS__); } while (0)
+#else
+#define DPRINTF(v,...) do { } while (0)
+#endif
+
+// Print an informational message.
+#define inform(...) do { PRINTF("info: "); \
+    PRINTF(__VA_ARGS__); } while(0)
+
+// Print an error message together with the source location, then terminate
+// the process.
+#define panic(...) do { PRINTF("panic: "); \
+    PRINTF(__VA_ARGS__); \
+    PRINTF("\n[%s:%s], line %d\n", \
+           __FUNCTION__, __FILE__, __LINE__); \
+    exit(-1); } while(0)
+
+// The one and only server object; set by the TCPServer constructor.
+TCPServer *TCPServer::instance = nullptr;
+
+// Create a channel that is not connected to any client yet: no socket
+// (fd is -1), not alive, idle sync state and a cleared address.
+TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle)
+{
+ MultiHeaderPkt::clearAddress(address);
+}
+
+/*
+ * Receive exactly 'size' bytes from the client socket into 'buf'.
+ *
+ * Returns the number of bytes received, which is either 'size' or 0 (the
+ * peer closed the connection). Any error or short read is fatal.
+ */
+unsigned
+TCPServer::Channel::recvRaw(void *buf, unsigned size) const
+{
+    // Blocking receive; MSG_WAITALL asks for the complete message.
+    const ssize_t received = recv(fd, buf, size, MSG_WAITALL);
+
+    if (received < 0) {
+        panic("read() failed:%s", strerror(errno));
+    }
+    if (received > 0 && received < size) {
+        // the recv() call should wait for the full message
+        panic("read() failed");
+    }
+
+    return received;
+}
+
+/*
+ * Send 'size' bytes from 'buf' to the client socket. MSG_NOSIGNAL is passed
+ * to send(). Any error or short write is fatal.
+ */
+void
+TCPServer::Channel::sendRaw(const void *buf, unsigned size) const
+{
+    const ssize_t sent = send(fd, buf, size, MSG_NOSIGNAL);
+
+    if (sent < 0) {
+        panic("write() failed:%s", strerror(errno));
+    }
+    if (sent != size) {
+        panic("write() failed");
+    }
+}
+
+/*
+ * Record 'new_address' as the address of this channel and keep the server's
+ * address map in sync. Does nothing if the address is unchanged.
+ */
+void TCPServer::Channel::updateAddress(const AddressType &new_address)
+{
+    // Only act if the known address has changed (e.g. the client
+    // reconfigured its Ethernet NIC).
+    if (MultiHeaderPkt::isAddressEqual(address, new_address))
+        return;
+
+    auto &addr_map = TCPServer::instance->addressMap;
+
+    // The map key is always a pointer to our own 'address' member, but its
+    // position in the map may depend on the address value. So the entry has
+    // to be taken out before the value changes and put back afterwards.
+    auto stale = addr_map.find(&address);
+    if (stale != addr_map.end())
+        addr_map.erase(stale);
+
+    MultiHeaderPkt::copyAddress(address, new_address);
+    addr_map[&address] = this;
+}
+
+/*
+ * Read one header packet from the client and dispatch it: data descriptors
+ * are forwarded via xferData(), everything else is handled as a command.
+ */
+void
+TCPServer::Channel::headerPktIn()
+{
+    Header hdr_pkt;
+    const ssize_t received = recvRaw(&hdr_pkt, sizeof(hdr_pkt));
+
+    // EOF - nothing to do here, we will handle this as a POLLRDHUP event
+    // in the main loop.
+    if (received == 0)
+        return;
+
+    if (hdr_pkt.msgType != MsgType::dataDescriptor) {
+        processCmd(hdr_pkt.msgType, hdr_pkt.sendTick);
+        return;
+    }
+
+    // Data packet: remember the sender's address, then pass the data on.
+    updateAddress(hdr_pkt.srcAddress);
+    TCPServer::instance->xferData(hdr_pkt, *this);
+}
+
+/*
+ * Handle a command (i.e. non-data) header packet received from this client.
+ *
+ * cmd:       the message type taken from the header packet
+ * send_tick: the send tick taken from the header packet (only used for
+ *            periodic sync requests)
+ *
+ * Updates the sync state of this channel and asks the server to complete
+ * the corresponding sync if possible. Unexpected commands or states are
+ * fatal.
+ */
+void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick)
+{
+ switch (cmd) {
+ case MsgType::cmdAtomicSyncReq:
+ // Atomic sync: only valid while this channel is idle
+ DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank);
+ assert(state == SyncState::idle);
+ state = SyncState::atomic;
+ TCPServer::instance->syncTryComplete(SyncState::atomic,
+ MsgType::cmdAtomicSyncAck);
+ break;
+ case MsgType::cmdPeriodicSyncReq:
+ DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick);
+ // sanity check
+ // (a stored tick of 0 means no periodic sync is in progress, so this
+ // request starts one; otherwise the ticks must match)
+ if (TCPServer::instance->periodicSyncTick() == 0) {
+ TCPServer::instance->periodicSyncTick(send_tick);
+ } else if ( TCPServer::instance->periodicSyncTick() != send_tick) {
+ panic("Out of order periodic sync request - rank:%d "
+ "(send_tick:%ld ongoing:%ld)", rank, send_tick,
+ TCPServer::instance->periodicSyncTick());
+ }
+ switch (state) {
+ case SyncState::idle:
+ state = SyncState::periodic;
+ TCPServer::instance->syncTryComplete(SyncState::periodic,
+ MsgType::cmdPeriodicSyncAck);
+ break;
+ case SyncState::asyncCkpt:
+ // An async ckpt request has already been sent to this client and
+ // that will interrupt this periodic sync. We can simply drop this
+ // message.
+ break;
+ default:
+ panic("Unexpected state for periodic sync request (rank:%d)",
+ rank);
+ break;
+ }
+ break;
+ case MsgType::cmdCkptSyncReq:
+ DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank);
+ switch (state) {
+ case SyncState::idle:
+ // This client initiates the checkpoint: forward the request to all
+ // the other clients first
+ TCPServer::instance->ckptPropagate(*this);
+ // we fall through here to complete #clients==1 case
+ case SyncState::asyncCkpt:
+ state = SyncState::ckpt;
+ TCPServer::instance->syncTryComplete(SyncState::ckpt,
+ MsgType::cmdCkptSyncAck);
+ break;
+ default:
+ panic("Unexpected state for ckpt sync request (rank:%d)", rank);
+ break;
+ }
+ break;
+ default:
+ panic("Unexpected header packet (rank:%d)",rank);
+ break;
+ }
+}
+
+/*
+ * Create the (single) server object and wait for the clients to connect.
+ *
+ * clients_num:    number of client connections to wait for
+ * listen_port:    TCP port to listen on
+ * timeout_in_sec: timeout used while waiting for a client to connect
+ *
+ * The 'instance' pointer is set only after construct() has returned.
+ */
+TCPServer::TCPServer(unsigned clients_num,
+ unsigned listen_port,
+ int timeout_in_sec)
+{
+ // Only one server object may exist
+ assert(instance == nullptr);
+ construct(clients_num, listen_port, timeout_in_sec);
+ instance = this;
+}
+
+// Close every client socket.
+TCPServer::~TCPServer()
+{
+    for (unsigned i = 0; i < clientsPollFd.size(); i++) {
+        close(clientsPollFd[i].fd);
+    }
+}
+
+/*
+ * Open the listen socket and accept exactly 'clients_num' client
+ * connections. Each client is expected to send its rank right after
+ * connecting.
+ *
+ * clients_num:    number of clients to wait for
+ * port:           TCP port to listen on (on all local interfaces)
+ * timeout_in_sec: how long to wait for the next client to connect; the
+ *                 value is multiplied by 1000 and handed to poll(), so a
+ *                 negative value waits forever
+ *
+ * Every failure is fatal (panic). The listen socket is closed before
+ * returning.
+ */
+void
+TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec)
+{
+    int listen_sock, new_sock, ret;
+    socklen_t client_len;
+    struct sockaddr_in server_addr, client_addr;
+    struct pollfd new_pollfd;
+    Channel new_channel;
+
+    DPRINTF(debugSetup, "Start listening on port %u ...\n", port);
+
+    listen_sock = socket(AF_INET, SOCK_STREAM, 0);
+    if (listen_sock < 0)
+        panic("socket() failed:%s", strerror(errno));
+
+    bzero(&server_addr, sizeof(server_addr));
+    server_addr.sin_family = AF_INET;
+    server_addr.sin_addr.s_addr = INADDR_ANY;
+    server_addr.sin_port = htons(port);
+    if (bind(listen_sock, (struct sockaddr *) &server_addr,
+             sizeof(server_addr)) < 0)
+        panic("bind() failed:%s", strerror(errno));
+    if (listen(listen_sock, 10) < 0)
+        panic("listen() failed:%s", strerror(errno));
+
+    // Reserve the final size up front. Pointers to the stored Channel
+    // objects are put into addressMap by Channel::updateAddress(), so the
+    // vector must not reallocate later.
+    clientsPollFd.reserve(clients_num);
+    clientsChannel.reserve(clients_num);
+
+    new_pollfd.events = POLLIN | POLLRDHUP;
+    new_pollfd.revents = 0;
+    while (clientsPollFd.size() < clients_num) {
+        // Wait (with timeout) for the next incoming connection
+        new_pollfd.fd = listen_sock;
+        ret = poll(&new_pollfd, 1, timeout_in_sec*1000);
+        if (ret < 0)
+            panic("poll() failed:%s", strerror(errno));
+        if (ret == 0)
+            panic("Timeout while waiting for clients to connect");
+        assert(ret == 1 && new_pollfd.revents == POLLIN);
+        client_len = sizeof(client_addr);
+        new_sock = accept(listen_sock,
+                          (struct sockaddr *) &client_addr,
+                          &client_len);
+        if (new_sock < 0)
+            panic("accept() failed:%s", strerror(errno));
+        // The same pollfd entry (and event mask) is reused for the client
+        new_pollfd.fd = new_sock;
+        new_pollfd.revents = 0;
+        clientsPollFd.push_back(new_pollfd);
+        new_channel.fd = new_sock;
+        new_channel.isAlive = true;
+        // The first thing a client sends is its rank. A return value of 0
+        // means the peer closed the connection before sending it.
+        if (new_channel.recvRaw(&new_channel.rank,
+                                sizeof(new_channel.rank)) == 0)
+            panic("Client disconnected before sending its rank");
+        clientsChannel.push_back(new_channel);
+
+        DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n",
+                client_addr.sin_addr.s_addr, client_addr.sin_port,
+                new_channel.rank);
+    }
+    ret = close(listen_sock);
+    assert(ret == 0);
+
+    DPRINTF(debugSetup, "Setup complete\n");
+}
+
+/*
+ * Main loop of the server: wait for events on the client sockets and
+ * process incoming header packets until the first client disconnects.
+ */
+void
+TCPServer::run()
+{
+    int nfd;
+    unsigned num_active_clients = clientsPollFd.size();
+
+    DPRINTF(debugSetup, "Entering run() loop\n");
+    // The loop ends as soon as any client has gone away
+    while (num_active_clients == clientsPollFd.size()) {
+        // Block (no timeout) until at least one socket has an event
+        nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1);
+        if (nfd == -1)
+            panic("poll() failed:%s", strerror(errno));
+
+        // 'n' counts the sockets with events seen so far; poll() reported
+        // 'nfd' of them, so the scan can stop once all have been handled.
+        for (unsigned i = 0, n = 0;
+             i < clientsPollFd.size() && (signed)n < nfd;
+             i++) {
+            struct pollfd &pfd = clientsPollFd[i];
+            if (pfd.revents) {
+                if (pfd.revents & POLLERR)
+                    panic("poll() returned POLLERR");
+                if (pfd.revents & POLLIN) {
+                    clientsChannel[i].headerPktIn();
+                }
+                if (pfd.revents & POLLRDHUP) {
+                    // One gem5 process exited or aborted. Either way, we
+                    // assume the full simulation should stop now (either
+                    // because m5 exit was called or a serious error
+                    // occurred.) So we quit the run loop here and close all
+                    // sockets to notify the remaining peer gem5 processes.
+                    pfd.events = 0;
+                    clientsChannel[i].isAlive = false;
+                    num_active_clients--;
+                    DPRINTF(debugSetup, "POLLRDHUP event\n");
+                }
+                n++;
+                if ((signed)n == nfd)
+                    break;
+            }
+        }
+    }
+    DPRINTF(debugSetup, "Exiting run() loop\n");
+}
+
+/*
+ * Receive the data packet announced by 'hdr_pkt' from client 'src' and
+ * forward header and data to the destination client(s).
+ *
+ * A unicast packet with a known destination address goes to that client
+ * only. Anything else (multicast, broadcast, unknown destination) is sent
+ * to every live client except the sender.
+ */
+void
+TCPServer::xferData(const Header &hdr_pkt, const Channel &src)
+{
+    unsigned n;
+    // The length comes from the network: it must never exceed our buffer,
+    // whether or not asserts are compiled in.
+    if (hdr_pkt.dataPacketLength > sizeof(packetBuffer))
+        panic("Data packet too large (%u bytes)",
+              (unsigned)hdr_pkt.dataPacketLength);
+    n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength);
+
+    if (n == 0)
+        panic("recvRaw() failed");
+    DPRINTF(debugPkt, "Incoming data packet (from rank %d) "
+            "src:0x%02x%02x%02x%02x%02x%02x "
+            "dst:0x%02x%02x%02x%02x%02x%02x\n",
+            src.rank,
+            hdr_pkt.srcAddress[0],
+            hdr_pkt.srcAddress[1],
+            hdr_pkt.srcAddress[2],
+            hdr_pkt.srcAddress[3],
+            hdr_pkt.srcAddress[4],
+            hdr_pkt.srcAddress[5],
+            hdr_pkt.dstAddress[0],
+            hdr_pkt.dstAddress[1],
+            hdr_pkt.dstAddress[2],
+            hdr_pkt.dstAddress[3],
+            hdr_pkt.dstAddress[4],
+            hdr_pkt.dstAddress[5]);
+    // Now try to figure out the destination client(s).
+    auto dst_info = addressMap.find(&hdr_pkt.dstAddress);
+
+    // First handle the multicast/broadcast or unknown destination case. These
+    // all trigger a broadcast of the packet to all clients.
+    if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false ||
+        dst_info == addressMap.end()) {
+        unsigned num_sent = 0;
+        for (auto const &c: clientsChannel) {
+            // Skip dead clients and never echo back to the sender
+            if (c.isAlive && &c!=&src) {
+                c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
+                c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
+                num_sent++;
+            }
+        }
+        if (num_sent == 0) {
+            inform("Broadcast/multicast packet dropped\n");
+        }
+    } else {
+        // It is a unicast address with a known destination
+        Channel *dst = dst_info->second;
+        if (dst->isAlive) {
+            dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt));
+            dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
+            DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank);
+        } else {
+            inform("Unicast packet dropped (destination exited)\n");
+        }
+    }
+}
+
+/*
+ * Complete a sync (barrier) if every live client has reached state 'st'.
+ * On completion the message 'ack' is sent to all live clients, their state
+ * goes back to idle and the periodic sync tick is reset. Otherwise nothing
+ * happens.
+ */
+void
+TCPServer::syncTryComplete(SyncState st, MsgType ack)
+{
+    // Check if the barrier is complete: a single live client that has not
+    // arrived yet means that we have to keep waiting.
+    for (unsigned i = 0; i < clientsChannel.size(); i++) {
+        const Channel &ch = clientsChannel[i];
+        if (!ch.isAlive)
+            continue;
+        if (ch.state != st)
+            return;
+    }
+
+    // Sync complete, send out the acks
+    MultiHeaderPkt::Header ack_pkt;
+    ack_pkt.msgType = ack;
+    for (unsigned i = 0; i < clientsChannel.size(); i++) {
+        Channel &ch = clientsChannel[i];
+        if (!ch.isAlive)
+            continue;
+        ch.sendRaw(&ack_pkt, sizeof(ack_pkt));
+        ch.state = SyncState::idle;
+    }
+
+    // Reset periodic send tick
+    _periodicSyncTick = 0;
+    DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync,
+            "Sync COMPLETE\n");
+}
+
+/*
+ * Forward a checkpoint sync request, received on channel 'ch', to every
+ * other live client and mark those clients as having an asynchronous
+ * checkpoint request pending. A client that is neither idle nor in a
+ * periodic sync is a fatal error.
+ */
+void
+TCPServer::ckptPropagate(Channel &ch)
+{
+    MultiHeaderPkt::Header req_pkt;
+    req_pkt.msgType = MsgType::cmdCkptSyncReq;
+
+    for (auto &peer : clientsChannel) {
+        // The requester itself and dead clients are skipped
+        if (!peer.isAlive || &peer == &ch)
+            continue;
+
+        if (peer.state != SyncState::idle &&
+            peer.state != SyncState::periodic) {
+            panic("Unexpected state for ckpt sync request propagation "
+                  "(rank:%d)\n",peer.rank);
+        }
+
+        peer.sendRaw(&req_pkt, sizeof(req_pkt));
+        peer.state = SyncState::asyncCkpt;
+    }
+}
+
+// Convert a command line argument to an integer. The whole string must be a
+// decimal number within [min_val, max_val], otherwise we panic. 'what' names
+// the argument in the error message.
+static int
+parseIntArg(const char *arg, const char *what, long min_val, long max_val)
+{
+    char *end = nullptr;
+    long val = strtol(arg, &end, 10);
+
+    // An out-of-range input makes strtol() saturate, which the range check
+    // below rejects as well.
+    if (end == arg || *end != '\0' || val < min_val || val > max_val)
+        panic("Invalid %s '%s' (expected a number in [%ld, %ld])",
+              what, arg, min_val, max_val);
+
+    return (int)val;
+}
+
+/*
+ * Usage: tcp_server [-debug] <number of clients> <tcp listen port>
+ *
+ * With -debug the server waits forever for the clients to connect, otherwise
+ * the wait times out after 60 seconds. Returns 0 once the run loop has
+ * finished.
+ */
+int main(int argc, char *argv[])
+{
+    TCPServer *server;
+    int clients_num = -1, listen_port = -1;
+    int first_arg = 1, timeout_in_sec = 60;
+
+    if (argc > 1 && string(argv[1]).compare("-debug") == 0) {
+        // A negative timeout disables the connection timeout
+        timeout_in_sec = -1;
+        first_arg++;
+        argc--;
+    }
+
+    if (argc != 3)
+        panic("We need two command line args (number of clients and tcp listen"
+              " port)");
+
+    // Both values are handed to the server as unsigned, so reject anything
+    // that is not a positive number up front.
+    clients_num = parseIntArg(argv[first_arg], "number of clients",
+                              1, 0x7fffffffL);
+    listen_port = parseIntArg(argv[first_arg + 1], "tcp listen port",
+                              1, 65535);
+
+    server = new TCPServer(clients_num, listen_port, timeout_in_sec);
+
+    server->run();
+
+    delete server;
+
+    return 0;
+}
diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh
new file mode 100644
index 000000000..c5f2a9ce8
--- /dev/null
+++ b/util/multi/tcp_server.hh
@@ -0,0 +1,254 @@
+/*
+ * Copyright (c) 2015 ARM Limited
+ * All rights reserved
+ *
+ * The license below extends only to copyright in the software and shall
+ * not be construed as granting a license to any other intellectual
+ * property including but not limited to intellectual property relating
+ * to a hardware implementation of the functionality of the software
+ * licensed hereunder. You may use the software subject to the license
+ * terms below provided that you ensure that this notice is replicated
+ * unmodified and in its entirety in all distributions of the software,
+ * modified or unmodified, in source code or in binary form.
+ *
+ * Copyright (c) 2008 The Regents of The University of Michigan
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met: redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer;
+ * redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution;
+ * neither the name of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Authors: Gabor Dozsa
+ */
+
+/* @file
+ * Message server using TCP stream sockets for parallel gem5 runs.
+ *
+ * For a high level description about multi gem5 see comments in
+ * header files src/dev/multi_iface.hh and src/dev/tcp_iface.hh.
+ *
+ * This file implements the central message server process for multi gem5.
+ * The server is responsible for the following tasks.
+ * 1. Establishing a TCP socket connection for each gem5 process (clients).
+ *
+ * 2. Process data messages coming in from clients. The server checks
+ * the MAC addresses in the header message and transfers the message
+ * to the target(s) client(s).
+ *
+ * 3. Processing synchronisation related control messages. Synchronisation
+ * is performed as follows. The server waits for a 'barrier enter' message
+ * from all the clients. When the last such control message arrives, the
+ * server sends out a 'barrier leave' control message to all the clients.
+ *
+ * 4. Triggers complete termination in case a client exits. A client may
+ * exit either by calling 'm5 exit' pseudo instruction or due to a fatal
+ * error. In either case, we assume that the entire multi simulation needs to
+ * terminate. The server triggers full termination by tearing down the
+ * open TCP sockets.
+ *
+ * The TCPServer class is instantiated as a singleton object.
+ *
+ * The server can be built independently from the rest of gem5 (and it is
+ * architecture agnostic). See the Makefile in the same directory.
+ *
+ */
+
+#include <poll.h>
+
+#include <map>
+#include <vector>
+
+#include "dev/etherpkt.hh"
+#include "dev/multi_packet.hh"
+
+/**
+ * The maximum length, in bytes, of an Ethernet packet (allowing Jumbo
+ * frames). It sizes the server's single re-usable packet buffer.
+ */
+#define MAX_ETH_PACKET_LENGTH 9014
+
+class TCPServer
+{
+  public:
+    typedef MultiHeaderPkt::AddressType AddressType;
+    typedef MultiHeaderPkt::Header Header;
+    typedef MultiHeaderPkt::MsgType MsgType;
+
+  private:
+
+    /**
+     * The possible states of a client channel wrt. multi synchronisation.
+     */
+    enum class SyncState { periodic, ckpt, asyncCkpt, atomic, idle };
+
+    /**
+     * The Channel class encapsulates all the information about a client
+     * and its current status.
+     */
+    class Channel
+    {
+      private:
+        /**
+         * The MAC address of the client.
+         */
+        AddressType address;
+
+        /**
+         * Update the client MAC address. It is called every time a new data
+         * packet is to come in.
+         */
+        void updateAddress(const AddressType &new_addr);
+        /**
+         * Process an incoming command message.
+         */
+        void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick);
+
+
+      public:
+        /**
+         * TCP stream socket.
+         */
+        int fd;
+        /**
+         * Is client connected?
+         */
+        bool isAlive;
+        /**
+         * Current state of the channel wrt. multi synchronisation.
+         */
+        SyncState state;
+        /**
+         * Multi rank of the client
+         */
+        unsigned rank;
+
+      public:
+        Channel();
+        ~Channel () {}
+
+
+        /**
+         * Receive and process the next incoming header packet.
+         */
+        void headerPktIn();
+        /**
+         * Send raw data to the connected client.
+         *
+         * @param data The data to send.
+         * @param size Size of the data (in bytes).
+         */
+        void sendRaw(const void *data, unsigned size) const;
+        /**
+         * Receive raw data from the connected client.
+         *
+         * @param buf The buffer to store the incoming data into.
+         * @param size Size of data to receive (in bytes).
+         * @return In case of success, it returns size. Zero is returned
+         * if the socket is already closed by the client.
+         */
+        unsigned recvRaw(void *buf, unsigned size) const;
+    };
+
+    /**
+     * The array of socket descriptors needed by the poll() system call.
+     */
+    std::vector<struct pollfd> clientsPollFd;
+    /**
+     * Array holding all clients info.
+     */
+    std::vector<Channel> clientsChannel;
+
+
+    /**
+     * We use a map to select the target client based on the destination
+     * MAC address.
+     */
+    struct AddressCompare
+    {
+        // Must be const: std::map requires that its comparison object can
+        // be invoked through a const reference.
+        bool operator()(const AddressType *a1, const AddressType *a2) const
+        {
+            return MultiHeaderPkt::isAddressLess(*a1, *a2);
+        }
+    };
+    std::map<const AddressType *, Channel *, AddressCompare> addressMap;
+
+    /**
+     * As we deal with only one message at a time, we can allocate and
+     * re-use a single packet buffer (to hold any incoming data packet).
+     */
+    uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH];
+    /**
+     * Send tick of the current periodic sync. It is used for sanity check.
+     */
+    Tick _periodicSyncTick;
+    /**
+     * The singleton server object.
+     */
+    static TCPServer *instance;
+
+    /**
+     * Set up the socket connections to all the clients.
+     *
+     * @param listen_port The port we are listening on for new client
+     * connection requests.
+     * @param nclients The number of clients to connect to.
+     * @param timeout Timeout in sec to complete the setup phase
+     * (i.e. all gem5 establish socket connections)
+     */
+    void construct(unsigned listen_port, unsigned nclients, int timeout);
+    /**
+     * Transfer the header and the follow up data packet to the target(s)
+     * clients.
+     *
+     * @param hdr The header message structure.
+     * @param ch The source channel for the message.
+     */
+    void xferData(const Header &hdr, const Channel &ch);
+    /**
+     * Check if the current round of a synchronisation is completed and notify
+     * the clients if it is so.
+     *
+     * @param st The state all channels should have if sync is complete.
+     * @param ack The type of ack message to send out if the sync is complete.
+     */
+    void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack);
+    /**
+     * Broadcast a request for checkpoint sync.
+     *
+     * @param ch The source channel of the checkpoint sync request.
+     */
+    void ckptPropagate(Channel &ch);
+    /**
+     * Setter for current periodic send tick.
+     */
+    void periodicSyncTick(Tick t) { _periodicSyncTick = t; }
+    /**
+     * Getter for current periodic send tick.
+     */
+    Tick periodicSyncTick() { return _periodicSyncTick; }
+
+  public:
+
+    TCPServer(unsigned clients_num, unsigned listen_port, int timeout_in_sec);
+    ~TCPServer();
+
+    /**
+     * The main server loop that waits for and processes incoming messages.
+     */
+    void run();
+};