summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabor Dozsa <gabor.dozsa@arm.com>2016-01-07 16:33:47 -0600
committerGabor Dozsa <gabor.dozsa@arm.com>2016-01-07 16:33:47 -0600
commit5dec4e07b89786aa67ce64aadeeb14c81b3977b3 (patch)
tree44535119ad1f458cbe2a26b56c8c8377a25fe0ff
parente67749426054d8ddb7f11b53a89741d4808f3acb (diff)
downloadgem5-5dec4e07b89786aa67ce64aadeeb14c81b3977b3.tar.xz
dev: Distributed Ethernet link for distributed gem5 simulations
Distributed gem5 (abbreviated dist-gem5) is the result of the convergence effort between multi-gem5 and pd-gem5 (from Univ. of Wisconsin). It relies on the base multi-gem5 infrastructure for packet forwarding, synchronisation and checkpointing but combines those with the elaborated network switch model from pd-gem5. --HG-- rename : src/dev/net/multi_etherlink.cc => src/dev/net/dist_etherlink.cc rename : src/dev/net/multi_etherlink.hh => src/dev/net/dist_etherlink.hh rename : src/dev/net/multi_iface.cc => src/dev/net/dist_iface.cc rename : src/dev/net/multi_iface.hh => src/dev/net/dist_iface.hh rename : src/dev/net/multi_packet.hh => src/dev/net/dist_packet.hh
-rw-r--r--src/dev/net/Ethernet.py15
-rw-r--r--src/dev/net/SConscript12
-rw-r--r--src/dev/net/dist_etherlink.cc (renamed from src/dev/net/multi_etherlink.cc)139
-rw-r--r--src/dev/net/dist_etherlink.hh (renamed from src/dev/net/multi_etherlink.hh)73
-rw-r--r--src/dev/net/dist_iface.cc808
-rw-r--r--src/dev/net/dist_iface.hh595
-rw-r--r--src/dev/net/dist_packet.hh (renamed from src/dev/net/multi_packet.hh)90
-rw-r--r--src/dev/net/etherpkt.cc16
-rw-r--r--src/dev/net/etherpkt.hh12
-rw-r--r--src/dev/net/multi_iface.cc622
-rw-r--r--src/dev/net/multi_iface.hh492
-rw-r--r--src/dev/net/tcp_iface.cc265
-rw-r--r--src/dev/net/tcp_iface.hh77
-rw-r--r--src/sim/global_event.hh2
-rw-r--r--src/sim/initparam_keys.hh (renamed from src/dev/net/multi_packet.cc)82
-rw-r--r--src/sim/pseudo_inst.cc25
-rw-r--r--util/multi/Makefile63
-rw-r--r--util/multi/tcp_server.cc463
-rw-r--r--util/multi/tcp_server.hh254
19 files changed, 1879 insertions, 2226 deletions
diff --git a/src/dev/net/Ethernet.py b/src/dev/net/Ethernet.py
index 9859857a0..5f878ea10 100644
--- a/src/dev/net/Ethernet.py
+++ b/src/dev/net/Ethernet.py
@@ -58,19 +58,22 @@ class EtherLink(EtherObject):
speed = Param.NetworkBandwidth('1Gbps', "link speed")
dump = Param.EtherDump(NULL, "dump object")
-class MultiEtherLink(EtherObject):
- type = 'MultiEtherLink'
- cxx_header = "dev/net/multi_etherlink.hh"
+class DistEtherLink(EtherObject):
+ type = 'DistEtherLink'
+ cxx_header = "dev/net/dist_etherlink.hh"
int0 = SlavePort("interface 0")
delay = Param.Latency('0us', "packet transmit delay")
delay_var = Param.Latency('0ns', "packet transmit delay variability")
speed = Param.NetworkBandwidth('1Gbps', "link speed")
dump = Param.EtherDump(NULL, "dump object")
- multi_rank = Param.UInt32('0', "Rank of the this gem5 process (multi run)")
- sync_start = Param.Latency('5200000000000t', "first multi sync barrier")
- sync_repeat = Param.Latency('10us', "multi sync barrier repeat")
+ dist_rank = Param.UInt32('0', "Rank of this gem5 process (dist run)")
+ dist_size = Param.UInt32('1', "Number of gem5 processes (dist run)")
+ sync_start = Param.Latency('5200000000000t', "first dist sync barrier")
+ sync_repeat = Param.Latency('10us', "dist sync barrier repeat")
server_name = Param.String('localhost', "Message server name")
server_port = Param.UInt32('2200', "Message server port")
+ is_switch = Param.Bool(False, "true if this a link in etherswitch")
+ num_nodes = Param.UInt32('2', "Number of simulate nodes")
class EtherBus(EtherObject):
type = 'EtherBus'
diff --git a/src/dev/net/SConscript b/src/dev/net/SConscript
index f529a1b2a..9ad8eec92 100644
--- a/src/dev/net/SConscript
+++ b/src/dev/net/SConscript
@@ -70,14 +70,14 @@ DebugFlag('EthernetIntr')
DebugFlag('EthernetPIO')
DebugFlag('EthernetSM')
-# Multi gem5
-Source('multi_packet.cc')
-Source('multi_iface.cc')
-Source('multi_etherlink.cc')
+# Dist gem5
+Source('dist_iface.cc')
+Source('dist_etherlink.cc')
Source('tcp_iface.cc')
-DebugFlag('MultiEthernet')
-DebugFlag('MultiEthernetPkt')
+DebugFlag('DistEthernet')
+DebugFlag('DistEthernetPkt')
+DebugFlag('DistEthernetCmd')
# Ethernet controllers
Source('i8254xGBe.cc')
diff --git a/src/dev/net/multi_etherlink.cc b/src/dev/net/dist_etherlink.cc
index cf4300ddf..a793739f8 100644
--- a/src/dev/net/multi_etherlink.cc
+++ b/src/dev/net/dist_etherlink.cc
@@ -38,10 +38,10 @@
*/
/* @file
- * Device module for a full duplex ethernet link for multi gem5 simulations.
+ * Device module for a full duplex ethernet link for dist gem5 simulations.
*/
-#include "dev/net/multi_etherlink.hh"
+#include "dev/net/dist_etherlink.hh"
#include <arpa/inet.h>
#include <sys/socket.h>
@@ -54,15 +54,15 @@
#include "base/random.hh"
#include "base/trace.hh"
+#include "debug/DistEthernet.hh"
+#include "debug/DistEthernetPkt.hh"
#include "debug/EthernetData.hh"
-#include "debug/MultiEthernet.hh"
-#include "debug/MultiEthernetPkt.hh"
+#include "dev/net/dist_iface.hh"
#include "dev/net/etherdump.hh"
#include "dev/net/etherint.hh"
#include "dev/net/etherlink.hh"
#include "dev/net/etherobject.hh"
#include "dev/net/etherpkt.hh"
-#include "dev/net/multi_iface.hh"
#include "dev/net/tcp_iface.hh"
#include "params/EtherLink.hh"
#include "sim/core.hh"
@@ -71,33 +71,45 @@
using namespace std;
-MultiEtherLink::MultiEtherLink(const Params *p)
- : EtherObject(p)
+DistEtherLink::DistEtherLink(const Params *p)
+ : EtherObject(p), linkDelay(p->delay)
{
- DPRINTF(MultiEthernet,"MultiEtherLink::MultiEtherLink() "
- "link delay:%llu\n", p->delay);
+ DPRINTF(DistEthernet,"DistEtherLink::DistEtherLink() "
+ "link delay:%llu ticksPerByte:%f\n", p->delay, p->speed);
txLink = new TxLink(name() + ".link0", this, p->speed, p->delay_var,
p->dump);
rxLink = new RxLink(name() + ".link1", this, p->delay, p->dump);
- // create the multi (TCP) interface to talk to the peer gem5 processes.
- multiIface = new TCPIface(p->server_name, p->server_port, p->multi_rank,
- p->sync_start, p->sync_repeat, this);
+ Tick sync_repeat;
+ if (p->sync_repeat != 0) {
+ if (p->sync_repeat != p->delay)
+ warn("DistEtherLink(): sync_repeat is %lu and linkdelay is %lu",
+ p->sync_repeat, p->delay);
+ sync_repeat = p->sync_repeat;
+ } else {
+ sync_repeat = p->delay;
+ }
+
+ // create the dist (TCP) interface to talk to the peer gem5 processes.
+ distIface = new TCPIface(p->server_name, p->server_port,
+ p->dist_rank, p->dist_size,
+ p->sync_start, sync_repeat, this, p->is_switch,
+ p->num_nodes);
- localIface = new LocalIface(name() + ".int0", txLink, rxLink, multiIface);
+ localIface = new LocalIface(name() + ".int0", txLink, rxLink, distIface);
}
-MultiEtherLink::~MultiEtherLink()
+DistEtherLink::~DistEtherLink()
{
delete txLink;
delete rxLink;
delete localIface;
- delete multiIface;
+ delete distIface;
}
EtherInt*
-MultiEtherLink::getEthPort(const std::string &if_name, int idx)
+DistEtherLink::getEthPort(const std::string &if_name, int idx)
{
if (if_name != "int0") {
return nullptr;
@@ -107,66 +119,55 @@ MultiEtherLink::getEthPort(const std::string &if_name, int idx)
return localIface;
}
-void MultiEtherLink::memWriteback()
-{
- DPRINTF(MultiEthernet,"MultiEtherLink::memWriteback() called\n");
- multiIface->drainDone();
-}
-
void
-MultiEtherLink::serialize(CheckpointOut &cp) const
+DistEtherLink::serialize(CheckpointOut &cp) const
{
- multiIface->serialize("multiIface", cp);
- txLink->serialize("txLink", cp);
- rxLink->serialize("rxLink", cp);
+ distIface->serializeSection(cp, "distIface");
+ txLink->serializeSection(cp, "txLink");
+ rxLink->serializeSection(cp, "rxLink");
}
void
-MultiEtherLink::unserialize(CheckpointIn &cp)
+DistEtherLink::unserialize(CheckpointIn &cp)
{
- multiIface->unserialize("multiIface", cp);
- txLink->unserialize("txLink", cp);
- rxLink->unserialize("rxLink", cp);
+ distIface->unserializeSection(cp, "distIface");
+ txLink->unserializeSection(cp, "txLink");
+ rxLink->unserializeSection(cp, "rxLink");
}
void
-MultiEtherLink::init()
+DistEtherLink::init()
{
- DPRINTF(MultiEthernet,"MultiEtherLink::init() called\n");
- multiIface->initRandom();
+ DPRINTF(DistEthernet,"DistEtherLink::init() called\n");
+ distIface->init(rxLink->doneEvent(), linkDelay);
}
void
-MultiEtherLink::startup()
+DistEtherLink::startup()
{
- DPRINTF(MultiEthernet,"MultiEtherLink::startup() called\n");
- multiIface->startPeriodicSync();
+ DPRINTF(DistEthernet,"DistEtherLink::startup() called\n");
+ distIface->startup();
}
void
-MultiEtherLink::RxLink::setMultiInt(MultiIface *m)
+DistEtherLink::RxLink::setDistInt(DistIface *m)
{
- assert(!multiIface);
- multiIface = m;
- // Spawn a new receiver thread that will process messages
- // coming in from peer gem5 processes.
- // The receive thread will also schedule a (receive) doneEvent
- // for each incoming data packet.
- multiIface->spawnRecvThread(&doneEvent, linkDelay);
+ assert(!distIface);
+ distIface = m;
}
void
-MultiEtherLink::RxLink::rxDone()
+DistEtherLink::RxLink::rxDone()
{
assert(!busy());
// retrieve the packet that triggered the receive done event
- packet = multiIface->packetIn();
+ packet = distIface->packetIn();
if (dump)
dump->dump(packet);
- DPRINTF(MultiEthernetPkt, "MultiEtherLink::MultiLink::rxDone() "
+ DPRINTF(DistEthernetPkt, "DistEtherLink::DistLink::rxDone() "
"packet received: len=%d\n", packet->length);
DDUMP(EthernetData, packet->data, packet->length);
@@ -176,7 +177,7 @@ MultiEtherLink::RxLink::rxDone()
}
void
-MultiEtherLink::TxLink::txDone()
+DistEtherLink::TxLink::txDone()
{
if (dump)
dump->dump(packet);
@@ -188,10 +189,10 @@ MultiEtherLink::TxLink::txDone()
}
bool
-MultiEtherLink::TxLink::transmit(EthPacketPtr pkt)
+DistEtherLink::TxLink::transmit(EthPacketPtr pkt)
{
if (busy()) {
- DPRINTF(MultiEthernet, "packet not sent, link busy\n");
+ DPRINTF(DistEthernet, "packet not sent, link busy\n");
return false;
}
@@ -201,8 +202,8 @@ MultiEtherLink::TxLink::transmit(EthPacketPtr pkt)
delay += random_mt.random<Tick>(0, delayVar);
// send the packet to the peers
- assert(multiIface);
- multiIface->packetOut(pkt, delay);
+ assert(distIface);
+ distIface->packetOut(pkt, delay);
// schedule the send done event
parent->schedule(doneEvent, curTick() + delay);
@@ -211,56 +212,56 @@ MultiEtherLink::TxLink::transmit(EthPacketPtr pkt)
}
void
-MultiEtherLink::Link::serialize(const string &base, CheckpointOut &cp) const
+DistEtherLink::Link::serialize(CheckpointOut &cp) const
{
bool packet_exists = (packet != nullptr);
- paramOut(cp, base + ".packet_exists", packet_exists);
+ SERIALIZE_SCALAR(packet_exists);
if (packet_exists)
- packet->serialize(base + ".packet", cp);
+ packet->serialize("packet", cp);
bool event_scheduled = event->scheduled();
- paramOut(cp, base + ".event_scheduled", event_scheduled);
+ SERIALIZE_SCALAR(event_scheduled);
if (event_scheduled) {
Tick event_time = event->when();
- paramOut(cp, base + ".event_time", event_time);
+ SERIALIZE_SCALAR(event_time);
}
}
void
-MultiEtherLink::Link::unserialize(const string &base, CheckpointIn &cp)
+DistEtherLink::Link::unserialize(CheckpointIn &cp)
{
bool packet_exists;
- paramIn(cp, base + ".packet_exists", packet_exists);
+ UNSERIALIZE_SCALAR(packet_exists);
if (packet_exists) {
packet = make_shared<EthPacketData>(16384);
- packet->unserialize(base + ".packet", cp);
+ packet->unserialize("packet", cp);
}
bool event_scheduled;
- paramIn(cp, base + ".event_scheduled", event_scheduled);
+ UNSERIALIZE_SCALAR(event_scheduled);
if (event_scheduled) {
Tick event_time;
- paramIn(cp, base + ".event_time", event_time);
+ UNSERIALIZE_SCALAR(event_time);
parent->schedule(*event, event_time);
}
}
-MultiEtherLink::LocalIface::LocalIface(const std::string &name,
+DistEtherLink::LocalIface::LocalIface(const std::string &name,
TxLink *tx,
RxLink *rx,
- MultiIface *m) :
+ DistIface *m) :
EtherInt(name), txLink(tx)
{
tx->setLocalInt(this);
rx->setLocalInt(this);
- tx->setMultiInt(m);
- rx->setMultiInt(m);
+ tx->setDistInt(m);
+ rx->setDistInt(m);
}
-MultiEtherLink *
-MultiEtherLinkParams::create()
+DistEtherLink *
+DistEtherLinkParams::create()
{
- return new MultiEtherLink(this);
+ return new DistEtherLink(this);
}
diff --git a/src/dev/net/multi_etherlink.hh b/src/dev/net/dist_etherlink.hh
index 0a3e39bd7..e8218941a 100644
--- a/src/dev/net/multi_etherlink.hh
+++ b/src/dev/net/dist_etherlink.hh
@@ -38,30 +38,30 @@
*/
/* @file
- * Device module for a full duplex ethernet link for multi gem5 simulations.
+ * Device module for a full duplex ethernet link for dist gem5 simulations.
*
- * See comments in dev/multi_iface.hh for a generic description of multi
+ * See comments in dev/net/dist_iface.hh for a generic description of dist
* gem5 simulations.
*
* This class is meant to be a drop in replacement for the EtherLink class for
- * multi gem5 runs.
+ * dist gem5 runs.
*
*/
-#ifndef __DEV_NET_MULTIETHERLINK_HH__
-#define __DEV_NET_MULTIETHERLINK_HH__
+#ifndef __DEV_DIST_ETHERLINK_HH__
+#define __DEV_DIST_ETHERLINK_HH__
#include <iostream>
#include "dev/net/etherlink.hh"
-#include "params/MultiEtherLink.hh"
+#include "params/DistEtherLink.hh"
-class MultiIface;
+class DistIface;
class EthPacketData;
/**
* Model for a fixed bandwidth full duplex ethernet link.
*/
-class MultiEtherLink : public EtherObject
+class DistEtherLink : public EtherObject
{
protected:
class LocalIface;
@@ -72,22 +72,22 @@ class MultiEtherLink : public EtherObject
* The link will encapsulate and transfer Ethernet packets to/from
* the message server.
*/
- class Link
+ class Link : public Serializable
{
protected:
std::string objName;
- MultiEtherLink *parent;
+ DistEtherLink *parent;
LocalIface *localIface;
EtherDump *dump;
- MultiIface *multiIface;
+ DistIface *distIface;
Event *event;
EthPacketPtr packet;
public:
- Link(const std::string &name, MultiEtherLink *p,
+ Link(const std::string &name, DistEtherLink *p,
EtherDump *d, Event *e) :
objName(name), parent(p), localIface(nullptr), dump(d),
- multiIface(nullptr), event(e) {}
+ distIface(nullptr), event(e) {}
~Link() {}
@@ -95,8 +95,8 @@ class MultiEtherLink : public EtherObject
bool busy() const { return (bool)packet; }
void setLocalInt(LocalIface *i) { assert(!localIface); localIface=i; }
- void serialize(const std::string &base, CheckpointOut &cp) const;
- void unserialize(const std::string &base, CheckpointIn &cp);
+ void serialize(CheckpointOut &cp) const override;
+ void unserialize(CheckpointIn &cp) override;
};
/**
@@ -123,17 +123,17 @@ class MultiEtherLink : public EtherObject
DoneEvent doneEvent;
public:
- TxLink(const std::string &name, MultiEtherLink *p,
+ TxLink(const std::string &name, DistEtherLink *p,
double invBW, Tick delay_var, EtherDump *d) :
Link(name, p, d, &doneEvent), ticksPerByte(invBW),
delayVar(delay_var), doneEvent(this) {}
~TxLink() {}
/**
- * Register the multi interface to be used to talk to the
+ * Register the dist interface to be used to talk to the
* peer gem5 processes.
*/
- void setMultiInt(MultiIface *m) { assert(!multiIface); multiIface=m; }
+ void setDistInt(DistIface *m) { assert(!distIface); distIface=m; }
/**
* Initiate sending of a packet via this link.
@@ -158,23 +158,27 @@ class MultiEtherLink : public EtherObject
/**
* Receive done callback method. Called from doneEvent.
*/
- void rxDone() ;
+ void rxDone();
typedef EventWrapper<RxLink, &RxLink::rxDone> DoneEvent;
friend void DoneEvent::process();
- DoneEvent doneEvent;
+ DoneEvent _doneEvent;
public:
- RxLink(const std::string &name, MultiEtherLink *p,
+ RxLink(const std::string &name, DistEtherLink *p,
Tick delay, EtherDump *d) :
- Link(name, p, d, &doneEvent),
- linkDelay(delay), doneEvent(this) {}
+ Link(name, p, d, &_doneEvent),
+ linkDelay(delay), _doneEvent(this) {}
~RxLink() {}
/**
- * Register our multi interface to talk to the peer gem5 processes.
+ * Register our dist interface to talk to the peer gem5 processes.
+ */
+ void setDistInt(DistIface *m);
+ /**
+ * Done events will be scheduled by DistIface (so we need the accessor)
*/
- void setMultiInt(MultiIface *m);
+ const DoneEvent *doneEvent() const { return &_doneEvent; }
};
/**
@@ -187,7 +191,7 @@ class MultiEtherLink : public EtherObject
public:
LocalIface(const std::string &name, TxLink *tx, RxLink *rx,
- MultiIface *m);
+ DistIface *m);
bool recvPacket(EthPacketPtr pkt) { return txLink->transmit(pkt); }
void sendDone() { peer->sendDone(); }
@@ -199,7 +203,7 @@ class MultiEtherLink : public EtherObject
/**
* Interface to talk to the peer gem5 processes.
*/
- MultiIface *multiIface;
+ DistIface *distIface;
/**
* Send link
*/
@@ -210,10 +214,12 @@ class MultiEtherLink : public EtherObject
RxLink *rxLink;
LocalIface *localIface;
+ Tick linkDelay;
+
public:
- typedef MultiEtherLinkParams Params;
- MultiEtherLink(const Params *p);
- ~MultiEtherLink();
+ typedef DistEtherLinkParams Params;
+ DistEtherLink(const Params *p);
+ ~DistEtherLink();
const Params *
params() const
@@ -224,12 +230,11 @@ class MultiEtherLink : public EtherObject
virtual EtherInt *getEthPort(const std::string &if_name,
int idx) override;
- void memWriteback() override;
- void init() override;
- void startup() override;
+ virtual void init() override;
+ virtual void startup() override;
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
};
-#endif // __DEV_NET_MULTIETHERLINK_HH__
+#endif // __DEV_DIST_ETHERLINK_HH__
diff --git a/src/dev/net/dist_iface.cc b/src/dev/net/dist_iface.cc
new file mode 100644
index 000000000..45ce651a9
--- /dev/null
+++ b/src/dev/net/dist_iface.cc
@@ -0,0 +1,808 @@
+/*
+ * Copyright (c) 2015 ARM Limited
+ * All rights reserved
+ *
+ * The license below extends only to copyright in the software and shall
+ * not be construed as granting a license to any other intellectual
+ * property including but not limited to intellectual property relating
+ * to a hardware implementation of the functionality of the software
+ * licensed hereunder. You may use the software subject to the license
+ * terms below provided that you ensure that this notice is replicated
+ * unmodified and in its entirety in all distributions of the software,
+ * modified or unmodified, in source code or in binary form.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met: redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer;
+ * redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution;
+ * neither the name of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Authors: Gabor Dozsa
+ */
+
+/* @file
+ * The interface class for dist-gem5 simulations.
+ */
+
+#include "dev/net/dist_iface.hh"
+
+#include <queue>
+#include <thread>
+
+#include "base/random.hh"
+#include "base/trace.hh"
+#include "debug/DistEthernet.hh"
+#include "debug/DistEthernetPkt.hh"
+#include "dev/net/etherpkt.hh"
+#include "sim/sim_exit.hh"
+#include "sim/sim_object.hh"
+
+using namespace std;
+DistIface::Sync *DistIface::sync = nullptr;
+DistIface::SyncEvent *DistIface::syncEvent = nullptr;
+unsigned DistIface::distIfaceNum = 0;
+unsigned DistIface::recvThreadsNum = 0;
+DistIface *DistIface::master = nullptr;
+
+void
+DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
+{
+ if (start_tick < firstAt) {
+ firstAt = start_tick;
+ inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
+ }
+
+ if (repeat_tick == 0)
+ panic("Dist synchronisation interval must be greater than zero");
+
+ if (repeat_tick < nextRepeat) {
+ nextRepeat = repeat_tick;
+ inform("Dist synchronisation interval is changed to %lu.\n",
+ nextRepeat);
+ }
+}
+
+DistIface::SyncSwitch::SyncSwitch(int num_nodes)
+{
+ numNodes = num_nodes;
+ waitNum = num_nodes;
+ numExitReq = 0;
+ numCkptReq = 0;
+ doExit = false;
+ doCkpt = false;
+ firstAt = std::numeric_limits<Tick>::max();
+ nextAt = 0;
+ nextRepeat = std::numeric_limits<Tick>::max();
+}
+
+DistIface::SyncNode::SyncNode()
+{
+ waitNum = 0;
+ needExit = ReqType::none;
+ needCkpt = ReqType::none;
+ doExit = false;
+ doCkpt = false;
+ firstAt = std::numeric_limits<Tick>::max();
+ nextAt = 0;
+ nextRepeat = std::numeric_limits<Tick>::max();
+}
+
+void
+DistIface::SyncNode::run(bool same_tick)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ Header header;
+
+ assert(waitNum == 0);
+ waitNum = DistIface::recvThreadsNum;
+ // initiate the global synchronisation
+ header.msgType = MsgType::cmdSyncReq;
+ header.sendTick = curTick();
+ header.syncRepeat = nextRepeat;
+ header.needCkpt = needCkpt;
+ if (needCkpt != ReqType::none)
+ needCkpt = ReqType::pending;
+ header.needExit = needExit;
+ if (needExit != ReqType::none)
+ needExit = ReqType::pending;
+ DistIface::master->sendCmd(header);
+ // now wait until all receiver threads complete the synchronisation
+ auto lf = [this]{ return waitNum == 0; };
+ cv.wait(sync_lock, lf);
+ // global synchronisation is done
+ assert(!same_tick || (nextAt == curTick()));
+}
+
+
+void
+DistIface::SyncSwitch::run(bool same_tick)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ Header header;
+ // Wait for the sync requests from the nodes
+ if (waitNum > 0) {
+ auto lf = [this]{ return waitNum == 0; };
+ cv.wait(sync_lock, lf);
+ }
+ assert(waitNum == 0);
+ assert(!same_tick || (nextAt == curTick()));
+ waitNum = numNodes;
+ // Complete the global synchronisation
+ header.msgType = MsgType::cmdSyncAck;
+ header.sendTick = nextAt;
+ header.syncRepeat = nextRepeat;
+ if (doCkpt || numCkptReq == numNodes) {
+ doCkpt = true;
+ header.needCkpt = ReqType::immediate;
+ numCkptReq = 0;
+ } else {
+ header.needCkpt = ReqType::none;
+ }
+ if (doExit || numExitReq == numNodes) {
+ doExit = true;
+ header.needExit = ReqType::immediate;
+ } else {
+ header.needExit = ReqType::none;
+ }
+ DistIface::master->sendCmd(header);
+}
+
+void
+DistIface::SyncSwitch::progress(Tick send_tick,
+ Tick sync_repeat,
+ ReqType need_ckpt,
+ ReqType need_exit)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ assert(waitNum > 0);
+
+ if (send_tick > nextAt)
+ nextAt = send_tick;
+ if (nextRepeat > sync_repeat)
+ nextRepeat = sync_repeat;
+
+ if (need_ckpt == ReqType::collective)
+ numCkptReq++;
+ else if (need_ckpt == ReqType::immediate)
+ doCkpt = true;
+ if (need_exit == ReqType::collective)
+ numExitReq++;
+ else if (need_exit == ReqType::immediate)
+ doExit = true;
+
+ waitNum--;
+ // Notify the simulation thread if the on-going sync is complete
+ if (waitNum == 0) {
+ sync_lock.unlock();
+ cv.notify_one();
+ }
+}
+
+void
+DistIface::SyncNode::progress(Tick max_send_tick,
+ Tick next_repeat,
+ ReqType do_ckpt,
+ ReqType do_exit)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ assert(waitNum > 0);
+
+ nextAt = max_send_tick;
+ nextRepeat = next_repeat;
+ doCkpt = (do_ckpt != ReqType::none);
+ doExit = (do_exit != ReqType::none);
+
+ waitNum--;
+ // Notify the simulation thread if the on-going sync is complete
+ if (waitNum == 0) {
+ sync_lock.unlock();
+ cv.notify_one();
+ }
+}
+
+void
+DistIface::SyncNode::requestCkpt(ReqType req)
+{
+ std::lock_guard<std::mutex> sync_lock(lock);
+ assert(req != ReqType::none);
+ if (needCkpt != ReqType::none)
+ warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
+ if (needCkpt == ReqType::none || req == ReqType::immediate)
+ needCkpt = req;
+}
+
+void
+DistIface::SyncNode::requestExit(ReqType req)
+{
+ std::lock_guard<std::mutex> sync_lock(lock);
+ assert(req != ReqType::none);
+ if (needExit != ReqType::none)
+ warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
+ if (needExit == ReqType::none || req == ReqType::immediate)
+ needExit = req;
+}
+
+void
+DistIface::Sync::drainComplete()
+{
+ if (doCkpt) {
+ // The first DistIface object called this right before writing the
+ // checkpoint. We need to drain the underlying physical network here.
+ // Note that other gem5 peers may enter this barrier at different
+ // ticks due to draining.
+ run(false);
+ // Only the "first" DistIface object has to perform the sync
+ doCkpt = false;
+ }
+}
+
+void
+DistIface::SyncNode::serialize(CheckpointOut &cp) const
+{
+ int need_exit = static_cast<int>(needExit);
+ SERIALIZE_SCALAR(need_exit);
+}
+
+void
+DistIface::SyncNode::unserialize(CheckpointIn &cp)
+{
+ int need_exit;
+ UNSERIALIZE_SCALAR(need_exit);
+ needExit = static_cast<ReqType>(need_exit);
+}
+
+void
+DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
+{
+ SERIALIZE_SCALAR(numExitReq);
+}
+
+void
+DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
+{
+ UNSERIALIZE_SCALAR(numExitReq);
+}
+
+void
+DistIface::SyncEvent::start()
+{
+ // Note that this may be called either from startup() or drainResume()
+
+ // At this point, all DistIface objects has already called Sync::init() so
+ // we have a local minimum of the start tick and repeat for the periodic
+ // sync.
+ Tick firstAt = DistIface::sync->firstAt;
+ repeat = DistIface::sync->nextRepeat;
+ // Do a global barrier to agree on a common repeat value (the smallest
+ // one from all participating nodes
+ DistIface::sync->run(curTick() == 0);
+
+ assert(!DistIface::sync->doCkpt);
+ assert(!DistIface::sync->doExit);
+ assert(DistIface::sync->nextAt >= curTick());
+ assert(DistIface::sync->nextRepeat <= repeat);
+
+ // if this is called at tick 0 then we use the config start param otherwise
+ // the maximum of the current tick of all participating nodes
+ if (curTick() == 0) {
+ assert(!scheduled());
+ assert(DistIface::sync->nextAt == 0);
+ schedule(firstAt);
+ } else {
+ if (scheduled())
+ reschedule(DistIface::sync->nextAt);
+ else
+ schedule(DistIface::sync->nextAt);
+ }
+ inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
+ DistIface::sync->nextRepeat);
+}
+
+void
+DistIface::SyncEvent::process()
+{
+ // We may not start a global periodic sync while draining before taking a
+ // checkpoint. This is due to the possibility that peer gem5 processes
+ // may not hit the same periodic sync before they complete draining and
+ // that would make this periodic sync clash with sync called from
+ // DistIface::serialize() by other gem5 processes.
+ // We would need a 'distributed drain' solution to eliminate this
+ // restriction.
+ // Note that if draining was not triggered by checkpointing then we are
+ // fine since no extra global sync will happen (i.e. all peer gem5 will
+ // hit this periodic sync eventually).
+ panic_if(_draining && DistIface::sync->doCkpt,
+ "Distributed sync is hit while draining");
+ /*
+ * Note that this is a global event so this process method will be called
+ * by only exactly one thread.
+ */
+ /*
+ * We hold the eventq lock at this point but the receiver thread may
+ * need the lock to schedule new recv events while waiting for the
+ * dist sync to complete.
+ * Note that the other simulation threads also release their eventq
+ * locks while waiting for us due to the global event semantics.
+ */
+ {
+ EventQueue::ScopedRelease sr(curEventQueue());
+ // we do a global sync here that is supposed to happen at the same
+ // tick in all gem5 peers
+ DistIface::sync->run(true);
+ // global sync completed
+ }
+ if (DistIface::sync->doCkpt)
+ exitSimLoop("checkpoint");
+ if (DistIface::sync->doExit)
+ exitSimLoop("exit request from gem5 peers");
+
+ // schedule the next periodic sync
+ repeat = DistIface::sync->nextRepeat;
+ schedule(curTick() + repeat);
+}
+
+void
+DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
+{
+ // This is called from the receiver thread when it starts running. The new
+ // receiver thread shares the event queue with the simulation thread
+ // (associated with the simulated Ethernet link).
+ curEventQueue(eventManager->eventQueue());
+
+ recvDone = recv_done;
+ linkDelay = link_delay;
+}
+
+Tick
+DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
+ Tick send_delay,
+ Tick prev_recv_tick)
+{
+ Tick recv_tick = send_tick + send_delay + linkDelay;
+ // sanity check (we need atleast a send delay long window)
+ assert(recv_tick >= prev_recv_tick + send_delay);
+ panic_if(prev_recv_tick + send_delay > recv_tick,
+ "Receive window is smaller than send delay");
+ panic_if(recv_tick <= curTick(),
+ "Simulators out of sync - missed packet receive by %llu ticks"
+ "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
+ "linkDelay: %lu )",
+ curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
+ linkDelay);
+
+ return recv_tick;
+}
+
+void
+DistIface::RecvScheduler::resumeRecvTicks()
+{
+ // Schedule pending packets asap in case link speed/delay changed when
+ // restoring from the checkpoint.
+ // This may be done during unserialize except that curTick() is unknown
+ // so we call this during drainResume().
+ // If we are not restoring from a checkppint then link latency could not
+ // change so we just return.
+ if (!ckptRestore)
+ return;
+
+ std::vector<Desc> v;
+ while (!descQueue.empty()) {
+ Desc d = descQueue.front();
+ descQueue.pop();
+ d.sendTick = curTick();
+ d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed
+ v.push_back(d);
+ }
+
+ for (auto &d : v)
+ descQueue.push(d);
+
+ if (recvDone->scheduled()) {
+ assert(!descQueue.empty());
+ eventManager->reschedule(recvDone, curTick());
+ } else {
+ assert(descQueue.empty() && v.empty());
+ }
+ ckptRestore = false;
+}
+
+void
+DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
+ Tick send_tick,
+ Tick send_delay)
+{
+ // Note : this is called from the receiver thread
+ curEventQueue()->lock();
+ Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
+
+ DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
+ "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
+ send_tick, send_delay, linkDelay, recv_tick);
+ // Every packet must be sent and arrive in the same quantum
+ assert(send_tick > master->syncEvent->when() -
+ master->syncEvent->repeat);
+ // No packet may be scheduled for receive in the arrival quantum
+ assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
+
+ // Now we are about to schedule a recvDone event for the new data packet.
+ // We use the same recvDone object for all incoming data packets. Packet
+ // descriptors are saved in the ordered queue. The currently scheduled
+ // packet is always on the top of the queue.
+ // NOTE: we use the event queue lock to protect the receive desc queue,
+ // too, which is accessed both by the receiver thread and the simulation
+ // thread.
+ descQueue.emplace(new_packet, send_tick, send_delay);
+ if (descQueue.size() == 1) {
+ assert(!recvDone->scheduled());
+ eventManager->schedule(recvDone, recv_tick);
+ } else {
+ assert(recvDone->scheduled());
+ panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
+ "Out of order packet received (recv_tick: %lu top(): %lu\n",
+ recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
+ }
+ curEventQueue()->unlock();
+}
+
+EthPacketPtr
+DistIface::RecvScheduler::popPacket()
+{
+ // Note : this is called from the simulation thread when a receive done
+ // event is being processed for the link. We assume that the thread holds
+ // the event queue queue lock when this is called!
+ EthPacketPtr next_packet = descQueue.front().packet;
+ descQueue.pop();
+
+ if (descQueue.size() > 0) {
+ Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
+ descQueue.front().sendDelay,
+ curTick());
+ eventManager->schedule(recvDone, recv_tick);
+ }
+ prevRecvTick = curTick();
+ return next_packet;
+}
+
+void
+DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
+{
+ SERIALIZE_SCALAR(sendTick);
+ SERIALIZE_SCALAR(sendDelay);
+ packet->serialize("rxPacket", cp);
+}
+
+void
+DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
+{
+ UNSERIALIZE_SCALAR(sendTick);
+ UNSERIALIZE_SCALAR(sendDelay);
+ packet = std::make_shared<EthPacketData>(16384);
+ packet->unserialize("rxPacket", cp);
+}
+
+void
+DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
+{
+ SERIALIZE_SCALAR(prevRecvTick);
+ // serialize the receive desc queue
+ std::queue<Desc> tmp_queue(descQueue);
+ unsigned n_desc_queue = descQueue.size();
+ assert(tmp_queue.size() == descQueue.size());
+ SERIALIZE_SCALAR(n_desc_queue);
+ for (int i = 0; i < n_desc_queue; i++) {
+ tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
+ tmp_queue.pop();
+ }
+ assert(tmp_queue.empty());
+}
+
+void
+DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
+{
+ assert(descQueue.size() == 0);
+ assert(recvDone->scheduled() == false);
+ assert(ckptRestore == false);
+
+ UNSERIALIZE_SCALAR(prevRecvTick);
+ // unserialize the receive desc queue
+ unsigned n_desc_queue;
+ UNSERIALIZE_SCALAR(n_desc_queue);
+ for (int i = 0; i < n_desc_queue; i++) {
+ Desc recv_desc;
+ recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
+ descQueue.push(recv_desc);
+ }
+ ckptRestore = true;
+}
+
+DistIface::DistIface(unsigned dist_rank,
+ unsigned dist_size,
+ Tick sync_start,
+ Tick sync_repeat,
+ EventManager *em,
+ bool is_switch, int num_nodes) :
+ syncStart(sync_start), syncRepeat(sync_repeat),
+ recvThread(nullptr), recvScheduler(em),
+ rank(dist_rank), size(dist_size)
+{
+ DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
+ isMaster = false;
+ if (master == nullptr) {
+ assert(sync == nullptr);
+ assert(syncEvent == nullptr);
+ if (is_switch)
+ sync = new SyncSwitch(num_nodes);
+ else
+ sync = new SyncNode();
+ syncEvent = new SyncEvent();
+ master = this;
+ isMaster = true;
+ }
+ distIfaceId = distIfaceNum;
+ distIfaceNum++;
+}
+
+DistIface::~DistIface()
+{
+ assert(recvThread);
+ delete recvThread;
+ if (this == master) {
+ assert(syncEvent);
+ delete syncEvent;
+ assert(sync);
+ delete sync;
+ master = nullptr;
+ }
+}
+
+void
+DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
+{
+ Header header;
+
+ // Prepare a dist header packet for the Ethernet packet we want to
+ // send out.
+ header.msgType = MsgType::dataDescriptor;
+ header.sendTick = curTick();
+ header.sendDelay = send_delay;
+
+ header.dataPacketLength = pkt->size();
+
+ // Send out the packet and the meta info.
+ sendPacket(header, pkt);
+
+ DPRINTF(DistEthernetPkt,
+ "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
+ pkt->size(), send_delay);
+}
+
+void
+DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
+{
+ EthPacketPtr new_packet;
+ DistHeaderPkt::Header header;
+
+ // Initialize receive scheduler parameters
+ recvScheduler.init(recv_done, link_delay);
+
+ // Main loop to wait for and process any incoming message.
+ for (;;) {
+ // recvHeader() blocks until the next dist header packet comes in.
+ if (!recvHeader(header)) {
+ // We lost connection to the peer gem5 processes most likely
+ // because one of them called m5 exit. So we stop here.
+ // Grab the eventq lock to stop the simulation thread
+ curEventQueue()->lock();
+ exit_message("info",
+ 0,
+ "Message server closed connection, "
+ "simulation is exiting");
+ }
+
+ // We got a valid dist header packet, let's process it
+ if (header.msgType == MsgType::dataDescriptor) {
+ recvPacket(header, new_packet);
+ recvScheduler.pushPacket(new_packet,
+ header.sendTick,
+ header.sendDelay);
+ } else {
+ // everything else must be synchronisation related command
+ sync->progress(header.sendTick,
+ header.syncRepeat,
+ header.needCkpt,
+ header.needExit);
+ }
+ }
+}
+
+void
+DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
+{
+ assert(recvThread == nullptr);
+
+ recvThread = new std::thread(&DistIface::recvThreadFunc,
+ this,
+ const_cast<Event *>(recv_done),
+ link_delay);
+ recvThreadsNum++;
+}
+
+DrainState
+DistIface::drain()
+{
+ DPRINTF(DistEthernet,"DistIFace::drain() called\n");
+ // This can be called multiple times in the same drain cycle.
+ if (this == master)
+ syncEvent->draining(true);
+ return DrainState::Drained;
+}
+
+void
+DistIface::drainResume() {
+ DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
+ if (this == master)
+ syncEvent->draining(false);
+ recvScheduler.resumeRecvTicks();
+}
+
+void
+DistIface::serialize(CheckpointOut &cp) const
+{
+ // Drain the dist interface before the checkpoint is taken. We cannot call
+ // this as part of the normal drain cycle because this dist sync has to be
+ // called exactly once after the system is fully drained.
+ sync->drainComplete();
+
+ unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
+
+ SERIALIZE_SCALAR(rank_orig);
+ SERIALIZE_SCALAR(dist_iface_id_orig);
+
+ recvScheduler.serializeSection(cp, "recvScheduler");
+ if (this == master) {
+ sync->serializeSection(cp, "Sync");
+ }
+}
+
+void
+DistIface::unserialize(CheckpointIn &cp)
+{
+ unsigned rank_orig, dist_iface_id_orig;
+ UNSERIALIZE_SCALAR(rank_orig);
+ UNSERIALIZE_SCALAR(dist_iface_id_orig);
+
+ panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
+ rank, rank_orig);
+ panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
+ "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
+ dist_iface_id_orig);
+
+ recvScheduler.unserializeSection(cp, "recvScheduler");
+ if (this == master) {
+ sync->unserializeSection(cp, "Sync");
+ }
+}
+
+void
+DistIface::init(const Event *done_event, Tick link_delay)
+{
+ // Init hook for the underlaying message transport to setup/finalize
+ // communication channels
+ initTransport();
+
+ // Spawn a new receiver thread that will process messages
+ // coming in from peer gem5 processes.
+ // The receive thread will also schedule a (receive) doneEvent
+ // for each incoming data packet.
+ spawnRecvThread(done_event, link_delay);
+
+
+ // Adjust the periodic sync start and interval. Different DistIface
+ // might have different requirements. The singleton sync object
+ // will select the minimum values for both params.
+ assert(sync != nullptr);
+ sync->init(syncStart, syncRepeat);
+
+ // Initialize the seed for random generator to avoid the same sequence
+ // in all gem5 peer processes
+ assert(master != nullptr);
+ if (this == master)
+ random_mt.init(5489 * (rank+1) + 257);
+}
+
+void
+DistIface::startup()
+{
+ DPRINTF(DistEthernet, "DistIface::startup() started\n");
+ if (this == master)
+ syncEvent->start();
+ DPRINTF(DistEthernet, "DistIface::startup() done\n");
+}
+
+bool
+DistIface::readyToCkpt(Tick delay, Tick period)
+{
+ bool ret = true;
+ DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
+ "period:%lu\n", delay, period);
+ if (master) {
+ if (delay == 0) {
+ inform("m5 checkpoint called with zero delay => triggering collaborative "
+ "checkpoint\n");
+ sync->requestCkpt(ReqType::collective);
+ } else {
+ inform("m5 checkpoint called with non-zero delay => triggering immediate "
+ "checkpoint (at the next sync)\n");
+ sync->requestCkpt(ReqType::immediate);
+ }
+ if (period != 0)
+ inform("Non-zero period for m5_ckpt is ignored in "
+ "distributed gem5 runs\n");
+ ret = false;
+ }
+ return ret;
+}
+
+bool
+DistIface::readyToExit(Tick delay)
+{
+ bool ret = true;
+ DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
+ delay);
+ if (master) {
+ if (delay == 0) {
+ inform("m5 exit called with zero delay => triggering collaborative "
+ "exit\n");
+ sync->requestExit(ReqType::collective);
+ } else {
+ inform("m5 exit called with non-zero delay => triggering immediate "
+ "exit (at the next sync)\n");
+ sync->requestExit(ReqType::immediate);
+ }
+ ret = false;
+ }
+ return ret;
+}
+
+uint64_t
+DistIface::rankParam()
+{
+ uint64_t val;
+ if (master) {
+ val = master->rank;
+ } else {
+ warn("Dist-rank parameter is queried in single gem5 simulation.");
+ val = 0;
+ }
+ return val;
+}
+
+uint64_t
+DistIface::sizeParam()
+{
+ uint64_t val;
+ if (master) {
+ val = master->size;
+ } else {
+ warn("Dist-size parameter is queried in single gem5 simulation.");
+ val = 1;
+ }
+ return val;
+}
diff --git a/src/dev/net/dist_iface.hh b/src/dev/net/dist_iface.hh
new file mode 100644
index 000000000..e69211fb8
--- /dev/null
+++ b/src/dev/net/dist_iface.hh
@@ -0,0 +1,595 @@
+/*
+ * Copyright (c) 2015 ARM Limited
+ * All rights reserved
+ *
+ * The license below extends only to copyright in the software and shall
+ * not be construed as granting a license to any other intellectual
+ * property including but not limited to intellectual property relating
+ * to a hardware implementation of the functionality of the software
+ * licensed hereunder. You may use the software subject to the license
+ * terms below provided that you ensure that this notice is replicated
+ * unmodified and in its entirety in all distributions of the software,
+ * modified or unmodified, in source code or in binary form.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met: redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer;
+ * redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution;
+ * neither the name of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Authors: Gabor Dozsa
+ */
+
+/* @file
+ * The interface class for dist gem5 simulations.
+ *
+ * dist-gem5 is an extension to gem5 to enable parallel simulation of a
+ * distributed system (e.g. simulation of a pool of machines
+ * connected by Ethernet links). A dist gem5 run consists of seperate gem5
+ * processes running in parallel. Each gem5 process executes
+ * the simulation of a component of the simulated distributed system.
+ * (An example component can be a dist-core board with an Ethernet NIC.)
+ * The DistIface class below provides services to transfer data and
+ * control messages among the gem5 processes. The main such services are
+ * as follows.
+ *
+ * 1. Send a data packet coming from a simulated Ethernet link. The packet
+ * will be transferred to (all) the target(s) gem5 processes. The send
+ * operation is always performed by the simulation thread, i.e. the gem5
+ * thread that is processing the event queue associated with the simulated
+ * Ethernet link.
+ *
+ * 2. Spawn a receiver thread to process messages coming in from the
+ * from other gem5 processes. Each simulated Ethernet link has its own
+ * associated receiver thread. The receiver thread saves the incoming packet
+ * and schedule an appropriate receive event in the event queue.
+ *
+ * 3. Schedule a global barrier event periodically to keep the gem5
+ * processes in sync.
+ * Periodic barrier event to keep peer gem5 processes in sync. The basic idea
+ * is that no gem5 process can go ahead further than the simulated link
+ * transmission delay to ensure that a corresponding receive event can always
+ * be scheduled for any message coming in from a peer gem5 process.
+ *
+ *
+ *
+ * This interface is an abstract class. It can work with various low level
+ * send/receive service implementations (e.g. TCP/IP, MPI,...). A TCP
+ * stream socket version is implemented in src/dev/net/tcp_iface.[hh,cc].
+ */
+#ifndef __DEV_DIST_IFACE_HH__
+#define __DEV_DIST_IFACE_HH__
+
+#include <array>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <utility>
+
+#include "dev/net/dist_packet.hh"
+#include "dev/net/etherpkt.hh"
+#include "sim/core.hh"
+#include "sim/drain.hh"
+#include "sim/global_event.hh"
+#include "sim/serialize.hh"
+
+class EventManager;
+
+/**
+ * The interface class to talk to peer gem5 processes.
+ */
+class DistIface : public Drainable, public Serializable
+{
+ public:
+ typedef DistHeaderPkt::Header Header;
+
+ protected:
+ typedef DistHeaderPkt::MsgType MsgType;
+ typedef DistHeaderPkt::ReqType ReqType;
+
+ private:
+ class SyncEvent;
+ /** @class Sync
+ * This class implements global sync operations among gem5 peer processes.
+ *
+ * @note This class is used as a singleton object (shared by all DistIface
+ * objects).
+ */
+ class Sync : public Serializable
+ {
+ protected:
+ /**
+ * The lock to protect access to the Sync object.
+ */
+ std::mutex lock;
+ /**
+ * Condition variable for the simulation thread to wait on
+ * until all receiver threads completes the current global
+ * synchronisation.
+ */
+ std::condition_variable cv;
+ /**
+ * Number of receiver threads that not yet completed the current global
+ * synchronisation.
+ */
+ unsigned waitNum;
+ /**
+ * Flag is set if exit is permitted upon sync completion
+ */
+ bool doExit;
+ /**
+ * Flag is set if taking a ckpt is permitted upon sync completion
+ */
+ bool doCkpt;
+ /**
+ * The repeat value for the next periodic sync
+ */
+ Tick nextRepeat;
+ /**
+ * Tick for the very first periodic sync
+ */
+ Tick firstAt;
+ /**
+ * Tick for the next periodic sync (if the event is not scheduled yet)
+ */
+ Tick nextAt;
+
+ friend class SyncEvent;
+
+ public:
+ /**
+ * Initialize periodic sync params.
+ *
+ * @param start Start tick for dist synchronisation
+ * @param repeat Frequency of dist synchronisation
+ *
+ */
+ void init(Tick start, Tick repeat);
+ /**
+ * Core method to perform a full dist sync.
+ */
+ virtual void run(bool same_tick) = 0;
+ /**
+ * Callback when the receiver thread gets a sync ack message.
+ */
+ virtual void progress(Tick send_tick,
+ Tick next_repeat,
+ ReqType do_ckpt,
+ ReqType do_exit) = 0;
+
+ virtual void requestCkpt(ReqType req) = 0;
+ virtual void requestExit(ReqType req) = 0;
+
+ void drainComplete();
+
+ virtual void serialize(CheckpointOut &cp) const override = 0;
+ virtual void unserialize(CheckpointIn &cp) override = 0;
+ };
+
+ class SyncNode: public Sync
+ {
+ private:
+ /**
+ * Exit requested
+ */
+ ReqType needExit;
+ /**
+ * Ckpt requested
+ */
+ ReqType needCkpt;
+
+ public:
+
+ SyncNode();
+ ~SyncNode() {}
+ void run(bool same_tick) override;
+ void progress(Tick max_req_tick,
+ Tick next_repeat,
+ ReqType do_ckpt,
+ ReqType do_exit) override;
+
+ void requestCkpt(ReqType req) override;
+ void requestExit(ReqType req) override;
+
+ void serialize(CheckpointOut &cp) const override;
+ void unserialize(CheckpointIn &cp) override;
+ };
+
+ class SyncSwitch: public Sync
+ {
+ private:
+ /**
+ * Counter for recording exit requests
+ */
+ unsigned numExitReq;
+ /**
+ * Counter for recording ckpt requests
+ */
+ unsigned numCkptReq;
+ /**
+ * Number of connected simulated nodes
+ */
+ unsigned numNodes;
+
+ public:
+ SyncSwitch(int num_nodes);
+ ~SyncSwitch() {}
+
+ void run(bool same_tick) override;
+ void progress(Tick max_req_tick,
+ Tick next_repeat,
+ ReqType do_ckpt,
+ ReqType do_exit) override;
+
+ void requestCkpt(ReqType) override {
+ panic("Switch requested checkpoint");
+ }
+ void requestExit(ReqType) override {
+ panic("Switch requested exit");
+ }
+
+ void serialize(CheckpointOut &cp) const override;
+ void unserialize(CheckpointIn &cp) override;
+ };
+
+ /**
+ * The global event to schedule periodic dist sync. It is used as a
+ * singleton object.
+ *
+ * The periodic synchronisation works as follows.
+ * 1. A SyncEvent is scheduled as a global event when startup() is
+ * called.
+ * 2. The process() method of the SyncEvent initiates a new barrier
+ * for each simulated Ethernet link.
+ * 3. Simulation thread(s) then waits until all receiver threads
+ * complete the ongoing barrier. The global sync event is done.
+ */
+ class SyncEvent : public GlobalSyncEvent
+ {
+ private:
+ /**
+ * Flag to set when the system is draining
+ */
+ bool _draining;
+ public:
+ /**
+ * Only the firstly instantiated DistIface object will
+ * call this constructor.
+ */
+ SyncEvent() : GlobalSyncEvent(Sim_Exit_Pri, 0), _draining(false) {}
+
+ ~SyncEvent() {}
+ /**
+ * Schedule the first periodic sync event.
+ */
+ void start();
+ /**
+ * This is a global event so process() will only be called by
+ * exactly one simulation thread. (See further comments in the .cc
+ * file.)
+ */
+ void process() override;
+
+ bool draining() const { return _draining; }
+ void draining(bool fl) { _draining = fl; }
+ };
+ /**
+ * Class to encapsulate information about data packets received.
+
+ * @note The main purpose of the class to take care of scheduling receive
+ * done events for the simulated network link and store incoming packets
+ * until they can be received by the simulated network link.
+ */
+ class RecvScheduler : public Serializable
+ {
+ private:
+ /**
+ * Received packet descriptor. This information is used by the receive
+ * thread to schedule receive events and by the simulation thread to
+ * process those events.
+ */
+ struct Desc : public Serializable
+ {
+ EthPacketPtr packet;
+ Tick sendTick;
+ Tick sendDelay;
+
+ Desc() : sendTick(0), sendDelay(0) {}
+ Desc(EthPacketPtr p, Tick s, Tick d) :
+ packet(p), sendTick(s), sendDelay(d) {}
+ Desc(const Desc &d) :
+ packet(d.packet), sendTick(d.sendTick), sendDelay(d.sendDelay) {}
+
+ void serialize(CheckpointOut &cp) const override;
+ void unserialize(CheckpointIn &cp) override;
+ };
+ /**
+ * The queue to store the receive descriptors.
+ */
+ std::queue<Desc> descQueue;
+ /**
+ * The tick when the most recent receive event was processed.
+ *
+ * @note This information is necessary to simulate possible receiver
+ * link contention when calculating the receive tick for the next
+ * incoming data packet (see the calcReceiveTick() method)
+ */
+ Tick prevRecvTick;
+ /**
+ * The receive done event for the simulated Ethernet link.
+ *
+ * @note This object is constructed by the simulated network link. We
+ * schedule this object for each incoming data packet.
+ */
+ Event *recvDone;
+ /**
+ * The link delay in ticks for the simulated Ethernet link.
+ *
+ * @note This value is used for calculating the receive ticks for
+ * incoming data packets.
+ */
+ Tick linkDelay;
+ /**
+ * The event manager associated with the simulated Ethernet link.
+ *
+ * @note It is used to access the event queue for scheduling receive
+ * done events for the link.
+ */
+ EventManager *eventManager;
+ /**
+ * Calculate the tick to schedule the next receive done event.
+ *
+ * @param send_tick The tick the packet was sent.
+ * @param send_delay The simulated delay at the sender side.
+ * @param prev_recv_tick Tick when the last receive event was
+ * processed.
+ *
+ * @note This method tries to take into account possible receiver link
+ * contention and adjust receive tick for the incoming packets
+ * accordingly.
+ */
+ Tick calcReceiveTick(Tick send_tick,
+ Tick send_delay,
+ Tick prev_recv_tick);
+
+ /**
+ * Flag to set if receive ticks for pending packets need to be
+ * recalculated due to changed link latencies at a resume
+ */
+ bool ckptRestore;
+
+ public:
+ /**
+ * Scheduler for the incoming data packets.
+ *
+ * @param em The event manager associated with the simulated Ethernet
+ * link.
+ */
+ RecvScheduler(EventManager *em) :
+ prevRecvTick(0), recvDone(nullptr), linkDelay(0),
+ eventManager(em), ckptRestore(false) {}
+
+ /**
+ * Initialize network link parameters.
+ *
+ * @note This method is called from the receiver thread (see
+ * recvThreadFunc()).
+ */
+ void init(Event *recv_done, Tick link_delay);
+ /**
+ * Fetch the next packet that is to be received by the simulated network
+ * link.
+ *
+ * @note This method is called from the process() method of the receive
+ * done event associated with the network link.
+ */
+ EthPacketPtr popPacket();
+ /**
+ * Push a newly arrived packet into the desc queue.
+ */
+ void pushPacket(EthPacketPtr new_packet,
+ Tick send_tick,
+ Tick send_delay);
+
+ void serialize(CheckpointOut &cp) const override;
+ void unserialize(CheckpointIn &cp) override;
+ /**
+ * Adjust receive ticks for pending packets when restoring from a
+ * checkpoint
+ *
+ * @note Link speed and delay parameters may change at resume.
+ */
+ void resumeRecvTicks();
+ };
+ /**
+ * Tick to schedule the first dist sync event.
+ * This is just as optimization : we do not need any dist sync
+ * event until the simulated NIC is brought up by the OS.
+ */
+ Tick syncStart;
+ /**
+ * Frequency of dist sync events in ticks.
+ */
+ Tick syncRepeat;
+ /**
+ * Receiver thread pointer.
+ * Each DistIface object must have exactly one receiver thread.
+ */
+ std::thread *recvThread;
+ /**
+ * Meta information about data packets received.
+ */
+ RecvScheduler recvScheduler;
+
+ protected:
+ /**
+ * The rank of this process among the gem5 peers.
+ */
+ unsigned rank;
+ /**
+ * The number of gem5 processes comprising this dist simulation.
+ */
+ unsigned size;
+ /**
+ * Number of DistIface objects (i.e. dist links in this gem5 process)
+ */
+ static unsigned distIfaceNum;
+ /**
+ * Unique id for the dist link
+ */
+ unsigned distIfaceId;
+
+ bool isMaster;
+
+ private:
+ /**
+ * Number of receiver threads (in this gem5 process)
+ */
+ static unsigned recvThreadsNum;
+ /**
+ * The singleton Sync object to perform dist synchronisation.
+ */
+ static Sync *sync;
+ /**
+ * The singleton SyncEvent object to schedule periodic dist sync.
+ */
+ static SyncEvent *syncEvent;
+ /**
+ * The very first DistIface object created becomes the master. We need
+ * a master to co-ordinate the global synchronisation.
+ */
+ static DistIface *master;
+
+ private:
+ /**
+ * Send out a data packet to the remote end.
+ * @param header Meta info about the packet (which needs to be transferred
+ * to the destination alongside the packet).
+ * @param packet Pointer to the packet to send.
+ */
+ virtual void sendPacket(const Header &header, const EthPacketPtr &packet) = 0;
+ /**
+ * Send out a control command to the remote end.
+ * @param header Meta info describing the command (e.g. sync request)
+ */
+ virtual void sendCmd(const Header &header) = 0;
+ /**
+ * Receive a header (i.e. meta info describing a data packet or a control command)
+ * from the remote end.
+ * @param header The meta info structure to store the incoming header.
+ */
+ virtual bool recvHeader(Header &header) = 0;
+ /**
+ * Receive a packet from the remote end.
+ * @param header Meta info about the incoming packet (obtanied by a previous
+ * call to the recvHedaer() method).
+ * @param Pointer to packet received.
+ */
+ virtual void recvPacket(const Header &header, EthPacketPtr &packet) = 0;
+ /**
+ * Init hook for the underlaying transport
+ */
+ virtual void initTransport() = 0;
+ /**
+ * spawn the receiver thread.
+ * @param recv_done The receive done event associated with the simulated
+ * Ethernet link.
+ * @param link_delay The link delay for the simulated Ethernet link.
+ */
+ void spawnRecvThread(const Event *recv_done, Tick link_delay);
+ /**
+ * The function executed by a receiver thread.
+ */
+ void recvThreadFunc(Event *recv_done, Tick link_delay);
+
+ public:
+
+ /**
+ * ctor
+ * @param dist_rank Rank of this gem5 process within the dist run
+ * @param sync_start Start tick for dist synchronisation
+ * @param sync_repeat Frequency for dist synchronisation
+ * @param em The event manager associated with the simulated Ethernet link
+ */
+ DistIface(unsigned dist_rank,
+ unsigned dist_size,
+ Tick sync_start,
+ Tick sync_repeat,
+ EventManager *em,
+ bool is_switch,
+ int num_nodes);
+
+ virtual ~DistIface();
+ /**
+ * Send out an Ethernet packet.
+ * @param pkt The Ethernet packet to send.
+ * @param send_delay The delay in ticks for the send completion event.
+ */
+ void packetOut(EthPacketPtr pkt, Tick send_delay);
+ /**
+ * Fetch the packet scheduled to be received next by the simulated
+ * network link.
+ *
+ * @note This method is called within the process() method of the link
+ * receive done event. It also schedules the next receive event if the
+ * receive queue is not empty.
+ */
+ EthPacketPtr packetIn() { return recvScheduler.popPacket(); }
+
+ DrainState drain() override;
+ void drainResume() override;
+ void init(const Event *e, Tick link_delay);
+ void startup();
+
+ void serialize(CheckpointOut &cp) const override;
+ void unserialize(CheckpointIn &cp) override;
+ /**
+ * Initiate the exit from the simulation.
+ * @param delay Delay param from the m5 exit command. If Delay is zero
+ * then a collaborative exit is requested (i.e. all nodes have to call
+ * this method before the distributed simulation can exit). If Delay is
+ * not zero then exit is requested asap (and it will happen at the next
+ * sync tick).
+ * @return False if we are in distributed mode (i.e. exit can happen only
+ * at sync), True otherwise.
+ */
+ static bool readyToExit(Tick delay);
+ /**
+ * Initiate taking a checkpoint
+ * @param delay Delay param from the m5 checkpoint command. If Delay is
+ * zero then a collaborative checkpoint is requested (i.e. all nodes have
+ * to call this method before the checkpoint can be taken). If Delay is
+ * not zero then a checkpoint is requested asap (and it will happen at the
+ * next sync tick).
+ * @return False if we are in dist mode (i.e. exit can happen only at
+ * sync), True otherwise.
+ */
+ static bool readyToCkpt(Tick delay, Tick period);
+ /**
+ * Getter for the dist rank param.
+ */
+ static uint64_t rankParam();
+ /**
+ * Getter for the dist size param.
+ */
+ static uint64_t sizeParam();
+ };
+
+#endif
diff --git a/src/dev/net/multi_packet.hh b/src/dev/net/dist_packet.hh
index 3d8e85dfa..4c079c44a 100644
--- a/src/dev/net/multi_packet.hh
+++ b/src/dev/net/dist_packet.hh
@@ -38,93 +38,69 @@
*/
/* @file
- * Header packet class for multi gem5 runs.
+ * Header packet class for dist-gem5 runs.
*
- * For a high level description about multi gem5 see comments in
- * header file multi_iface.hh.
+ * For a high level description about dist-gem5 see comments in
+ * header file dist_iface.hh.
*
- * The MultiHeaderPkt class defines the format of message headers
- * sent among gem5 processes during a multi gem5 simulation. A header packet
+ * The DistHeaderPkt class defines the format of message headers
+ * sent among gem5 processes during a dist gem5 simulation. A header packet
* can either carry the description of data packet (i.e. a simulated Ethernet
* packet) or a synchronisation related control command. In case of
* data packet description, the corresponding data packet always follows
* the header packet back-to-back.
*/
-#ifndef __DEV_NET_MULTI_PACKET_HH__
-#define __DEV_NET_MULTI_PACKET_HH__
+#ifndef __DEV_DIST_PACKET_HH__
+#define __DEV_DIST_PACKET_HH__
#include <cstring>
#include "base/types.hh"
-class MultiHeaderPkt
+class DistHeaderPkt
{
private:
- MultiHeaderPkt() {}
- ~MultiHeaderPkt() {}
+ DistHeaderPkt() {}
+ ~DistHeaderPkt() {}
public:
+ enum class ReqType { immediate, collective, pending, none };
/**
- * Simply type to help with calculating space requirements for
- * the corresponding header field.
- */
- typedef uint8_t AddressType[6];
-
- /**
- * The msg type defines what informarion a multi header packet carries.
+ * The msg type defines what information a dist header packet carries.
*/
enum class MsgType
{
dataDescriptor,
- cmdPeriodicSyncReq,
- cmdPeriodicSyncAck,
- cmdCkptSyncReq,
- cmdCkptSyncAck,
- cmdAtomicSyncReq,
- cmdAtomicSyncAck,
+ cmdSyncReq,
+ cmdSyncAck,
unknown
};
struct Header
{
/**
- * The msg type field is valid for all header packets. In case of
- * a synchronisation control command this is the only valid field.
+ * The msg type field is valid for all header packets.
+ *
+ * @note senderRank is used with data packets while collFlags are used
+ * by sync ack messages to trigger collective ckpt or exit events.
*/
MsgType msgType;
Tick sendTick;
- Tick sendDelay;
- /**
- * Actual length of the simulated Ethernet packet.
- */
- unsigned dataPacketLength;
- /**
- * Source MAC address.
- */
- AddressType srcAddress;
- /**
- * Destination MAC address.
- */
- AddressType dstAddress;
+ union {
+ Tick sendDelay;
+ Tick syncRepeat;
+ };
+ union {
+ /**
+ * Actual length of the simulated Ethernet packet.
+ */
+ unsigned dataPacketLength;
+ struct {
+ ReqType needCkpt;
+ ReqType needExit;
+ };
+ };
};
-
- static unsigned maxAddressLength();
-
- /**
- * Static functions for manipulating and comparing MAC addresses.
- */
- static void clearAddress(AddressType &addr);
- static bool isAddressEqual(const AddressType &addr1,
- const AddressType &addr2);
- static bool isAddressLess(const AddressType &addr1,
- const AddressType &addr2);
-
- static void copyAddress(AddressType &dest,
- const AddressType &src);
-
- static bool isUnicastAddress(const AddressType &addr);
- static bool isMulticastAddress(const AddressType &addr);
- static bool isBroadcastAddress(const AddressType &addr);
};
-#endif // __DEV_NET_MULTI_PACKET_HH__
+#endif
diff --git a/src/dev/net/etherpkt.cc b/src/dev/net/etherpkt.cc
index a16f572c5..f06af3306 100644
--- a/src/dev/net/etherpkt.cc
+++ b/src/dev/net/etherpkt.cc
@@ -53,19 +53,3 @@ EthPacketData::unserialize(const string &base, CheckpointIn &cp)
arrayParamIn(cp, base + ".data", data, length);
}
-void
-EthPacketData::packAddress(uint8_t *src_addr,
- uint8_t *dst_addr,
- unsigned &nbytes)
-{
- Net::EthHdr *hdr = (Net::EthHdr *)data;
- assert(hdr->src().size() == hdr->dst().size());
- if (nbytes < hdr->src().size())
- panic("EthPacketData::packAddress() Buffer overflow");
-
- memcpy(dst_addr, hdr->dst().bytes(), hdr->dst().size());
- memcpy(src_addr, hdr->src().bytes(), hdr->src().size());
-
- nbytes = hdr->src().size();
-}
-
diff --git a/src/dev/net/etherpkt.hh b/src/dev/net/etherpkt.hh
index 4119578c3..457563293 100644
--- a/src/dev/net/etherpkt.hh
+++ b/src/dev/net/etherpkt.hh
@@ -71,18 +71,6 @@ class EthPacketData
~EthPacketData() { if (data) delete [] data; }
public:
- /**
- * This function pulls out the MAC source and destination addresses from
- * the packet data and stores them in the caller specified buffers.
- *
- * @param src_addr The buffer to store the source MAC address.
- * @param dst_addr The buffer to store the destination MAC address.
- * @param length This is an inout parameter. The caller stores in this
- * the size of the address buffers. On return, this will contain the
- * actual address size stored in the buffers. (We assume that source
- * address size is equal to that of the destination address.)
- */
- void packAddress(uint8_t *src_addr, uint8_t *dst_addr, unsigned &length);
void serialize(const std::string &base, CheckpointOut &cp) const;
void unserialize(const std::string &base, CheckpointIn &cp);
diff --git a/src/dev/net/multi_iface.cc b/src/dev/net/multi_iface.cc
deleted file mode 100644
index 15f69f2ac..000000000
--- a/src/dev/net/multi_iface.cc
+++ /dev/null
@@ -1,622 +0,0 @@
-/*
- * Copyright (c) 2015 ARM Limited
- * All rights reserved
- *
- * The license below extends only to copyright in the software and shall
- * not be construed as granting a license to any other intellectual
- * property including but not limited to intellectual property relating
- * to a hardware implementation of the functionality of the software
- * licensed hereunder. You may use the software subject to the license
- * terms below provided that you ensure that this notice is replicated
- * unmodified and in its entirety in all distributions of the software,
- * modified or unmodified, in source code or in binary form.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met: redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer;
- * redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution;
- * neither the name of the copyright holders nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * Authors: Gabor Dozsa
- */
-
-/* @file
- * The interface class for multi gem5 simulations.
- */
-
-#include "dev/net/multi_iface.hh"
-
-#include <queue>
-#include <thread>
-
-#include "base/random.hh"
-#include "base/trace.hh"
-#include "debug/MultiEthernet.hh"
-#include "debug/MultiEthernetPkt.hh"
-#include "dev/net/etherpkt.hh"
-#include "sim/sim_exit.hh"
-#include "sim/sim_object.hh"
-
-
-MultiIface::Sync *MultiIface::sync = nullptr;
-MultiIface::SyncEvent *MultiIface::syncEvent = nullptr;
-unsigned MultiIface::recvThreadsNum = 0;
-MultiIface *MultiIface::master = nullptr;
-
-bool
-MultiIface::Sync::run(SyncTrigger t, Tick sync_tick)
-{
- std::unique_lock<std::mutex> sync_lock(lock);
-
- trigger = t;
- if (trigger != SyncTrigger::periodic) {
- DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n",
- (unsigned)trigger);
- }
-
- switch (state) {
- case SyncState::asyncCkpt:
- switch (trigger) {
- case SyncTrigger::ckpt:
- assert(MultiIface::syncEvent->interrupted == false);
- state = SyncState::busy;
- break;
- case SyncTrigger::periodic:
- if (waitNum == 0) {
- // So all recv threads got an async checkpoint request already
- // and a simExit is scheduled at the end of the current tick
- // (i.e. it is a periodic sync scheduled at the same tick as
- // the simExit).
- state = SyncState::idle;
- DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
- "due to async ckpt scheduled\n");
- return false;
- } else {
- // we still need to wait for some receiver thread to get the
- // aysnc ckpt request. We are going to proceed as 'interrupted'
- // periodic sync.
- state = SyncState::interrupted;
- DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
- "due to ckpt request is coming in\n");
- }
- break;
- case SyncTrigger::atomic:
- assert(trigger != SyncTrigger::atomic);
- }
- break;
- case SyncState::idle:
- state = SyncState::busy;
- break;
- // Only one sync can be active at any time
- case SyncState::interrupted:
- case SyncState::busy:
- assert(state != SyncState::interrupted);
- assert(state != SyncState::busy);
- break;
- }
- // Kick-off the sync unless we are in the middle of an interrupted
- // periodic sync
- if (state != SyncState::interrupted) {
- assert(waitNum == 0);
- waitNum = MultiIface::recvThreadsNum;
- // initiate the global synchronisation
- assert(MultiIface::master != nullptr);
- MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick);
- }
- // now wait until all receiver threads complete the synchronisation
- auto lf = [this]{ return waitNum == 0; };
- cv.wait(sync_lock, lf);
-
- // we are done
- assert(state == SyncState::busy || state == SyncState::interrupted);
- bool ret = (state != SyncState::interrupted);
- state = SyncState::idle;
- return ret;
-}
-
-void
-MultiIface::Sync::progress(MsgType msg)
-{
- std::unique_lock<std::mutex> sync_lock(lock);
-
- switch (msg) {
- case MsgType::cmdAtomicSyncAck:
- assert(state == SyncState::busy && trigger == SyncTrigger::atomic);
- break;
- case MsgType::cmdPeriodicSyncAck:
- assert(state == SyncState::busy && trigger == SyncTrigger::periodic);
- break;
- case MsgType::cmdCkptSyncAck:
- assert(state == SyncState::busy && trigger == SyncTrigger::ckpt);
- break;
- case MsgType::cmdCkptSyncReq:
- switch (state) {
- case SyncState::busy:
- if (trigger == SyncTrigger::ckpt) {
- // We are already in a checkpoint sync but got another ckpt
- // sync request. This may happen if two (or more) peer gem5
- // processes try to start a ckpt nearly at the same time.
- // Incrementing waitNum here (before decrementing it below)
- // effectively results in ignoring this new ckpt sync request.
- waitNum++;
- break;
- }
- assert (waitNum == recvThreadsNum);
- state = SyncState::interrupted;
- // we need to fall over here to handle "recvThreadsNum == 1" case
- case SyncState::interrupted:
- assert(trigger == SyncTrigger::periodic);
- assert(waitNum >= 1);
- if (waitNum == 1) {
- exitSimLoop("checkpoint");
- }
- break;
- case SyncState::idle:
- // There is no on-going sync so we got an async ckpt request. If we
- // are the only receiver thread then we need to schedule the
- // checkpoint. Otherwise, only change the state to 'asyncCkpt' and
- // let the last receiver thread to schedule the checkpoint at the
- // 'asyncCkpt' case.
- // Note that a periodic or resume sync may start later and that can
- // trigger a state change to 'interrupted' (so the checkpoint may
- // get scheduled at 'interrupted' case finally).
- assert(waitNum == 0);
- state = SyncState::asyncCkpt;
- waitNum = MultiIface::recvThreadsNum;
- // we need to fall over here to handle "recvThreadsNum == 1" case
- case SyncState::asyncCkpt:
- assert(waitNum >= 1);
- if (waitNum == 1)
- exitSimLoop("checkpoint");
- break;
- default:
- panic("Unexpected state for checkpoint request message");
- break;
- }
- break;
- default:
- panic("Unknown msg type");
- break;
- }
- waitNum--;
- assert(state != SyncState::idle);
- // Notify the simultaion thread if there is an on-going sync.
- if (state != SyncState::asyncCkpt) {
- sync_lock.unlock();
- cv.notify_one();
- }
-}
-
-void MultiIface::SyncEvent::start(Tick start, Tick interval)
-{
- assert(!scheduled());
- if (interval == 0)
- panic("Multi synchronisation period must be greater than zero");
- repeat = interval;
- schedule(start);
-}
-
-void
-MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick)
-{
- // The new multi interface may require earlier start of the
- // synchronisation.
- assert(scheduled() == true);
- if (start_tick < when())
- reschedule(start_tick);
- // The new multi interface may require more frequent synchronisation.
- if (repeat == 0)
- panic("Multi synchronisation period must be greater than zero");
- if (repeat < repeat_tick)
- repeat = repeat_tick;
-}
-
-void
-MultiIface::SyncEvent::process()
-{
- /*
- * Note that this is a global event so this process method will be called
- * by only exactly one thread.
- */
- // if we are draining the system then we must not start a periodic sync (as
- // it is not sure that all peer gem5 will reach this tick before taking
- // the checkpoint).
- if (isDraining == true) {
- assert(interrupted == false);
- interrupted = true;
- DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted "
- "due to draining\n");
- return;
- }
- if (interrupted == false)
- scheduledAt = curTick();
- /*
- * We hold the eventq lock at this point but the receiver thread may
- * need the lock to schedule new recv events while waiting for the
- * multi sync to complete.
- * Note that the other simulation threads also release their eventq
- * locks while waiting for us due to the global event semantics.
- */
- curEventQueue()->unlock();
- // we do a global sync here
- interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt);
- // Global sync completed or got interrupted.
- // we are expected to exit with the eventq lock held
- curEventQueue()->lock();
- // schedule the next global sync event if this one completed. Otherwise
- // (i.e. this one was interrupted by a checkpoint request), we will
- // reschedule this one after the draining is complete.
- if (!interrupted)
- schedule(scheduledAt + repeat);
-}
-
-void MultiIface::SyncEvent::resume()
-{
- Tick sync_tick;
- assert(!scheduled());
- if (interrupted) {
- assert(curTick() >= scheduledAt);
- // We have to complete the interrupted periodic sync asap.
- // Note that this sync might be interrupted now again with a checkpoint
- // request from a peer gem5...
- sync_tick = curTick();
- schedule(sync_tick);
- } else {
- // So we completed the last periodic sync, let's find out the tick for
- // next one
- assert(curTick() > scheduledAt);
- sync_tick = scheduledAt + repeat;
- if (sync_tick < curTick())
- panic("Cannot resume periodic synchronisation");
- schedule(sync_tick);
- }
- DPRINTF(MultiEthernet,
- "MultiIface::SyncEvent periodic sync resumed at %lld "
- "(curTick:%lld)\n", sync_tick, curTick());
-}
-
-void MultiIface::SyncEvent::serialize(const std::string &base,
- CheckpointOut &cp) const
-{
- // Save the periodic multi sync schedule information
- paramOut(cp, base + ".periodicSyncRepeat", repeat);
- paramOut(cp, base + ".periodicSyncInterrupted", interrupted);
- paramOut(cp, base + ".periodicSyncAt", scheduledAt);
-}
-
-void MultiIface::SyncEvent::unserialize(const std::string &base,
- CheckpointIn &cp)
-{
- paramIn(cp, base + ".periodicSyncRepeat", repeat);
- paramIn(cp, base + ".periodicSyncInterrupted", interrupted);
- paramIn(cp, base + ".periodicSyncAt", scheduledAt);
-}
-
-MultiIface::MultiIface(unsigned multi_rank,
- Tick sync_start,
- Tick sync_repeat,
- EventManager *em) :
- syncStart(sync_start), syncRepeat(sync_repeat),
- recvThread(nullptr), eventManager(em), recvDone(nullptr),
- scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank)
-{
- DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n",multi_rank);
- if (master == nullptr) {
- assert(sync == nullptr);
- assert(syncEvent == nullptr);
- sync = new Sync();
- syncEvent = new SyncEvent();
- master = this;
- }
-}
-
-MultiIface::~MultiIface()
-{
- assert(recvThread);
- delete recvThread;
- if (this == master) {
- assert(syncEvent);
- delete syncEvent;
- assert(sync);
- delete sync;
- }
-}
-
-void
-MultiIface::packetOut(EthPacketPtr pkt, Tick send_delay)
-{
- MultiHeaderPkt::Header header_pkt;
- unsigned address_length = MultiHeaderPkt::maxAddressLength();
-
- // Prepare a multi header packet for the Ethernet packet we want to
- // send out.
- header_pkt.msgType = MsgType::dataDescriptor;
- header_pkt.sendTick = curTick();
- header_pkt.sendDelay = send_delay;
-
- // Store also the source and destination addresses.
- pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress,
- address_length);
-
- header_pkt.dataPacketLength = pkt->size();
-
- // Send out the multi hedare packet followed by the Ethernet packet.
- sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress);
- sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress);
- DPRINTF(MultiEthernetPkt,
- "MultiIface::sendDataPacket() done size:%d send_delay:%llu "
- "src:0x%02x%02x%02x%02x%02x%02x "
- "dst:0x%02x%02x%02x%02x%02x%02x\n",
- pkt->size(), send_delay,
- header_pkt.srcAddress[0], header_pkt.srcAddress[1],
- header_pkt.srcAddress[2], header_pkt.srcAddress[3],
- header_pkt.srcAddress[4], header_pkt.srcAddress[5],
- header_pkt.dstAddress[0], header_pkt.dstAddress[1],
- header_pkt.dstAddress[2], header_pkt.dstAddress[3],
- header_pkt.dstAddress[4], header_pkt.dstAddress[5]);
-}
-
-bool
-MultiIface::recvHeader(MultiHeaderPkt::Header &header_pkt)
-{
- // Blocking receive of an incoming multi header packet.
- return recvRaw((void *)&header_pkt, sizeof(header_pkt));
-}
-
-void
-MultiIface::recvData(const MultiHeaderPkt::Header &header_pkt)
-{
- // We are here beacuse a header packet has been received implying
- // that an Ethernet (data) packet is coming in next.
- assert(header_pkt.msgType == MsgType::dataDescriptor);
- // Allocate storage for the incoming Ethernet packet.
- EthPacketPtr new_packet(new EthPacketData(header_pkt.dataPacketLength));
- // Now execute the blocking receive and store the incoming data directly
- // in the new EthPacketData object.
- if (! recvRaw((void *)(new_packet->data), header_pkt.dataPacketLength))
- panic("Missing data packet");
-
- new_packet->length = header_pkt.dataPacketLength;
- // Grab the event queue lock to schedule a new receive event for the
- // data packet.
- curEventQueue()->lock();
- // Compute the receive tick. It includes the send delay and the
- // simulated link delay.
- Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay;
- DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, "
- "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
- header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick);
-
- if (recv_tick <= curTick()) {
- panic("Simulators out of sync - missed packet receive by %llu ticks",
- curTick() - recv_tick);
- }
- // Now we are about to schedule a recvDone event for the new data packet.
- // We use the same recvDone object for all incoming data packets. If
- // that is already scheduled - i.e. a receive event for a previous
- // data packet is already pending - then we have to check whether the
- // receive tick for the new packet is earlier than that of the currently
- // pending event. Packets may arrive out-of-order with respect to
- // simulated receive time. If that is the case, we need to re-schedule the
- // recvDone event for the new packet. Otherwise, we save the packet
- // pointer and the recv tick for the new packet in the recvQueue. See
- // the implementation of the packetIn() method for comments on how this
- // information is retrieved from the recvQueue by the simulation thread.
- if (!recvDone->scheduled()) {
- assert(recvQueue.size() == 0);
- assert(scheduledRecvPacket == nullptr);
- scheduledRecvPacket = new_packet;
- eventManager->schedule(recvDone, recv_tick);
- } else if (recvDone->when() > recv_tick) {
- recvQueue.emplace(scheduledRecvPacket, recvDone->when());
- eventManager->reschedule(recvDone, recv_tick);
- scheduledRecvPacket = new_packet;
- } else {
- recvQueue.emplace(new_packet, recv_tick);
- }
- curEventQueue()->unlock();
-}
-
-void
-MultiIface::recvThreadFunc()
-{
- EthPacketPtr new_packet;
- MultiHeaderPkt::Header header;
-
- // The new receiver thread shares the event queue with the simulation
- // thread (associated with the simulated Ethernet link).
- curEventQueue(eventManager->eventQueue());
- // Main loop to wait for and process any incoming message.
- for (;;) {
- // recvHeader() blocks until the next multi header packet comes in.
- if (!recvHeader(header)) {
- // We lost connection to the peer gem5 processes most likely
- // because one of them called m5 exit. So we stop here.
- exit_message("info", 0, "Message server closed connection, "
- "simulation is exiting");
- }
- // We got a valid multi header packet, let's process it
- if (header.msgType == MsgType::dataDescriptor) {
- recvData(header);
- } else {
- // everything else must be synchronisation related command
- sync->progress(header.msgType);
- }
- }
-}
-
-EthPacketPtr
-MultiIface::packetIn()
-{
- // We are called within the process() method of the recvDone event. We
- // return the packet that triggered the current receive event.
- // If there is further packets in the recvQueue, we also have to schedule
- // the recvEvent for the next packet with the smallest receive tick.
- // The priority queue container ensures that smallest receive tick is
- // always on the top of the queue.
- assert(scheduledRecvPacket != nullptr);
- EthPacketPtr next_packet = scheduledRecvPacket;
-
- if (! recvQueue.empty()) {
- eventManager->schedule(recvDone, recvQueue.top().second);
- scheduledRecvPacket = recvQueue.top().first;
- recvQueue.pop();
- } else {
- scheduledRecvPacket = nullptr;
- }
-
- return next_packet;
-}
-
-void
-MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay)
-{
- assert(recvThread == nullptr);
- // all receive thread must be spawned before simulation starts
- assert(eventManager->eventQueue()->getCurTick() == 0);
-
- recvDone = recv_done;
- linkDelay = link_delay;
-
- recvThread = new std::thread(&MultiIface::recvThreadFunc, this);
-
- recvThreadsNum++;
-}
-
-DrainState
-MultiIface::drain()
-{
- DPRINTF(MultiEthernet,"MultiIFace::drain() called\n");
-
- // This can be called multiple times in the same drain cycle.
- if (master == this) {
- syncEvent->isDraining = true;
- }
-
- return DrainState::Drained;
-}
-
-void MultiIface::drainDone() {
- if (master == this) {
- assert(syncEvent->isDraining == true);
- syncEvent->isDraining = false;
- // We need to resume the interrupted periodic sync here now that the
- // draining is done. If the last periodic sync completed before the
- // checkpoint then the next one is already scheduled.
- if (syncEvent->interrupted)
- syncEvent->resume();
- }
-}
-
-void MultiIface::serialize(const std::string &base, CheckpointOut &cp) const
-{
- // Drain the multi interface before the checkpoint is taken. We cannot call
- // this as part of the normal drain cycle because this multi sync has to be
- // called exactly once after the system is fully drained.
- // Note that every peer will take a checkpoint but they may take it at
- // different ticks.
- // This sync request may interrupt an on-going periodic sync in some peers.
- sync->run(SyncTrigger::ckpt, curTick());
-
- // Save the periodic multi sync status
- syncEvent->serialize(base, cp);
-
- unsigned n_rx_packets = recvQueue.size();
- if (scheduledRecvPacket != nullptr)
- n_rx_packets++;
-
- paramOut(cp, base + ".nRxPackets", n_rx_packets);
-
- if (n_rx_packets > 0) {
- assert(recvDone->scheduled());
- scheduledRecvPacket->serialize(base + ".rxPacket[0]", cp);
- }
-
- for (unsigned i=1; i < n_rx_packets; i++) {
- const RecvInfo recv_info = recvQueue.impl().at(i-1);
- recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), cp);
- Tick rx_tick = recv_info.second;
- paramOut(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
- }
-}
-
-void MultiIface::unserialize(const std::string &base, CheckpointIn &cp)
-{
- assert(recvQueue.size() == 0);
- assert(scheduledRecvPacket == nullptr);
- assert(recvDone->scheduled() == false);
-
- // restore periodic sync info
- syncEvent->unserialize(base, cp);
-
- unsigned n_rx_packets;
- paramIn(cp, base + ".nRxPackets", n_rx_packets);
-
- if (n_rx_packets > 0) {
- scheduledRecvPacket = std::make_shared<EthPacketData>(16384);
- scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp);
- // Note: receive event will be scheduled when the link is unserialized
- }
-
- for (unsigned i=1; i < n_rx_packets; i++) {
- EthPacketPtr rx_packet = std::make_shared<EthPacketData>(16384);
- rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), cp);
- Tick rx_tick = 0;
- paramIn(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
- assert(rx_tick > 0);
- recvQueue.emplace(rx_packet,rx_tick);
- }
-}
-
-void MultiIface::initRandom()
-{
- // Initialize the seed for random generator to avoid the same sequence
- // in all gem5 peer processes
- assert(master != nullptr);
- if (this == master)
- random_mt.init(5489 * (rank+1) + 257);
-}
-
-void MultiIface::startPeriodicSync()
-{
- DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n");
- // Do a global sync here to ensure that peer gem5 processes are around
- // (actually this may not be needed...)
- sync->run(SyncTrigger::atomic, curTick());
-
- // Start the periodic sync if it is a fresh simulation from scratch
- if (curTick() == 0) {
- if (this == master) {
- syncEvent->start(syncStart, syncRepeat);
- inform("Multi synchronisation activated: start at %lld, "
- "repeat at every %lld ticks.\n",
- syncStart, syncRepeat);
- } else {
- // In case another multiIface object requires different schedule
- // for periodic sync than the master does.
- syncEvent->adjust(syncStart, syncRepeat);
- }
- } else {
- // Schedule the next periodic sync if resuming from a checkpoint
- if (this == master)
- syncEvent->resume();
- }
- DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n");
-}
diff --git a/src/dev/net/multi_iface.hh b/src/dev/net/multi_iface.hh
deleted file mode 100644
index f8ce2abf7..000000000
--- a/src/dev/net/multi_iface.hh
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Copyright (c) 2015 ARM Limited
- * All rights reserved
- *
- * The license below extends only to copyright in the software and shall
- * not be construed as granting a license to any other intellectual
- * property including but not limited to intellectual property relating
- * to a hardware implementation of the functionality of the software
- * licensed hereunder. You may use the software subject to the license
- * terms below provided that you ensure that this notice is replicated
- * unmodified and in its entirety in all distributions of the software,
- * modified or unmodified, in source code or in binary form.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met: redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer;
- * redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution;
- * neither the name of the copyright holders nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * Authors: Gabor Dozsa
- */
-
-/* @file
- * The interface class for multi gem5 simulations.
- *
- * Multi gem5 is an extension to gem5 to enable parallel simulation of a
- * distributed system (e.g. simulation of a pool of machines
- * connected by Ethernet links). A multi gem5 run consists of seperate gem5
- * processes running in parallel. Each gem5 process executes
- * the simulation of a component of the simulated distributed system.
- * (An example component can be a multi-core board with an Ethernet NIC.)
- * The MultiIface class below provides services to transfer data and
- * control messages among the gem5 processes. The main such services are
- * as follows.
- *
- * 1. Send a data packet coming from a simulated Ethernet link. The packet
- * will be transferred to (all) the target(s) gem5 processes. The send
- * operation is always performed by the simulation thread, i.e. the gem5
- * thread that is processing the event queue associated with the simulated
- * Ethernet link.
- *
- * 2. Spawn a receiver thread to process messages coming in from the
- * from other gem5 processes. Each simulated Ethernet link has its own
- * associated receiver thread. The receiver thread saves the incoming packet
- * and schedule an appropriate receive event in the event queue.
- *
- * 3. Schedule a global barrier event periodically to keep the gem5
- * processes in sync.
- * Periodic barrier event to keep peer gem5 processes in sync. The basic idea
- * is that no gem5 process can go ahead further than the simulated link
- * transmission delay to ensure that a corresponding receive event can always
- * be scheduled for any message coming in from a peer gem5 process.
- *
- *
- *
- * This interface is an abstract class (sendRaw() and recvRaw()
- * methods are pure virtual). It can work with various low level
- * send/receive service implementations (e.g. TCP/IP, MPI,...). A TCP
- * stream socket version is implemented in dev/src/tcp_iface.[hh,cc].
- */
-#ifndef __DEV_NET_MULTI_IFACE_HH__
-#define __DEV_NET_MULTI_IFACE_HH__
-
-#include <array>
-#include <mutex>
-#include <queue>
-#include <thread>
-#include <utility>
-
-#include "dev/net/etherpkt.hh"
-#include "dev/net/multi_packet.hh"
-#include "sim/core.hh"
-#include "sim/drain.hh"
-#include "sim/global_event.hh"
-
-class EventManager;
-
-/**
- * The interface class to talk to peer gem5 processes.
- */
-class MultiIface : public Drainable
-{
- public:
- /*!
- * The possible reasons a multi sync among gem5 peers is needed for.
- */
- enum
- class SyncTrigger {
- periodic, /*!< Regular periodic sync. This can be interrupted by a
- checkpoint sync request */
- ckpt, /*!< sync before taking a checkpoint */
- atomic /*!< sync that cannot be interrupted (e.g. sync at startup) */
- };
-
- private:
- typedef MultiHeaderPkt::MsgType MsgType;
-
- /** Sync State-Machine
- \dot
- digraph Sync {
- node [shape=box, fontsize=10];
- idle -> busy
- [ label="new trigger\n by run()" fontsize=8 ];
- busy -> busy
- [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum > 1) || \n(msg==CkptSyncReq &&\ntrigger == ckpt)" fontsize=8 ];
- busy -> idle
- [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum == 1)" fontsize=8 ];
- busy -> interrupted
- [ label="new message by progress():\n(msg == CkptSyncReq &&\ntrigger == periodic)" fontsize=8 ];
- idle -> asyncCkpt
- [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ];
- asyncCkpt -> asyncCkpt
- [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ];
- asyncCkpt -> busy
- [ label="new trigger by run():\ntrigger == ckpt" fontsize=8 ];
- asyncCkpt -> idle
- [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum == 0) " fontsize=8 ];
- asyncCkpt -> interrupted
- [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum > 0) " fontsize=8 ];
- interrupted -> interrupted
- [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum > 1)" fontsize=8 ];
- interrupted -> idle
- [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum == 1)" fontsize=8 ];
- }
- \enddot
- */
- /** @class Sync
- * This class implements global sync operations among gem5 peer processes.
- *
- * @note This class is used as a singleton object (shared by all MultiIface
- * objects).
- */
- class Sync
- {
- private:
- /*!
- * Internal state of the sync singleton object.
- */
- enum class SyncState {
- busy, /*!< There is an on-going sync. */
- interrupted, /*!< An on-going periodic sync was interrupted. */
- asyncCkpt, /*!< A checkpoint (sim_exit) is already scheduled */
- idle /*!< There is no active sync. */
- };
- /**
- * The lock to protect access to the MultiSync object.
- */
- std::mutex lock;
- /**
- * Condition variable for the simulation thread to wait on
- * until all receiver threads completes the current global
- * synchronisation.
- */
- std::condition_variable cv;
- /**
- * Number of receiver threads that not yet completed the current global
- * synchronisation.
- */
- unsigned waitNum;
- /**
- * The trigger for the most recent sync.
- */
- SyncTrigger trigger;
- /**
- * Map sync triggers to request messages.
- */
- std::array<MsgType, 3> triggerToMsg = {{
- MsgType::cmdPeriodicSyncReq,
- MsgType::cmdCkptSyncReq,
- MsgType::cmdAtomicSyncReq
- }};
-
- /**
- * Current sync state.
- */
- SyncState state;
-
- public:
- /**
- * Core method to perform a full multi sync.
- *
- * @param t Sync trigger.
- * @param sync_tick The tick the sync was expected to happen at.
- * @return true if the sync completed, false if it was interrupted.
- *
- * @note In case of an interrupted periodic sync, sync_tick can be less
- * than curTick() when we resume (i.e. re-run) it
- */
- bool run(SyncTrigger t, Tick sync_tick);
- /**
- * Callback when the receiver thread gets a sync message.
- */
- void progress(MsgType m);
-
- Sync() : waitNum(0), state(SyncState::idle) {}
- ~Sync() {}
- };
-
-
- /**
- * The global event to schedule peridic multi sync. It is used as a
- * singleton object.
- *
- * The periodic synchronisation works as follows.
- * 1. A MultisyncEvent is scheduled as a global event when startup() is
- * called.
- * 2. The progress() method of the MultisyncEvent initiates a new barrier
- * for each simulated Ethernet links.
- * 3. Simulation thread(s) then waits until all receiver threads
- * completes the ongoing barrier. The global sync event is done.
- */
- class SyncEvent : public GlobalSyncEvent
- {
- public:
- /**
- * Flag to indicate that the most recent periodic sync was interrupted
- * (by a checkpoint request).
- */
- bool interrupted;
- /**
- * The tick when the most recent periodic synchronisation was scheduled
- * at.
- */
- Tick scheduledAt;
- /**
- * Flag to indicate an on-going drain cycle.
- */
- bool isDraining;
-
- public:
- /**
- * Only the firstly instanstiated MultiIface object will
- * call this constructor.
- */
- SyncEvent() : GlobalSyncEvent(Default_Pri, 0), interrupted(false),
- scheduledAt(0), isDraining(false) {}
-
- ~SyncEvent() { assert (scheduled() == false); }
- /**
- * Schedule the first periodic sync event.
- *
- * @param start Start tick for multi synchronisation
- * @param repeat Frequency of multi synchronisation
- *
- */
- void start(Tick start, Tick repeat);
- /**
- * Reschedule (if necessary) the periodic sync event.
- *
- * @param start Start tick for multi synchronisation
- * @param repeat Frequency of multi synchronisation
- *
- * @note Useful if we have multiple MultiIface objects with
- * different 'start' and 'repeat' values for global sync.
- */
- void adjust(Tick start, Tick repeat);
- /**
- * This is a global event so process() will be called by each
- * simulation threads. (See further comments in the .cc file.)
- */
- void process() override;
- /**
- * Schedule periodic sync when resuming from a checkpoint.
- */
- void resume();
-
- void serialize(const std::string &base, CheckpointOut &cp) const;
- void unserialize(const std::string &base, CheckpointIn &cp);
- };
-
- /**
- * The receive thread needs to store the packet pointer and the computed
- * receive tick for each incoming data packet. This information is used
- * by the simulation thread when it processes the corresponding receive
- * event. (See more comments at the implemetation of the recvThreadFunc()
- * and RecvPacketIn() methods.)
- */
- typedef std::pair<EthPacketPtr, Tick> RecvInfo;
-
- /**
- * Comparison predicate for RecvInfo, needed by the recvQueue.
- */
- struct RecvInfoCompare {
- bool operator()(const RecvInfo &lhs, const RecvInfo &rhs)
- {
- return lhs.second > rhs.second;
- }
- };
-
- /**
- * Customized priority queue used to store incoming data packets info by
- * the receiver thread. We need to expose the underlying container to
- * enable iterator access for serializing.
- */
- class RecvQueue : public std::priority_queue<RecvInfo,
- std::vector<RecvInfo>,
- RecvInfoCompare>
- {
- public:
- std::vector<RecvInfo> &impl() { return c; }
- const std::vector<RecvInfo> &impl() const { return c; }
- };
-
- /*
- * The priority queue to store RecvInfo items ordered by receive ticks.
- */
- RecvQueue recvQueue;
- /**
- * The singleton Sync object to perform multi synchronisation.
- */
- static Sync *sync;
- /**
- * The singleton SyncEvent object to schedule periodic multi sync.
- */
- static SyncEvent *syncEvent;
- /**
- * Tick to schedule the first multi sync event.
- * This is just as optimization : we do not need any multi sync
- * event until the simulated NIC is brought up by the OS.
- */
- Tick syncStart;
- /**
- * Frequency of multi sync events in ticks.
- */
- Tick syncRepeat;
- /**
- * Receiver thread pointer.
- * Each MultiIface object must have exactly one receiver thread.
- */
- std::thread *recvThread;
- /**
- * The event manager associated with the MultiIface object.
- */
- EventManager *eventManager;
-
- /**
- * The receive done event for the simulated Ethernet link.
- * It is scheduled by the receiver thread for each incoming data
- * packet.
- */
- Event *recvDone;
-
- /**
- * The packet that belongs to the currently scheduled recvDone event.
- */
- EthPacketPtr scheduledRecvPacket;
-
- /**
- * The link delay in ticks for the simulated Ethernet link.
- */
- Tick linkDelay;
-
- /**
- * The rank of this process among the gem5 peers.
- */
- unsigned rank;
- /**
- * Total number of receiver threads (in this gem5 process).
- * During the simulation it should be constant and equal to the
- * number of MultiIface objects (i.e. simulated Ethernet
- * links).
- */
- static unsigned recvThreadsNum;
- /**
- * The very first MultiIface object created becomes the master. We need
- * a master to co-ordinate the global synchronisation.
- */
- static MultiIface *master;
-
- protected:
- /**
- * Low level generic send routine.
- * @param buf buffer that holds the data to send out
- * @param length number of bytes to send
- * @param dest_addr address of the target (simulated NIC). This may be
- * used by a subclass for optimization (e.g. optimize broadcast)
- */
- virtual void sendRaw(void *buf,
- unsigned length,
- const MultiHeaderPkt::AddressType dest_addr) = 0;
- /**
- * Low level generic receive routine.
- * @param buf the buffer to store the incoming message
- * @param length buffer size (in bytes)
- */
- virtual bool recvRaw(void *buf, unsigned length) = 0;
- /**
- * Low level request for synchronisation among gem5 processes. Only one
- * MultiIface object needs to call this (in each gem5 process) to trigger
- * a multi sync.
- *
- * @param sync_req Sync request command.
- * @param sync_tick The tick when sync is expected to happen in the sender.
- */
- virtual void syncRaw(MsgType sync_req, Tick sync_tick) = 0;
- /**
- * The function executed by a receiver thread.
- */
- void recvThreadFunc();
- /**
- * Receive a multi header packet. Called by the receiver thread.
- * @param header the structure to store the incoming header packet.
- * @return false if any error occured during the receive, true otherwise
- *
- * A header packet can carry a control command (e.g. 'barrier leave') or
- * information about a data packet that is following the header packet
- * back to back.
- */
- bool recvHeader(MultiHeaderPkt::Header &header);
- /**
- * Receive a data packet. Called by the receiver thread.
- * @param data_header The packet descriptor for the expected incoming data
- * packet.
- */
- void recvData(const MultiHeaderPkt::Header &data_header);
-
- public:
-
- /**
- * ctor
- * @param multi_rank Rank of this gem5 process within the multi run
- * @param sync_start Start tick for multi synchronisation
- * @param sync_repeat Frequency for multi synchronisation
- * @param em The event manager associated with the simulated Ethernet link
- */
- MultiIface(unsigned multi_rank,
- Tick sync_start,
- Tick sync_repeat,
- EventManager *em);
-
- virtual ~MultiIface();
- /**
- * Send out an Ethernet packet.
- * @param pkt The Ethernet packet to send.
- * @param send_delay The delay in ticks for the send completion event.
- */
- void packetOut(EthPacketPtr pkt, Tick send_delay);
- /**
- * Fetch the next packet from the receive queue.
- */
- EthPacketPtr packetIn();
-
- /**
- * spawn the receiver thread.
- * @param recv_done The receive done event associated with the simulated
- * Ethernet link.
- * @param link_delay The link delay for the simulated Ethernet link.
- */
- void spawnRecvThread(Event *recv_done,
- Tick link_delay);
- /**
- * Initialize the random number generator with a different seed in each
- * peer gem5 process.
- */
- void initRandom();
-
- DrainState drain() override;
-
- /**
- * Callback when draining is complete.
- */
- void drainDone();
-
- /**
- * Initialize the periodic synchronisation among peer gem5 processes.
- */
- void startPeriodicSync();
-
- void serialize(const std::string &base, CheckpointOut &cp) const;
- void unserialize(const std::string &base, CheckpointIn &cp);
-
-};
-
-
-#endif // __DEV_NET_MULTI_IFACE_HH__
diff --git a/src/dev/net/tcp_iface.cc b/src/dev/net/tcp_iface.cc
index 035ec8fd0..38fc7aef2 100644
--- a/src/dev/net/tcp_iface.cc
+++ b/src/dev/net/tcp_iface.cc
@@ -35,25 +35,35 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
+ * Mohammad Alian
*/
/* @file
- * TCP stream socket based interface class implementation for multi gem5 runs.
+ * TCP stream socket based interface class implementation for dist-gem5 runs.
*/
#include "dev/net/tcp_iface.hh"
#include <arpa/inet.h>
#include <netdb.h>
+#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <cstring>
+#include <vector>
#include "base/types.hh"
-#include "debug/MultiEthernet.hh"
+#include "debug/DistEthernet.hh"
+#include "debug/DistEthernetCmd.hh"
+#include "sim/sim_exit.hh"
+
+#if defined(__FreeBSD__)
+#include <netinet/in.h>
+
+#endif
// MSG_NOSIGNAL does not exists on OS X
#if defined(__APPLE__) || defined(__MACH__)
@@ -64,42 +74,181 @@
using namespace std;
+std::vector<std::pair<TCPIface::NodeInfo, int> > TCPIface::nodes;
vector<int> TCPIface::sockRegistry;
+int TCPIface::fdStatic = -1;
+bool TCPIface::anyListening = false;
TCPIface::TCPIface(string server_name, unsigned server_port,
- unsigned multi_rank, Tick sync_start, Tick sync_repeat,
- EventManager *em) :
- MultiIface(multi_rank, sync_start, sync_repeat, em)
+ unsigned dist_rank, unsigned dist_size,
+ Tick sync_start, Tick sync_repeat,
+ EventManager *em, bool is_switch, int num_nodes) :
+ DistIface(dist_rank, dist_size, sync_start, sync_repeat, em,
+ is_switch, num_nodes), serverName(server_name),
+ serverPort(server_port), isSwitch(is_switch), listening(false)
{
- struct addrinfo addr_hint, *addr_results;
+ if (is_switch && isMaster) {
+ while (!listen(serverPort)) {
+ DPRINTF(DistEthernet, "TCPIface(listen): Can't bind port %d\n",
+ serverPort);
+ serverPort++;
+ }
+ inform("tcp_iface listening on port %d", serverPort);
+ // Now accept the first connection requests from each compute node and
+ // store the node info. The compute nodes will then wait for ack
+ // messages. Ack messages will be sent by initTransport() in the
+ // appropriate order to make sure that every compute node is always
+ // connected to the same switch port.
+ NodeInfo ni;
+ for (int i = 0; i < size; i++) {
+ accept();
+ DPRINTF(DistEthernet, "First connection, waiting for link info\n");
+ if (!recvTCP(sock, &ni, sizeof(ni)))
+ panic("Failed to receive link info");
+ nodes.push_back(make_pair(ni, sock));
+ }
+ }
+}
+
+bool
+TCPIface::listen(int port)
+{
+ if (listening)
+ panic("Socket already listening!");
+
+ struct sockaddr_in sockaddr;
int ret;
- string port_str = to_string(server_port);
+ fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+ panic_if(fdStatic < 0, "socket() failed: %s", strerror(errno));
- sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
- panic_if(sock < 0, "socket() failed: %s", strerror(errno));
+ sockaddr.sin_family = PF_INET;
+ sockaddr.sin_addr.s_addr = INADDR_ANY;
+ sockaddr.sin_port = htons(port);
+ // finally clear sin_zero
+ memset(&sockaddr.sin_zero, 0, sizeof(sockaddr.sin_zero));
+ ret = ::bind(fdStatic, (struct sockaddr *)&sockaddr, sizeof (sockaddr));
- bzero(&addr_hint, sizeof(addr_hint));
- addr_hint.ai_family = AF_INET;
- addr_hint.ai_socktype = SOCK_STREAM;
- addr_hint.ai_protocol = IPPROTO_TCP;
+ if (ret != 0) {
+ if (ret == -1 && errno != EADDRINUSE)
+ panic("ListenSocket(listen): bind() failed!");
+ return false;
+ }
- ret = getaddrinfo(server_name.c_str(), port_str.c_str(),
- &addr_hint, &addr_results);
- panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno));
+ if (::listen(fdStatic, 24) == -1) {
+ if (errno != EADDRINUSE)
+ panic("ListenSocket(listen): listen() failed!");
- DPRINTF(MultiEthernet, "Connecting to %s:%u\n",
- server_name.c_str(), port_str.c_str());
+ return false;
+ }
- ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr),
- addr_results->ai_addrlen);
- panic_if(ret < 0, "connect() failed: %s", strerror(errno));
+ listening = true;
+ anyListening = true;
+ return true;
+}
- freeaddrinfo(addr_results);
- // add our socket to the static registry
+void
+TCPIface::establishConnection()
+{
+ static unsigned cur_rank = 0;
+ static unsigned cur_id = 0;
+ NodeInfo ni;
+
+ if (isSwitch) {
+ if (cur_id == 0) { // first connection accepted in the ctor already
+ auto const &iface0 =
+ find_if(nodes.begin(), nodes.end(),
+ [](const pair<NodeInfo, int> &cn) -> bool {
+ return cn.first.rank == cur_rank;
+ });
+ assert(iface0 != nodes.end());
+ assert(iface0->first.distIfaceId == 0);
+ sock = iface0->second;
+ ni = iface0->first;
+ } else { // additional connections from the same compute node
+ accept();
+ DPRINTF(DistEthernet, "Next connection, waiting for link info\n");
+ if (!recvTCP(sock, &ni, sizeof(ni)))
+ panic("Failed to receive link info");
+ assert(ni.rank == cur_rank);
+ assert(ni.distIfaceId == cur_id);
+ }
+ inform("Link okay (iface:%d -> (node:%d, iface:%d))",
+ distIfaceId, ni.rank, ni.distIfaceId);
+ if (ni.distIfaceId < ni.distIfaceNum - 1) {
+ cur_id++;
+ } else {
+ cur_rank++;
+ cur_id = 0;
+ }
+ // send ack
+ ni.distIfaceId = distIfaceId;
+ ni.distIfaceNum = distIfaceNum;
+ sendTCP(sock, &ni, sizeof(ni));
+ } else { // this is not a switch
+ connect();
+ // send link info
+ ni.rank = rank;
+ ni.distIfaceId = distIfaceId;
+ ni.distIfaceNum = distIfaceNum;
+ sendTCP(sock, &ni, sizeof(ni));
+ DPRINTF(DistEthernet, "Connected, waiting for ack (distIfaceId:%d\n",
+ distIfaceId);
+ if (!recvTCP(sock, &ni, sizeof(ni)))
+ panic("Failed to receive ack");
+ assert(ni.rank == rank);
+ inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId,
+ ni.distIfaceId);
+ }
sockRegistry.push_back(sock);
- // let the server know who we are
- sendTCP(sock, &multi_rank, sizeof(multi_rank));
+}
+
+void
+TCPIface::accept()
+{
+ struct sockaddr_in sockaddr;
+ socklen_t slen = sizeof (sockaddr);
+ sock = ::accept(fdStatic, (struct sockaddr *)&sockaddr, &slen);
+ if (sock != -1) {
+ int i = 1;
+ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i,
+ sizeof(i)) < 0)
+ warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!");
+ }
+}
+
+void
+TCPIface::connect()
+{
+ struct addrinfo addr_hint, *addr_results;
+ int ret;
+
+ string port_str = to_string(serverPort);
+
+ sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+ panic_if(sock < 0, "socket() failed: %s", strerror(errno));
+
+ int fl = 1;
+ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0)
+ warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!");
+
+ bzero(&addr_hint, sizeof(addr_hint));
+ addr_hint.ai_family = AF_INET;
+ addr_hint.ai_socktype = SOCK_STREAM;
+ addr_hint.ai_protocol = IPPROTO_TCP;
+
+ ret = getaddrinfo(serverName.c_str(), port_str.c_str(),
+ &addr_hint, &addr_results);
+ panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno));
+
+ DPRINTF(DistEthernet, "Connecting to %s:%s\n",
+ serverName.c_str(), port_str.c_str());
+
+ ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr),
+ addr_results->ai_addrlen);
+ panic_if(ret < 0, "connect() failed: %s", strerror(errno));
+
+ freeaddrinfo(addr_results);
}
TCPIface::~TCPIface()
@@ -111,12 +260,20 @@ TCPIface::~TCPIface()
}
void
-TCPIface::sendTCP(int sock, void *buf, unsigned length)
+TCPIface::sendTCP(int sock, const void *buf, unsigned length)
{
ssize_t ret;
ret = ::send(sock, buf, length, MSG_NOSIGNAL);
- panic_if(ret < 0, "send() failed: %s", strerror(errno));
+ if (ret < 0) {
+ if (errno == ECONNRESET || errno == EPIPE) {
+ inform("send(): %s", strerror(errno));
+ exit_message("info", 0, "Message server closed connection, "
+ "simulation is exiting");
+ } else {
+ panic("send() failed: %s", strerror(errno));
+ }
+ }
panic_if(ret != length, "send() failed");
}
@@ -140,19 +297,47 @@ TCPIface::recvTCP(int sock, void *buf, unsigned length)
}
void
-TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick)
+TCPIface::sendPacket(const Header &header, const EthPacketPtr &packet)
+{
+ sendTCP(sock, &header, sizeof(header));
+ sendTCP(sock, packet->data, packet->length);
+}
+
+void
+TCPIface::sendCmd(const Header &header)
{
- /*
- * Barrier is simply implemented by point-to-point messages to the server
- * for now. This method is called by only one TCPIface object.
- * The server will send back an 'ack' message when it gets the
- * sync request from all clients.
- */
- MultiHeaderPkt::Header header_pkt;
- header_pkt.msgType = sync_req;
- header_pkt.sendTick = sync_tick;
-
- for (auto s : sockRegistry)
- sendTCP(s, (void *)&header_pkt, sizeof(header_pkt));
+ DPRINTF(DistEthernetCmd, "TCPIface::sendCmd() type: %d\n",
+ static_cast<int>(header.msgType));
+ // Global commands (i.e. sync request) are always sent by the master
+ // DistIface. The transfer method is simply implemented as point-to-point
+ // messages for now
+ for (auto s: sockRegistry)
+ sendTCP(s, (void*)&header, sizeof(header));
}
+bool
+TCPIface::recvHeader(Header &header)
+{
+ bool ret = recvTCP(sock, &header, sizeof(header));
+ DPRINTF(DistEthernetCmd, "TCPIface::recvHeader() type: %d ret: %d\n",
+ static_cast<int>(header.msgType), ret);
+ return ret;
+}
+
+void
+TCPIface::recvPacket(const Header &header, EthPacketPtr &packet)
+{
+ packet = make_shared<EthPacketData>(header.dataPacketLength);
+ bool ret = recvTCP(sock, packet->data, header.dataPacketLength);
+ panic_if(!ret, "Error while reading socket");
+ packet->length = header.dataPacketLength;
+}
+
+void
+TCPIface::initTransport()
+{
+ // We cannot setup the conections in the constructor because the number
+ // of dist interfaces (per process) is unknown until the (simobject) init
+ // phase. That information is necessary for global connection ordering.
+ establishConnection();
+}
diff --git a/src/dev/net/tcp_iface.hh b/src/dev/net/tcp_iface.hh
index 2eb2c1c07..97ae9cde0 100644
--- a/src/dev/net/tcp_iface.hh
+++ b/src/dev/net/tcp_iface.hh
@@ -35,17 +35,17 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
+ * Mohammad Alian
*/
/* @file
- * TCP stream socket based interface class for multi gem5 runs.
+ * TCP stream socket based interface class for dist-gem5 runs.
*
- * For a high level description about multi gem5 see comments in
- * header file multi_iface.hh.
+ * For a high level description about dist-gem5 see comments in
+ * header file dist_iface.hh.
*
- * The TCP subclass of MultiIface uses a separate server process
- * (see tcp_server.[hh,cc] under directory gem5/util/multi). Each gem5
- * process connects to the server via a stream socket. The server process
+ * Each gem5 process connects to the server (another gem5 process which
+ * simulates a switch box) via a stream socket. The server process
* transfers messages and co-ordinates the synchronisation among the gem5
* peers.
*/
@@ -55,11 +55,11 @@
#include <string>
-#include "dev/net/multi_iface.hh"
+#include "dev/net/dist_iface.hh"
class EventManager;
-class TCPIface : public MultiIface
+class TCPIface : public DistIface
{
private:
/**
@@ -67,8 +67,28 @@ class TCPIface : public MultiIface
*/
int sock;
+ std::string serverName;
+ int serverPort;
+
+ bool isSwitch;
+
+ bool listening;
+ static bool anyListening;
+ static int fdStatic;
+
+ /**
+ * Compute node info and storage for the very first connection from each
+ * node (used by the switch)
+ */
+ struct NodeInfo
+ {
+ unsigned rank;
+ unsigned distIfaceId;
+ unsigned distIfaceNum;
+ };
+ static std::vector<std::pair<NodeInfo, int> > nodes;
/**
- * Registry for all sockets to the server opened by this gem5 process.
+ * Storage for all opened sockets
*/
static std::vector<int> sockRegistry;
@@ -82,7 +102,7 @@ class TCPIface : public MultiIface
* @param length Size of the message in bytes.
*/
void
- sendTCP(int sock, void *buf, unsigned length);
+ sendTCP(int sock, const void *buf, unsigned length);
/**
* Receive the next incoming message through a TCP stream socket.
@@ -92,24 +112,26 @@ class TCPIface : public MultiIface
* @param length Exact size of the expected message in bytes.
*/
bool recvTCP(int sock, void *buf, unsigned length);
-
+ bool listen(int port);
+ void accept();
+ void connect();
+ int getfdStatic() const { return fdStatic; }
+ bool islistening() const { return listening; }
+ bool anyislistening() const { return anyListening; }
+ void establishConnection();
protected:
- virtual void
- sendRaw(void *buf, unsigned length,
- const MultiHeaderPkt::AddressType dest_addr=nullptr) override
- {
- sendTCP(sock, buf, length);
- }
+ void sendPacket(const Header &header,
+ const EthPacketPtr &packet) override;
- virtual bool recvRaw(void *buf, unsigned length) override
- {
- return recvTCP(sock, buf, length);
- }
+ void sendCmd(const Header &header) override;
+
+ bool recvHeader(Header &header) override;
+
+ void recvPacket(const Header &header, EthPacketPtr &packet) override;
- virtual void syncRaw(MultiHeaderPkt::MsgType sync_req,
- Tick sync_tick) override;
+ void initTransport() override;
public:
/**
@@ -118,14 +140,15 @@ class TCPIface : public MultiIface
* server process.
* @param server_port The port number the server listening for new
* connections.
- * @param sync_start The tick for the first multi synchronisation.
- * @param sync_repeat The frequency of multi synchronisation.
+ * @param sync_start The tick for the first dist synchronisation.
+ * @param sync_repeat The frequency of dist synchronisation.
* @param em The EventManager object associated with the simulated
* Ethernet link.
*/
TCPIface(std::string server_name, unsigned server_port,
- unsigned multi_rank, Tick sync_start, Tick sync_repeat,
- EventManager *em);
+ unsigned dist_rank, unsigned dist_size,
+ Tick sync_start, Tick sync_repeat, EventManager *em,
+ bool is_switch, int num_nodes);
~TCPIface() override;
};
diff --git a/src/sim/global_event.hh b/src/sim/global_event.hh
index 88981b52a..1aab4a233 100644
--- a/src/sim/global_event.hh
+++ b/src/sim/global_event.hh
@@ -219,7 +219,7 @@ class GlobalSyncEvent : public BaseGlobalEventTemplate<GlobalSyncEvent>
};
GlobalSyncEvent(Priority p, Flags f)
- : Base(p, f)
+ : Base(p, f), repeat(0)
{ }
GlobalSyncEvent(Tick when, Tick _repeat, Priority p, Flags f)
diff --git a/src/dev/net/multi_packet.cc b/src/sim/initparam_keys.hh
index 85f76b0c4..3ea68ddfe 100644
--- a/src/dev/net/multi_packet.cc
+++ b/src/sim/initparam_keys.hh
@@ -38,63 +38,33 @@
*/
/* @file
- * MultiHeaderPkt class to encapsulate multi-gem5 header packets
- *
+ * Magic key definitions for the InitParam pseudo inst
*/
+#ifndef ___SIM_INITPARAM_KEYS_HH__
+#define ___SIM_INITPARAM_KEYS_HH__
-#include "dev/net/multi_packet.hh"
-
-#include <cstdint>
-#include <cstring>
-
-#include "base/inet.hh"
-
-unsigned
-MultiHeaderPkt::maxAddressLength()
-{
- return sizeof(AddressType);
-}
-
-void
-MultiHeaderPkt::clearAddress(AddressType &addr)
-{
- std::memset(addr, 0, sizeof(addr));
-}
-
-bool
-MultiHeaderPkt::isAddressEqual(const AddressType &addr1,
- const AddressType &addr2)
-{
- return (std::memcmp(addr1, addr2, sizeof(addr1)) == 0);
-}
-
-bool
-MultiHeaderPkt::isAddressLess(const AddressType &addr1,
- const AddressType &addr2)
-{
- return (std::memcmp(addr1, addr2, sizeof(addr1)) < 0);
-}
-
-void
-MultiHeaderPkt::copyAddress(AddressType &dest, const AddressType &src)
-{
- std::memcpy(dest, src, sizeof(dest));
-}
-
-bool
-MultiHeaderPkt::isBroadcastAddress(const AddressType &addr)
-{
- return ((Net::EthAddr *)&addr)->broadcast();
-}
-
-bool
-MultiHeaderPkt::isMulticastAddress(const AddressType &addr)
+namespace PseudoInst {
+/**
+ * Unique keys to retrieve various params by the initParam pseudo inst.
+ *
+ * @note Each key must be shorter than 16 characters (because we use
+ * two 64-bit registers two pass in the key to the initparam function)
+ */
+struct InitParamKey
{
- return ((Net::EthAddr *)&addr)->multicast();
-}
+ /**
+ * The default key (empty string)
+ */
+ static constexpr const char *DEFAULT = "";
+ /**
+ * Unique key for "rank" param (distributed gem5 runs)
+ */
+ static constexpr const char *DIST_RANK = "dist-rank";
+ /**
+ * Unique key for "size" param (distributed gem5 runs)
+ */
+ static constexpr const char *DIST_SIZE = "dist-size";
+};
+} // namespace PseudoInst
-bool
-MultiHeaderPkt::isUnicastAddress(const AddressType &addr)
-{
- return ((Net::EthAddr *)&addr)->unicast();
-}
+#endif
diff --git a/src/sim/pseudo_inst.cc b/src/sim/pseudo_inst.cc
index 0f7de0c3a..67c47ce77 100644
--- a/src/sim/pseudo_inst.cc
+++ b/src/sim/pseudo_inst.cc
@@ -63,8 +63,10 @@
#include "debug/PseudoInst.hh"
#include "debug/Quiesce.hh"
#include "debug/WorkItems.hh"
+#include "dev/net/dist_iface.hh"
#include "params/BaseCPU.hh"
#include "sim/full_system.hh"
+#include "sim/initparam_keys.hh"
#include "sim/process.hh"
#include "sim/pseudo_inst.hh"
#include "sim/serialize.hh"
@@ -357,8 +359,10 @@ void
m5exit(ThreadContext *tc, Tick delay)
{
DPRINTF(PseudoInst, "PseudoInst::m5exit(%i)\n", delay);
- Tick when = curTick() + delay * SimClock::Int::ns;
- exitSimLoop("m5_exit instruction encountered", 0, when, 0, true);
+ if (DistIface::readyToExit(delay)) {
+ Tick when = curTick() + delay * SimClock::Int::ns;
+ exitSimLoop("m5_exit instruction encountered", 0, when, 0, true);
+ }
}
void
@@ -471,10 +475,14 @@ initParam(ThreadContext *tc, uint64_t key_str1, uint64_t key_str2)
// Compare the key parameter with the known values to select the return
// value
uint64_t val;
- if (strlen(key_str) == 0) {
+ if (strcmp(key_str, InitParamKey::DEFAULT) == 0) {
val = tc->getCpuPtr()->system->init_param;
+ } else if (strcmp(key_str, InitParamKey::DIST_RANK) == 0) {
+ val = DistIface::rankParam();
+ } else if (strcmp(key_str, InitParamKey::DIST_SIZE) == 0) {
+ val = DistIface::sizeParam();
} else {
- panic("Unknown key for initparam pseudo instruction");
+ panic("Unknown key for initparam pseudo instruction:\"%s\"", key_str);
}
return val;
}
@@ -529,10 +537,11 @@ m5checkpoint(ThreadContext *tc, Tick delay, Tick period)
if (!tc->getCpuPtr()->params()->do_checkpoint_insts)
return;
- Tick when = curTick() + delay * SimClock::Int::ns;
- Tick repeat = period * SimClock::Int::ns;
-
- exitSimLoop("checkpoint", 0, when, repeat);
+ if (DistIface::readyToCkpt(delay, period)) {
+ Tick when = curTick() + delay * SimClock::Int::ns;
+ Tick repeat = period * SimClock::Int::ns;
+ exitSimLoop("checkpoint", 0, when, repeat);
+ }
}
uint64_t
diff --git a/util/multi/Makefile b/util/multi/Makefile
deleted file mode 100644
index a58dd3307..000000000
--- a/util/multi/Makefile
+++ /dev/null
@@ -1,63 +0,0 @@
-#
-# Copyright (c) 2015 ARM Limited
-# All rights reserved
-#
-# The license below extends only to copyright in the software and shall
-# not be construed as granting a license to any other intellectual
-# property including but not limited to intellectual property relating
-# to a hardware implementation of the functionality of the software
-# licensed hereunder. You may use the software subject to the license
-# terms below provided that you ensure that this notice is replicated
-# unmodified and in its entirety in all distributions of the software,
-# modified or unmodified, in source code or in binary form.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met: redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer;
-# redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution;
-# neither the name of the copyright holders nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-#
-# Authors: Gabor Dozsa
-
-CXX= g++
-
-DEBUG= -DDEBUG
-
-M5_ARCH?= ARM
-M5_DIR?= ../..
-
-vpath % $(M5_DIR)/build/$(M5_ARCH)/dev
-
-INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext
-CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS)
-
-default: tcp_server
-
-clean:
- @rm -f tcp_server *.o *.d *~
-
-tcp_server: tcp_server.o multi_packet.o
- $(CXX) $(LFLAGS) -o $@ $^
-
-%.o: %.cc
- @echo '$(CXX) $(CCFLAGS) -c $(notdir $<) -o $@'
- @$(CXX) $(CCFLAGS) -c $< -o $@
-
--include *.d
diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc
deleted file mode 100644
index 1ec4303d3..000000000
--- a/util/multi/tcp_server.cc
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Copyright (c) 2015 ARM Limited
- * All rights reserved
- *
- * The license below extends only to copyright in the software and shall
- * not be construed as granting a license to any other intellectual
- * property including but not limited to intellectual property relating
- * to a hardware implementation of the functionality of the software
- * licensed hereunder. You may use the software subject to the license
- * terms below provided that you ensure that this notice is replicated
- * unmodified and in its entirety in all distributions of the software,
- * modified or unmodified, in source code or in binary form.
- *
- * Copyright (c) 2008 The Regents of The University of Michigan
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met: redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer;
- * redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution;
- * neither the name of the copyright holders nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * Authors: Gabor Dozsa
- */
-
-
-/* @file
- * Message server implementation using TCP stream sockets for parallel gem5
- * runs.
- */
-#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <cstdio>
-#include <cstdlib>
-
-#include "tcp_server.hh"
-
-using namespace std;
-
-// Some basic macros for information and error reporting.
-#define PRINTF(...) fprintf(stderr, __VA_ARGS__)
-
-#ifdef DEBUG
-static bool debugSetup = true;
-static bool debugPeriodic = false;
-static bool debugSync = true;
-static bool debugPkt = false;
-#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__)
-#else
-#define DPRINTF(v,...)
-#endif
-
-#define inform(...) do { PRINTF("info: "); \
- PRINTF(__VA_ARGS__); } while(0)
-
-#define panic(...) do { PRINTF("panic: "); \
- PRINTF(__VA_ARGS__); \
- PRINTF("\n[%s:%s], line %d\n", \
- __FUNCTION__, __FILE__, __LINE__); \
- exit(-1); } while(0)
-
-TCPServer *TCPServer::instance = nullptr;
-
-TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle)
-{
- MultiHeaderPkt::clearAddress(address);
-}
-
-unsigned
-TCPServer::Channel::recvRaw(void *buf, unsigned size) const
-{
- ssize_t n;
- // This is a blocking receive.
- n = recv(fd, buf, size, MSG_WAITALL);
-
- if (n < 0)
- panic("read() failed:%s", strerror(errno));
- else if (n > 0 && n < size)
- // the recv() call should wait for the full message
- panic("read() failed");
-
- return n;
-}
-
-void
-TCPServer::Channel::sendRaw(const void *buf, unsigned size) const
-{
- ssize_t n;
- n = send(fd, buf, size, MSG_NOSIGNAL);
- if (n < 0)
- panic("write() failed:%s", strerror(errno));
- else if (n != size)
- panic("write() failed");
-}
-
-void TCPServer::Channel::updateAddress(const AddressType &new_address)
-{
- // check if the known address has changed (e.g. the client reconfigured
- // its Ethernet NIC)
- if (MultiHeaderPkt::isAddressEqual(address, new_address))
- return;
-
- // So we have to update the address. Note that we always
- // store the same address as key in the map but the ordering
- // may change so we need to erase and re-insert it again.
- auto info = TCPServer::instance->addressMap.find(&address);
- if (info != TCPServer::instance->addressMap.end()) {
- TCPServer::instance->addressMap.erase(info);
- }
-
- MultiHeaderPkt::copyAddress(address, new_address);
- TCPServer::instance->addressMap[&address] = this;
-}
-
-void
-TCPServer::Channel::headerPktIn()
-{
- ssize_t n;
- Header hdr_pkt;
-
- n = recvRaw(&hdr_pkt, sizeof(hdr_pkt));
-
- if (n == 0) {
- // EOF - nothing to do here, we will handle this as a POLLRDHUP event
- // in the main loop.
- return;
- }
-
- if (hdr_pkt.msgType == MsgType::dataDescriptor) {
- updateAddress(hdr_pkt.srcAddress);
- TCPServer::instance->xferData(hdr_pkt, *this);
- } else {
- processCmd(hdr_pkt.msgType, hdr_pkt.sendTick);
- }
-}
-
-void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick)
-{
- switch (cmd) {
- case MsgType::cmdAtomicSyncReq:
- DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank);
- assert(state == SyncState::idle);
- state = SyncState::atomic;
- TCPServer::instance->syncTryComplete(SyncState::atomic,
- MsgType::cmdAtomicSyncAck);
- break;
- case MsgType::cmdPeriodicSyncReq:
- DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick);
- // sanity check
- if (TCPServer::instance->periodicSyncTick() == 0) {
- TCPServer::instance->periodicSyncTick(send_tick);
- } else if ( TCPServer::instance->periodicSyncTick() != send_tick) {
- panic("Out of order periodic sync request - rank:%d "
- "(send_tick:%ld ongoing:%ld)", rank, send_tick,
- TCPServer::instance->periodicSyncTick());
- }
- switch (state) {
- case SyncState::idle:
- state = SyncState::periodic;
- TCPServer::instance->syncTryComplete(SyncState::periodic,
- MsgType::cmdPeriodicSyncAck);
- break;
- case SyncState::asyncCkpt:
- // An async ckpt request has already been sent to this client and
- // that will interrupt this periodic sync. We can simply drop this
- // message.
- break;
- default:
- panic("Unexpected state for periodic sync request (rank:%d)",
- rank);
- break;
- }
- break;
- case MsgType::cmdCkptSyncReq:
- DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank);
- switch (state) {
- case SyncState::idle:
- TCPServer::instance->ckptPropagate(*this);
- // we fall through here to complete #clients==1 case
- case SyncState::asyncCkpt:
- state = SyncState::ckpt;
- TCPServer::instance->syncTryComplete(SyncState::ckpt,
- MsgType::cmdCkptSyncAck);
- break;
- default:
- panic("Unexpected state for ckpt sync request (rank:%d)", rank);
- break;
- }
- break;
- default:
- panic("Unexpected header packet (rank:%d)",rank);
- break;
- }
-}
-
-TCPServer::TCPServer(unsigned clients_num,
- unsigned listen_port,
- int timeout_in_sec)
-{
- assert(instance == nullptr);
- construct(clients_num, listen_port, timeout_in_sec);
- instance = this;
-}
-
-TCPServer::~TCPServer()
-{
- for (auto &c : clientsPollFd)
- close(c.fd);
-}
-
-void
-TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec)
-{
- int listen_sock, new_sock, ret;
- unsigned client_len;
- struct sockaddr_in server_addr, client_addr;
- struct pollfd new_pollfd;
- Channel new_channel;
-
- DPRINTF(debugSetup, "Start listening on port %u ...\n", port);
-
- listen_sock = socket(AF_INET, SOCK_STREAM, 0);
- if (listen_sock < 0)
- panic("socket() failed:%s", strerror(errno));
-
- bzero(&server_addr, sizeof(server_addr));
- server_addr.sin_family = AF_INET;
- server_addr.sin_addr.s_addr = INADDR_ANY;
- server_addr.sin_port = htons(port);
- if (bind(listen_sock, (struct sockaddr *) &server_addr,
- sizeof(server_addr)) < 0)
- panic("bind() failed:%s", strerror(errno));
- listen(listen_sock, 10);
-
- clientsPollFd.reserve(clients_num);
- clientsChannel.reserve(clients_num);
-
- new_pollfd.events = POLLIN | POLLRDHUP;
- new_pollfd.revents = 0;
- while (clientsPollFd.size() < clients_num) {
- new_pollfd.fd = listen_sock;
- ret = poll(&new_pollfd, 1, timeout_in_sec*1000);
- if (ret == 0)
- panic("Timeout while waiting for clients to connect");
- assert(ret == 1 && new_pollfd.revents == POLLIN);
- client_len = sizeof(client_addr);
- new_sock = accept(listen_sock,
- (struct sockaddr *) &client_addr,
- &client_len);
- if (new_sock < 0)
- panic("accept() failed:%s", strerror(errno));
- new_pollfd.fd = new_sock;
- new_pollfd.revents = 0;
- clientsPollFd.push_back(new_pollfd);
- new_channel.fd = new_sock;
- new_channel.isAlive = true;
- new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank));
- clientsChannel.push_back(new_channel);
-
- DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n",
- client_addr.sin_addr.s_addr, client_addr.sin_port,
- new_channel.rank);
- }
- ret = close(listen_sock);
- assert(ret == 0);
-
- DPRINTF(debugSetup, "Setup complete\n");
-}
-
-void
-TCPServer::run()
-{
- int nfd;
- unsigned num_active_clients = clientsPollFd.size();
-
- DPRINTF(debugSetup, "Entering run() loop\n");
- while (num_active_clients == clientsPollFd.size()) {
- nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1);
- if (nfd == -1)
- panic("poll() failed:%s", strerror(errno));
-
- for (unsigned i = 0, n = 0;
- i < clientsPollFd.size() && (signed)n < nfd;
- i++) {
- struct pollfd &pfd = clientsPollFd[i];
- if (pfd.revents) {
- if (pfd.revents & POLLERR)
- panic("poll() returned POLLERR");
- if (pfd.revents & POLLIN) {
- clientsChannel[i].headerPktIn();
- }
- if (pfd.revents & POLLRDHUP) {
- // One gem5 process exited or aborted. Either way, we
- // assume the full simulation should stop now (either
- // because m5 exit was called or a serious error
- // occurred.) So we quit the run loop here and close all
- // sockets to notify the remaining peer gem5 processes.
- pfd.events = 0;
- clientsChannel[i].isAlive = false;
- num_active_clients--;
- DPRINTF(debugSetup, "POLLRDHUP event");
- }
- n++;
- if ((signed)n == nfd)
- break;
- }
- }
- }
- DPRINTF(debugSetup, "Exiting run() loop\n");
-}
-
-void
-TCPServer::xferData(const Header &hdr_pkt, const Channel &src)
-{
- unsigned n;
- assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer));
- n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength);
-
- if (n == 0)
- panic("recvRaw() failed");
- DPRINTF(debugPkt, "Incoming data packet (from rank %d) "
- "src:0x%02x%02x%02x%02x%02x%02x "
- "dst:0x%02x%02x%02x%02x%02x%02x\n",
- src.rank,
- hdr_pkt.srcAddress[0],
- hdr_pkt.srcAddress[1],
- hdr_pkt.srcAddress[2],
- hdr_pkt.srcAddress[3],
- hdr_pkt.srcAddress[4],
- hdr_pkt.srcAddress[5],
- hdr_pkt.dstAddress[0],
- hdr_pkt.dstAddress[1],
- hdr_pkt.dstAddress[2],
- hdr_pkt.dstAddress[3],
- hdr_pkt.dstAddress[4],
- hdr_pkt.dstAddress[5]);
- // Now try to figure out the destination client(s).
- auto dst_info = addressMap.find(&hdr_pkt.dstAddress);
-
- // First handle the multicast/broadcast or unknonw destination case. These
- // all trigger a broadcast of the packet to all clients.
- if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false ||
- dst_info == addressMap.end()) {
- unsigned n = 0;
- for (auto const &c: clientsChannel) {
- if (c.isAlive && &c!=&src) {
- c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
- c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
- n++;
- }
- }
- if (n == 0) {
- inform("Broadcast/multicast packet dropped\n");
- }
- } else {
- // It is a unicast address with a known destination
- Channel *dst = dst_info->second;
- if (dst->isAlive) {
- dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt));
- dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
- DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank);
- } else {
- inform("Unicast packet dropped (destination exited)\n");
- }
- }
-}
-
-void
-TCPServer::syncTryComplete(SyncState st, MsgType ack)
-{
- // Check if the barrieris complete. If so then notify all the clients.
- for (auto &c : clientsChannel) {
- if (c.isAlive && (c.state != st)) {
- // sync not complete yet, stop here
- return;
- }
- }
- // Sync complete, send out the acks
- MultiHeaderPkt::Header hdr_pkt;
- hdr_pkt.msgType = ack;
- for (auto &c : clientsChannel) {
- if (c.isAlive) {
- c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
- c.state = SyncState::idle;
- }
- }
- // Reset periodic send tick
- _periodicSyncTick = 0;
- DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync,
- "Sync COMPLETE\n");
-}
-
-void
-TCPServer::ckptPropagate(Channel &ch)
-{
- // Channel ch got a ckpt request that needs to be propagated to the other
- // clients
- MultiHeaderPkt::Header hdr_pkt;
- hdr_pkt.msgType = MsgType::cmdCkptSyncReq;
- for (auto &c : clientsChannel) {
- if (c.isAlive && (&c != &ch)) {
- switch (c.state) {
- case SyncState::idle:
- case SyncState::periodic:
- c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
- c.state = SyncState::asyncCkpt;
- break;
- default:
- panic("Unexpected state for ckpt sync request propagation "
- "(rank:%d)\n",c.rank);
- break;
- }
- }
- }
-}
-
-int main(int argc, char *argv[])
-{
- TCPServer *server;
- int clients_num = -1, listen_port = -1;
- int first_arg = 1, timeout_in_sec = 60;
-
- if (argc > 1 && string(argv[1]).compare("-debug") == 0) {
- timeout_in_sec = -1;
- first_arg++;
- argc--;
- }
-
- if (argc != 3)
- panic("We need two command line args (number of clients and tcp listen"
- " port");
-
- clients_num = atoi(argv[first_arg]);
- listen_port = atoi(argv[first_arg + 1]);
-
- server = new TCPServer(clients_num, listen_port, timeout_in_sec);
-
- server->run();
-
- delete server;
-
- return 0;
-}
diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh
deleted file mode 100644
index c5f2a9ce8..000000000
--- a/util/multi/tcp_server.hh
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright (c) 2015 ARM Limited
- * All rights reserved
- *
- * The license below extends only to copyright in the software and shall
- * not be construed as granting a license to any other intellectual
- * property including but not limited to intellectual property relating
- * to a hardware implementation of the functionality of the software
- * licensed hereunder. You may use the software subject to the license
- * terms below provided that you ensure that this notice is replicated
- * unmodified and in its entirety in all distributions of the software,
- * modified or unmodified, in source code or in binary form.
- *
- * Copyright (c) 2008 The Regents of The University of Michigan
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met: redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer;
- * redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution;
- * neither the name of the copyright holders nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * Authors: Gabor Dozsa
- */
-
-/* @file
- * Message server using TCP stream sockets for parallel gem5 runs.
- *
- * For a high level description about multi gem5 see comments in
- * header files src/dev/multi_iface.hh and src/dev/tcp_iface.hh.
- *
- * This file implements the central message server process for multi gem5.
- * The server is responsible the following tasks.
- * 1. Establishing a TCP socket connection for each gem5 process (clients).
- *
- * 2. Process data messages coming in from clients. The server checks
- * the MAC addresses in the header message and transfers the message
- * to the target(s) client(s).
- *
- * 3. Processing synchronisation related control messages. Synchronisation
- * is performed as follows. The server waits for a 'barrier enter' message
- * from all the clients. When the last such control message arrives, the
- * server sends out a 'barrier leave' control message to all the clients.
- *
- * 4. Triggers complete termination in case a client exits. A client may
- * exit either by calling 'm5 exit' pseudo instruction or due to a fatal
- * error. In either case, we assume that the entire multi simulation needs to
- * terminate. The server triggers full termination by tearing down the
- * open TCP sockets.
- *
- * The TCPServer class is instantiated as a singleton object.
- *
- * The server can be built independently from the rest of gem5 (and it is
- * architecture agnostic). See the Makefile in the same directory.
- *
- */
-
-#include <poll.h>
-
-#include <map>
-#include <vector>
-
-#include "dev/etherpkt.hh"
-#include "dev/multi_packet.hh"
-
-/**
- * The maximum length of an Ethernet packet (allowing Jumbo frames).
- */
-#define MAX_ETH_PACKET_LENGTH 9014
-
-class TCPServer
-{
- public:
- typedef MultiHeaderPkt::AddressType AddressType;
- typedef MultiHeaderPkt::Header Header;
- typedef MultiHeaderPkt::MsgType MsgType;
-
- private:
-
- enum
- class SyncState { periodic, ckpt, asyncCkpt, atomic, idle };
- /**
- * The Channel class encapsulates all the information about a client
- * and its current status.
- */
- class Channel
- {
- private:
- /**
- * The MAC address of the client.
- */
- AddressType address;
-
- /**
- * Update the client MAC address. It is called every time a new data
- * packet is to come in.
- */
- void updateAddress(const AddressType &new_addr);
- /**
- * Process an incoming command message.
- */
- void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick);
-
-
- public:
- /**
- * TCP stream socket.
- */
- int fd;
- /**
- * Is client connected?
- */
- bool isAlive;
- /**
- * Current state of the channel wrt. multi synchronisation.
- */
- SyncState state;
- /**
- * Multi rank of the client
- */
- unsigned rank;
-
- public:
- Channel();
- ~Channel () {}
-
-
- /**
- * Receive and process the next incoming header packet.
- */
- void headerPktIn();
- /**
- * Send raw data to the connected client.
- *
- * @param data The data to send.
- * @param size Size of the data (in bytes).
- */
- void sendRaw(const void *data, unsigned size) const;
- /**
- * Receive raw data from the connected client.
- *
- * @param buf The buffer to store the incoming data into.
- * @param size Size of data to receive (in bytes).
- * @return In case of success, it returns size. Zero is returned
- * if the socket is already closed by the client.
- */
- unsigned recvRaw(void *buf, unsigned size) const;
- };
-
- /**
- * The array of socket descriptors needed by the poll() system call.
- */
- std::vector<struct pollfd> clientsPollFd;
- /**
- * Array holding all clients info.
- */
- std::vector<Channel> clientsChannel;
-
-
- /**
- * We use a map to select the target client based on the destination
- * MAC address.
- */
- struct AddressCompare
- {
- bool operator()(const AddressType *a1, const AddressType *a2)
- {
- return MultiHeaderPkt::isAddressLess(*a1, *a2);
- }
- };
- std::map<const AddressType *, Channel *, AddressCompare> addressMap;
-
- /**
- * As we dealt with only one message at a time, we can allocate and re-use
- * a single packet buffer (to hold any incoming data packet).
- */
- uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH];
- /**
- * Send tick of the current periodic sync. It is used for sanity check.
- */
- Tick _periodicSyncTick;
- /**
- * The singleton server object.
- */
- static TCPServer *instance;
-
- /**
- * Set up the socket connections to all the clients.
- *
- * @param listen_port The port we are listening on for new client
- * connection requests.
- * @param nclients The number of clients to connect to.
- * @param timeout Timeout in sec to complete the setup phase
- * (i.e. all gem5 establish socket connections)
- */
- void construct(unsigned listen_port, unsigned nclients, int timeout);
- /**
- * Transfer the header and the follow up data packet to the target(s)
- * clients.
- *
- * @param hdr The header message structure.
- * @param ch The source channel for the message.
- */
- void xferData(const Header &hdr, const Channel &ch);
- /**
- * Check if the current round of a synchronisation is completed and notify
- * the clients if it is so.
- *
- * @param st The state all channels should have if sync is complete.
- * @param ack The type of ack message to send out if the sync is compete.
- */
- void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack);
- /**
- * Broadcast a request for checkpoint sync.
- *
- * @param ch The source channel of the checkpoint sync request.
- */
- void ckptPropagate(Channel &ch);
- /**
- * Setter for current periodic send tick.
- */
- void periodicSyncTick(Tick t) { _periodicSyncTick = t; }
- /**
- * Getter for current periodic send tick.
- */
- Tick periodicSyncTick() { return _periodicSyncTick; }
-
- public:
-
- TCPServer(unsigned clients_num, unsigned listen_port, int timeout_in_sec);
- ~TCPServer();
-
- /**
- * The main server loop that waits for and processes incoming messages.
- */
- void run();
-};