Diffstat (limited to 'src/dev/net/dist_iface.cc')
 -rw-r--r--  src/dev/net/dist_iface.cc  808
 1 file changed, 808 insertions(+), 0 deletions(-)
diff --git a/src/dev/net/dist_iface.cc b/src/dev/net/dist_iface.cc
new file mode 100644
index 000000000..45ce651a9
--- /dev/null
+++ b/src/dev/net/dist_iface.cc
@@ -0,0 +1,808 @@
+/*
+ * Copyright (c) 2015 ARM Limited
+ * All rights reserved
+ *
+ * The license below extends only to copyright in the software and shall
+ * not be construed as granting a license to any other intellectual
+ * property including but not limited to intellectual property relating
+ * to a hardware implementation of the functionality of the software
+ * licensed hereunder. You may use the software subject to the license
+ * terms below provided that you ensure that this notice is replicated
+ * unmodified and in its entirety in all distributions of the software,
+ * modified or unmodified, in source code or in binary form.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met: redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer;
+ * redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution;
+ * neither the name of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Authors: Gabor Dozsa
+ */
+
+/* @file
+ * The interface class for dist-gem5 simulations.
+ */
+
+#include "dev/net/dist_iface.hh"
+
+#include <queue>
+#include <thread>
+
+#include "base/random.hh"
+#include "base/trace.hh"
+#include "debug/DistEthernet.hh"
+#include "debug/DistEthernetPkt.hh"
+#include "dev/net/etherpkt.hh"
+#include "sim/sim_exit.hh"
+#include "sim/sim_object.hh"
+
+using namespace std;
+DistIface::Sync *DistIface::sync = nullptr;
+DistIface::SyncEvent *DistIface::syncEvent = nullptr;
+unsigned DistIface::distIfaceNum = 0;
+unsigned DistIface::recvThreadsNum = 0;
+DistIface *DistIface::master = nullptr;
+
+void
+DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
+{
+ if (start_tick < firstAt) {
+ firstAt = start_tick;
+        inform("Next dist synchronisation tick is changed to %lu.\n",
+               firstAt);
+ }
+
+ if (repeat_tick == 0)
+ panic("Dist synchronisation interval must be greater than zero");
+
+ if (repeat_tick < nextRepeat) {
+ nextRepeat = repeat_tick;
+ inform("Dist synchronisation interval is changed to %lu.\n",
+ nextRepeat);
+ }
+}
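+
+// Example (hypothetical values): if two DistIface objects call
+// init(1000, 500) and init(750, 250) on the shared sync object, firstAt
+// ends up as 750 and nextRepeat as 250, i.e. the earliest start tick and
+// the shortest interval across all interfaces win.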
+
+DistIface::SyncSwitch::SyncSwitch(int num_nodes)
+{
+ numNodes = num_nodes;
+ waitNum = num_nodes;
+ numExitReq = 0;
+ numCkptReq = 0;
+ doExit = false;
+ doCkpt = false;
+ firstAt = std::numeric_limits<Tick>::max();
+ nextAt = 0;
+ nextRepeat = std::numeric_limits<Tick>::max();
+}
+
+DistIface::SyncNode::SyncNode()
+{
+ waitNum = 0;
+ needExit = ReqType::none;
+ needCkpt = ReqType::none;
+ doExit = false;
+ doCkpt = false;
+ firstAt = std::numeric_limits<Tick>::max();
+ nextAt = 0;
+ nextRepeat = std::numeric_limits<Tick>::max();
+}
+
+void
+DistIface::SyncNode::run(bool same_tick)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ Header header;
+
+ assert(waitNum == 0);
+ waitNum = DistIface::recvThreadsNum;
+ // initiate the global synchronisation
+ header.msgType = MsgType::cmdSyncReq;
+ header.sendTick = curTick();
+ header.syncRepeat = nextRepeat;
+ header.needCkpt = needCkpt;
+ if (needCkpt != ReqType::none)
+ needCkpt = ReqType::pending;
+ header.needExit = needExit;
+ if (needExit != ReqType::none)
+ needExit = ReqType::pending;
+ DistIface::master->sendCmd(header);
+ // now wait until all receiver threads complete the synchronisation
+ auto lf = [this]{ return waitNum == 0; };
+ cv.wait(sync_lock, lf);
+ // global synchronisation is done
+ assert(!same_tick || (nextAt == curTick()));
+}
+
+
+void
+DistIface::SyncSwitch::run(bool same_tick)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ Header header;
+ // Wait for the sync requests from the nodes
+ if (waitNum > 0) {
+ auto lf = [this]{ return waitNum == 0; };
+ cv.wait(sync_lock, lf);
+ }
+ assert(waitNum == 0);
+ assert(!same_tick || (nextAt == curTick()));
+ waitNum = numNodes;
+ // Complete the global synchronisation
+ header.msgType = MsgType::cmdSyncAck;
+ header.sendTick = nextAt;
+ header.syncRepeat = nextRepeat;
+ if (doCkpt || numCkptReq == numNodes) {
+ doCkpt = true;
+ header.needCkpt = ReqType::immediate;
+ numCkptReq = 0;
+ } else {
+ header.needCkpt = ReqType::none;
+ }
+ if (doExit || numExitReq == numNodes) {
+ doExit = true;
+ header.needExit = ReqType::immediate;
+ } else {
+ header.needExit = ReqType::none;
+ }
+ DistIface::master->sendCmd(header);
+}
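+
+// A sketch of the handshake implemented by the two run() methods above,
+// assuming one switch process and N node processes:
+//
+//   node i:  sends cmdSyncReq (sendTick, syncRepeat, ckpt/exit request)
+//   switch:  collects all N requests via SyncSwitch::progress()
+//   switch:  replies with cmdSyncAck (max send tick, min sync repeat)
+//   node i:  its receiver thread calls SyncNode::progress(), which wakes
+//            up the simulation thread blocked in SyncNode::run()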
+
+void
+DistIface::SyncSwitch::progress(Tick send_tick,
+ Tick sync_repeat,
+ ReqType need_ckpt,
+ ReqType need_exit)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ assert(waitNum > 0);
+
+ if (send_tick > nextAt)
+ nextAt = send_tick;
+ if (nextRepeat > sync_repeat)
+ nextRepeat = sync_repeat;
+
+ if (need_ckpt == ReqType::collective)
+ numCkptReq++;
+ else if (need_ckpt == ReqType::immediate)
+ doCkpt = true;
+ if (need_exit == ReqType::collective)
+ numExitReq++;
+ else if (need_exit == ReqType::immediate)
+ doExit = true;
+
+ waitNum--;
+ // Notify the simulation thread if the on-going sync is complete
+ if (waitNum == 0) {
+ sync_lock.unlock();
+ cv.notify_one();
+ }
+}
+
+void
+DistIface::SyncNode::progress(Tick max_send_tick,
+ Tick next_repeat,
+ ReqType do_ckpt,
+ ReqType do_exit)
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ assert(waitNum > 0);
+
+ nextAt = max_send_tick;
+ nextRepeat = next_repeat;
+ doCkpt = (do_ckpt != ReqType::none);
+ doExit = (do_exit != ReqType::none);
+
+ waitNum--;
+ // Notify the simulation thread if the on-going sync is complete
+ if (waitNum == 0) {
+ sync_lock.unlock();
+ cv.notify_one();
+ }
+}
+
+void
+DistIface::SyncNode::requestCkpt(ReqType req)
+{
+ std::lock_guard<std::mutex> sync_lock(lock);
+ assert(req != ReqType::none);
+ if (needCkpt != ReqType::none)
+ warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
+ if (needCkpt == ReqType::none || req == ReqType::immediate)
+ needCkpt = req;
+}
+
+void
+DistIface::SyncNode::requestExit(ReqType req)
+{
+ std::lock_guard<std::mutex> sync_lock(lock);
+ assert(req != ReqType::none);
+ if (needExit != ReqType::none)
+ warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
+ if (needExit == ReqType::none || req == ReqType::immediate)
+ needExit = req;
+}
+
+void
+DistIface::Sync::drainComplete()
+{
+ if (doCkpt) {
+ // The first DistIface object called this right before writing the
+ // checkpoint. We need to drain the underlying physical network here.
+ // Note that other gem5 peers may enter this barrier at different
+ // ticks due to draining.
+ run(false);
+ // Only the "first" DistIface object has to perform the sync
+ doCkpt = false;
+ }
+}
+
+void
+DistIface::SyncNode::serialize(CheckpointOut &cp) const
+{
+ int need_exit = static_cast<int>(needExit);
+ SERIALIZE_SCALAR(need_exit);
+}
+
+void
+DistIface::SyncNode::unserialize(CheckpointIn &cp)
+{
+ int need_exit;
+ UNSERIALIZE_SCALAR(need_exit);
+ needExit = static_cast<ReqType>(need_exit);
+}
+
+void
+DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
+{
+ SERIALIZE_SCALAR(numExitReq);
+}
+
+void
+DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
+{
+ UNSERIALIZE_SCALAR(numExitReq);
+}
+
+void
+DistIface::SyncEvent::start()
+{
+ // Note that this may be called either from startup() or drainResume()
+
+    // At this point, all DistIface objects have already called Sync::init()
+    // so we have a local minimum of the start tick and the repeat for the
+    // periodic sync.
+ Tick firstAt = DistIface::sync->firstAt;
+ repeat = DistIface::sync->nextRepeat;
+    // Do a global barrier to agree on a common repeat value (the smallest
+    // one from all participating nodes).
+ DistIface::sync->run(curTick() == 0);
+
+ assert(!DistIface::sync->doCkpt);
+ assert(!DistIface::sync->doExit);
+ assert(DistIface::sync->nextAt >= curTick());
+ assert(DistIface::sync->nextRepeat <= repeat);
+
+    // If this is called at tick 0 then we use the config start param,
+    // otherwise the maximum of the current ticks of all participating nodes.
+ if (curTick() == 0) {
+ assert(!scheduled());
+ assert(DistIface::sync->nextAt == 0);
+ schedule(firstAt);
+ } else {
+ if (scheduled())
+ reschedule(DistIface::sync->nextAt);
+ else
+ schedule(DistIface::sync->nextAt);
+ }
+    inform("Dist sync scheduled at %lu, repeating every %lu ticks\n", when(),
+           DistIface::sync->nextRepeat);
+}
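+
+// Example (hypothetical values): with firstAt == 1000000 and an agreed
+// nextRepeat of 500000, a cold start (curTick() == 0) schedules the first
+// periodic sync at tick 1000000; process() below then reschedules it at
+// 1500000, 2000000, and so on.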
+
+void
+DistIface::SyncEvent::process()
+{
+ // We may not start a global periodic sync while draining before taking a
+ // checkpoint. This is due to the possibility that peer gem5 processes
+ // may not hit the same periodic sync before they complete draining and
+ // that would make this periodic sync clash with sync called from
+ // DistIface::serialize() by other gem5 processes.
+ // We would need a 'distributed drain' solution to eliminate this
+ // restriction.
+    // Note that if draining was not triggered by checkpointing then we are
+    // fine since no extra global sync will happen (i.e. all peer gem5
+    // processes will eventually hit this periodic sync).
+ panic_if(_draining && DistIface::sync->doCkpt,
+ "Distributed sync is hit while draining");
+ /*
+ * Note that this is a global event so this process method will be called
+ * by only exactly one thread.
+ */
+ /*
+ * We hold the eventq lock at this point but the receiver thread may
+ * need the lock to schedule new recv events while waiting for the
+ * dist sync to complete.
+ * Note that the other simulation threads also release their eventq
+ * locks while waiting for us due to the global event semantics.
+ */
+ {
+ EventQueue::ScopedRelease sr(curEventQueue());
+ // we do a global sync here that is supposed to happen at the same
+ // tick in all gem5 peers
+ DistIface::sync->run(true);
+ // global sync completed
+ }
+ if (DistIface::sync->doCkpt)
+ exitSimLoop("checkpoint");
+ if (DistIface::sync->doExit)
+ exitSimLoop("exit request from gem5 peers");
+
+ // schedule the next periodic sync
+ repeat = DistIface::sync->nextRepeat;
+ schedule(curTick() + repeat);
+}
+
+void
+DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
+{
+ // This is called from the receiver thread when it starts running. The new
+ // receiver thread shares the event queue with the simulation thread
+ // (associated with the simulated Ethernet link).
+ curEventQueue(eventManager->eventQueue());
+
+ recvDone = recv_done;
+ linkDelay = link_delay;
+}
+
+Tick
+DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
+ Tick send_delay,
+ Tick prev_recv_tick)
+{
+ Tick recv_tick = send_tick + send_delay + linkDelay;
+    // sanity check (we need at least a send delay long window)
+    assert(recv_tick >= prev_recv_tick + send_delay);
+    panic_if(prev_recv_tick + send_delay > recv_tick,
+             "Receive window is smaller than send delay");
+    panic_if(recv_tick <= curTick(),
+             "Simulators out of sync - missed packet receive by %llu ticks "
+             "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
+             "linkDelay: %lu)",
+             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
+             linkDelay);
+
+ return recv_tick;
+}
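+
+// Worked example for calcReceiveTick() (hypothetical values): with
+// send_tick == 1000, send_delay == 100 and linkDelay == 400 the packet is
+// due at recv_tick == 1500. The checks above then require that the
+// previous packet was received at or before tick 1400 and that the local
+// simulator has not yet passed tick 1500.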
+
+void
+DistIface::RecvScheduler::resumeRecvTicks()
+{
+    // Schedule pending packets asap in case the link speed/delay changed
+    // when restoring from the checkpoint.
+    // This could be done during unserialize() except that curTick() is
+    // unknown there, so we do it during drainResume() instead.
+    // If we are not restoring from a checkpoint then the link latency
+    // cannot have changed, so we simply return.
+ if (!ckptRestore)
+ return;
+
+ std::vector<Desc> v;
+ while (!descQueue.empty()) {
+ Desc d = descQueue.front();
+ descQueue.pop();
+ d.sendTick = curTick();
+ d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed
+ v.push_back(d);
+ }
+
+ for (auto &d : v)
+ descQueue.push(d);
+
+ if (recvDone->scheduled()) {
+ assert(!descQueue.empty());
+ eventManager->reschedule(recvDone, curTick());
+ } else {
+ assert(descQueue.empty() && v.empty());
+ }
+ ckptRestore = false;
+}
+
+void
+DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
+ Tick send_tick,
+ Tick send_delay)
+{
+    // Note: this is called from the receiver thread
+ curEventQueue()->lock();
+ Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
+
+ DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
+ "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
+ send_tick, send_delay, linkDelay, recv_tick);
+ // Every packet must be sent and arrive in the same quantum
+ assert(send_tick > master->syncEvent->when() -
+ master->syncEvent->repeat);
+    // No packet may be scheduled for receive within the same quantum; it
+    // must arrive after the next sync point
+ assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
+
+ // Now we are about to schedule a recvDone event for the new data packet.
+ // We use the same recvDone object for all incoming data packets. Packet
+ // descriptors are saved in the ordered queue. The currently scheduled
+ // packet is always on the top of the queue.
+ // NOTE: we use the event queue lock to protect the receive desc queue,
+ // too, which is accessed both by the receiver thread and the simulation
+ // thread.
+ descQueue.emplace(new_packet, send_tick, send_delay);
+ if (descQueue.size() == 1) {
+ assert(!recvDone->scheduled());
+ eventManager->schedule(recvDone, recv_tick);
+ } else {
+ assert(recvDone->scheduled());
+        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay >
+                 recv_tick,
+                 "Out of order packet received (recv_tick: %lu top(): %lu)\n",
+                 recv_tick,
+                 descQueue.front().sendTick + descQueue.front().sendDelay);
+ }
+ curEventQueue()->unlock();
+}
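+
+// Example for the quantum asserts in pushPacket() (hypothetical values):
+// if the next periodic sync is at tick 2000 with a repeat of 1000, the
+// current quantum is (1000, 2000]. A packet with send_tick == 1500 passes
+// the first assert, and its receive tick must fall after tick 2000, i.e.
+// in a later quantum, which the second assert verifies.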
+
+EthPacketPtr
+DistIface::RecvScheduler::popPacket()
+{
+    // Note: this is called from the simulation thread when a receive done
+    // event is being processed for the link. We assume that the thread
+    // holds the event queue lock when this is called!
+ EthPacketPtr next_packet = descQueue.front().packet;
+ descQueue.pop();
+
+ if (descQueue.size() > 0) {
+ Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
+ descQueue.front().sendDelay,
+ curTick());
+ eventManager->schedule(recvDone, recv_tick);
+ }
+ prevRecvTick = curTick();
+ return next_packet;
+}
+
+void
+DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
+{
+ SERIALIZE_SCALAR(sendTick);
+ SERIALIZE_SCALAR(sendDelay);
+ packet->serialize("rxPacket", cp);
+}
+
+void
+DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
+{
+ UNSERIALIZE_SCALAR(sendTick);
+ UNSERIALIZE_SCALAR(sendDelay);
+ packet = std::make_shared<EthPacketData>(16384);
+ packet->unserialize("rxPacket", cp);
+}
+
+void
+DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
+{
+ SERIALIZE_SCALAR(prevRecvTick);
+ // serialize the receive desc queue
+ std::queue<Desc> tmp_queue(descQueue);
+ unsigned n_desc_queue = descQueue.size();
+ assert(tmp_queue.size() == descQueue.size());
+ SERIALIZE_SCALAR(n_desc_queue);
+ for (int i = 0; i < n_desc_queue; i++) {
+ tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
+ tmp_queue.pop();
+ }
+ assert(tmp_queue.empty());
+}
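+
+// The resulting checkpoint section is roughly laid out as below (a
+// sketch; the exact key format comes from the Serializable helpers):
+//
+//   prevRecvTick=<tick>
+//   n_desc_queue=<N>
+//   rxDesc_0: sendTick, sendDelay, rxPacket
+//   ...
+//   rxDesc_<N-1>: sendTick, sendDelay, rxPacket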
+
+void
+DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
+{
+ assert(descQueue.size() == 0);
+ assert(recvDone->scheduled() == false);
+ assert(ckptRestore == false);
+
+ UNSERIALIZE_SCALAR(prevRecvTick);
+ // unserialize the receive desc queue
+ unsigned n_desc_queue;
+ UNSERIALIZE_SCALAR(n_desc_queue);
+ for (int i = 0; i < n_desc_queue; i++) {
+ Desc recv_desc;
+ recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
+ descQueue.push(recv_desc);
+ }
+ ckptRestore = true;
+}
+
+DistIface::DistIface(unsigned dist_rank,
+ unsigned dist_size,
+ Tick sync_start,
+ Tick sync_repeat,
+ EventManager *em,
+ bool is_switch, int num_nodes) :
+ syncStart(sync_start), syncRepeat(sync_repeat),
+ recvThread(nullptr), recvScheduler(em),
+ rank(dist_rank), size(dist_size)
+{
+    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n", dist_rank);
+ isMaster = false;
+ if (master == nullptr) {
+ assert(sync == nullptr);
+ assert(syncEvent == nullptr);
+ if (is_switch)
+ sync = new SyncSwitch(num_nodes);
+ else
+ sync = new SyncNode();
+ syncEvent = new SyncEvent();
+ master = this;
+ isMaster = true;
+ }
+ distIfaceId = distIfaceNum;
+ distIfaceNum++;
+}
+
+DistIface::~DistIface()
+{
+ assert(recvThread);
+ delete recvThread;
+ if (this == master) {
+ assert(syncEvent);
+ delete syncEvent;
+ assert(sync);
+ delete sync;
+ master = nullptr;
+ }
+}
+
+void
+DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
+{
+ Header header;
+
+ // Prepare a dist header packet for the Ethernet packet we want to
+ // send out.
+ header.msgType = MsgType::dataDescriptor;
+ header.sendTick = curTick();
+ header.sendDelay = send_delay;
+
+ header.dataPacketLength = pkt->size();
+
+ // Send out the packet and the meta info.
+ sendPacket(header, pkt);
+
+    DPRINTF(DistEthernetPkt,
+            "DistIface::packetOut() done size:%d send_delay:%llu\n",
+            pkt->size(), send_delay);
+}
+
+void
+DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
+{
+ EthPacketPtr new_packet;
+ DistHeaderPkt::Header header;
+
+ // Initialize receive scheduler parameters
+ recvScheduler.init(recv_done, link_delay);
+
+ // Main loop to wait for and process any incoming message.
+ for (;;) {
+ // recvHeader() blocks until the next dist header packet comes in.
+ if (!recvHeader(header)) {
+ // We lost connection to the peer gem5 processes most likely
+ // because one of them called m5 exit. So we stop here.
+ // Grab the eventq lock to stop the simulation thread
+ curEventQueue()->lock();
+ exit_message("info",
+ 0,
+ "Message server closed connection, "
+ "simulation is exiting");
+ }
+
+ // We got a valid dist header packet, let's process it
+ if (header.msgType == MsgType::dataDescriptor) {
+ recvPacket(header, new_packet);
+ recvScheduler.pushPacket(new_packet,
+ header.sendTick,
+ header.sendDelay);
+ } else {
+            // everything else must be a synchronisation related command
+ sync->progress(header.sendTick,
+ header.syncRepeat,
+ header.needCkpt,
+ header.needExit);
+ }
+ }
+}
+
+void
+DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
+{
+ assert(recvThread == nullptr);
+
+ recvThread = new std::thread(&DistIface::recvThreadFunc,
+ this,
+ const_cast<Event *>(recv_done),
+ link_delay);
+ recvThreadsNum++;
+}
+
+DrainState
+DistIface::drain()
+{
+    DPRINTF(DistEthernet, "DistIface::drain() called\n");
+ // This can be called multiple times in the same drain cycle.
+ if (this == master)
+ syncEvent->draining(true);
+ return DrainState::Drained;
+}
+
+void
+DistIface::drainResume()
+{
+    DPRINTF(DistEthernet, "DistIface::drainResume() called\n");
+ if (this == master)
+ syncEvent->draining(false);
+ recvScheduler.resumeRecvTicks();
+}
+
+void
+DistIface::serialize(CheckpointOut &cp) const
+{
+ // Drain the dist interface before the checkpoint is taken. We cannot call
+ // this as part of the normal drain cycle because this dist sync has to be
+ // called exactly once after the system is fully drained.
+ sync->drainComplete();
+
+ unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
+
+ SERIALIZE_SCALAR(rank_orig);
+ SERIALIZE_SCALAR(dist_iface_id_orig);
+
+ recvScheduler.serializeSection(cp, "recvScheduler");
+ if (this == master) {
+ sync->serializeSection(cp, "Sync");
+ }
+}
+
+void
+DistIface::unserialize(CheckpointIn &cp)
+{
+ unsigned rank_orig, dist_iface_id_orig;
+ UNSERIALIZE_SCALAR(rank_orig);
+ UNSERIALIZE_SCALAR(dist_iface_id_orig);
+
+ panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
+ rank, rank_orig);
+ panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
+ "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
+ dist_iface_id_orig);
+
+ recvScheduler.unserializeSection(cp, "recvScheduler");
+ if (this == master) {
+ sync->unserializeSection(cp, "Sync");
+ }
+}
+
+void
+DistIface::init(const Event *done_event, Tick link_delay)
+{
+    // Init hook for the underlying message transport to set up/finalize
+    // communication channels
+ initTransport();
+
+ // Spawn a new receiver thread that will process messages
+ // coming in from peer gem5 processes.
+ // The receive thread will also schedule a (receive) doneEvent
+ // for each incoming data packet.
+ spawnRecvThread(done_event, link_delay);
+
+
+    // Adjust the periodic sync start and interval. Different DistIface
+    // objects might have different requirements. The singleton sync object
+    // will select the minimum values for both params.
+ assert(sync != nullptr);
+ sync->init(syncStart, syncRepeat);
+
+    // Initialize the seed of the random generator to avoid the same
+    // sequence in all gem5 peer processes
+ assert(master != nullptr);
+ if (this == master)
+ random_mt.init(5489 * (rank+1) + 257);
+}
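+
+// Example: the seed formula above yields 5489 * 1 + 257 == 5746 for
+// rank 0 and 5489 * 2 + 257 == 11235 for rank 1, so each peer gem5
+// process draws a different pseudo-random sequence.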
+
+void
+DistIface::startup()
+{
+ DPRINTF(DistEthernet, "DistIface::startup() started\n");
+ if (this == master)
+ syncEvent->start();
+ DPRINTF(DistEthernet, "DistIface::startup() done\n");
+}
+
+bool
+DistIface::readyToCkpt(Tick delay, Tick period)
+{
+ bool ret = true;
+ DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
+ "period:%lu\n", delay, period);
+ if (master) {
+ if (delay == 0) {
+            inform("m5 checkpoint called with zero delay => triggering "
+                   "collaborative checkpoint\n");
+ sync->requestCkpt(ReqType::collective);
+ } else {
+            inform("m5 checkpoint called with non-zero delay => triggering "
+                   "immediate checkpoint (at the next sync)\n");
+ sync->requestCkpt(ReqType::immediate);
+ }
+ if (period != 0)
+ inform("Non-zero period for m5_ckpt is ignored in "
+ "distributed gem5 runs\n");
+ ret = false;
+ }
+ return ret;
+}
+
+bool
+DistIface::readyToExit(Tick delay)
+{
+ bool ret = true;
+ DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
+ delay);
+ if (master) {
+ if (delay == 0) {
+            inform("m5 exit called with zero delay => triggering "
+                   "collaborative exit\n");
+ sync->requestExit(ReqType::collective);
+ } else {
+            inform("m5 exit called with non-zero delay => triggering "
+                   "immediate exit (at the next sync)\n");
+ sync->requestExit(ReqType::immediate);
+ }
+ ret = false;
+ }
+ return ret;
+}
+
+uint64_t
+DistIface::rankParam()
+{
+ uint64_t val;
+ if (master) {
+ val = master->rank;
+ } else {
+        warn("Dist-rank parameter is queried in a single gem5 simulation.");
+ val = 0;
+ }
+ return val;
+}
+
+uint64_t
+DistIface::sizeParam()
+{
+ uint64_t val;
+ if (master) {
+ val = master->size;
+ } else {
+        warn("Dist-size parameter is queried in a single gem5 simulation.");
+ val = 1;
+ }
+ return val;
+}