diff options
Diffstat (limited to 'src/dev/net')
-rw-r--r-- | src/dev/net/Ethernet.py | 1 | ||||
-rw-r--r-- | src/dev/net/dist_etherlink.cc | 3 | ||||
-rw-r--r-- | src/dev/net/dist_iface.cc | 145 | ||||
-rw-r--r-- | src/dev/net/dist_iface.hh | 49 | ||||
-rw-r--r-- | src/dev/net/dist_packet.hh | 1 | ||||
-rw-r--r-- | src/dev/net/tcp_iface.cc | 5 | ||||
-rw-r--r-- | src/dev/net/tcp_iface.hh | 2 |
7 files changed, 170 insertions, 36 deletions
diff --git a/src/dev/net/Ethernet.py b/src/dev/net/Ethernet.py index 981a19223..d79aa138a 100644 --- a/src/dev/net/Ethernet.py +++ b/src/dev/net/Ethernet.py @@ -73,6 +73,7 @@ class DistEtherLink(EtherObject): server_name = Param.String('localhost', "Message server name") server_port = Param.UInt32('2200', "Message server port") is_switch = Param.Bool(False, "true if this a link in etherswitch") + dist_sync_on_pseudo_op = Param.Bool(False, "Start sync with pseudo_op") num_nodes = Param.UInt32('2', "Number of simulate nodes") class EtherBus(EtherObject): diff --git a/src/dev/net/dist_etherlink.cc b/src/dev/net/dist_etherlink.cc index a1cdc01b7..01f21d136 100644 --- a/src/dev/net/dist_etherlink.cc +++ b/src/dev/net/dist_etherlink.cc @@ -94,7 +94,8 @@ DistEtherLink::DistEtherLink(const Params *p) // create the dist (TCP) interface to talk to the peer gem5 processes. distIface = new TCPIface(p->server_name, p->server_port, p->dist_rank, p->dist_size, - p->sync_start, sync_repeat, this, p->is_switch, + p->sync_start, sync_repeat, this, + p->dist_sync_on_pseudo_op, p->is_switch, p->num_nodes); localIface = new LocalIface(name() + ".int0", txLink, rxLink, distIface); diff --git a/src/dev/net/dist_iface.cc b/src/dev/net/dist_iface.cc index 26fe45317..79408c304 100644 --- a/src/dev/net/dist_iface.cc +++ b/src/dev/net/dist_iface.cc @@ -48,24 +48,28 @@ #include "base/random.hh" #include "base/trace.hh" +#include "cpu/thread_context.hh" #include "debug/DistEthernet.hh" #include "debug/DistEthernetPkt.hh" #include "dev/net/etherpkt.hh" #include "sim/sim_exit.hh" #include "sim/sim_object.hh" +#include "sim/system.hh" using namespace std; DistIface::Sync *DistIface::sync = nullptr; +System *DistIface::sys = nullptr; DistIface::SyncEvent *DistIface::syncEvent = nullptr; unsigned DistIface::distIfaceNum = 0; unsigned DistIface::recvThreadsNum = 0; DistIface *DistIface::master = nullptr; +bool DistIface::isSwitch = false; void DistIface::Sync::init(Tick start_tick, Tick repeat_tick) { - if (start_tick < firstAt) { - firstAt = start_tick; + if (start_tick < nextAt) { + nextAt = start_tick; inform("Next dist synchronisation tick is changed to %lu.\n", nextAt); } @@ -85,10 +89,11 @@ DistIface::SyncSwitch::SyncSwitch(int num_nodes) waitNum = num_nodes; numExitReq = 0; numCkptReq = 0; + numStopSyncReq = 0; doExit = false; doCkpt = false; - firstAt = std::numeric_limits<Tick>::max(); - nextAt = 0; + doStopSync = false; + nextAt = std::numeric_limits<Tick>::max(); nextRepeat = std::numeric_limits<Tick>::max(); } @@ -97,10 +102,11 @@ DistIface::SyncNode::SyncNode() waitNum = 0; needExit = ReqType::none; needCkpt = ReqType::none; + needStopSync = ReqType::none; doExit = false; doCkpt = false; - firstAt = std::numeric_limits<Tick>::max(); - nextAt = 0; + doStopSync = false; + nextAt = std::numeric_limits<Tick>::max(); nextRepeat = std::numeric_limits<Tick>::max(); } @@ -117,11 +123,14 @@ DistIface::SyncNode::run(bool same_tick) header.sendTick = curTick(); header.syncRepeat = nextRepeat; header.needCkpt = needCkpt; + header.needStopSync = needStopSync; if (needCkpt != ReqType::none) needCkpt = ReqType::pending; header.needExit = needExit; if (needExit != ReqType::none) needExit = ReqType::pending; + if (needStopSync != ReqType::none) + needStopSync = ReqType::pending; DistIface::master->sendCmd(header); // now wait until all receiver threads complete the synchronisation auto lf = [this]{ return waitNum == 0; }; @@ -161,6 +170,13 @@ DistIface::SyncSwitch::run(bool same_tick) } else { header.needExit = ReqType::none; } + if (doStopSync || numStopSyncReq == numNodes) { + doStopSync = true; + numStopSyncReq = 0; + header.needStopSync = ReqType::immediate; + } else { + header.needStopSync = ReqType::none; + } DistIface::master->sendCmd(header); } @@ -168,7 +184,8 @@ void DistIface::SyncSwitch::progress(Tick send_tick, Tick sync_repeat, ReqType need_ckpt, - ReqType need_exit) + ReqType need_exit, + ReqType need_stop_sync) { std::unique_lock<std::mutex> sync_lock(lock); assert(waitNum > 0); @@ -186,6 +203,10 @@ DistIface::SyncSwitch::progress(Tick send_tick, numExitReq++; else if (need_exit == ReqType::immediate) doExit = true; + if (need_stop_sync == ReqType::collective) + numStopSyncReq++; + else if (need_stop_sync == ReqType::immediate) + doStopSync = true; waitNum--; // Notify the simulation thread if the on-going sync is complete @@ -199,7 +220,8 @@ void DistIface::SyncNode::progress(Tick max_send_tick, Tick next_repeat, ReqType do_ckpt, - ReqType do_exit) + ReqType do_exit, + ReqType do_stop_sync) { std::unique_lock<std::mutex> sync_lock(lock); assert(waitNum > 0); @@ -208,6 +230,7 @@ DistIface::SyncNode::progress(Tick max_send_tick, nextRepeat = next_repeat; doCkpt = (do_ckpt != ReqType::none); doExit = (do_exit != ReqType::none); + doStopSync = (do_stop_sync != ReqType::none); waitNum--; // Notify the simulation thread if the on-going sync is complete @@ -288,29 +311,27 @@ DistIface::SyncEvent::start() // At this point, all DistIface objects has already called Sync::init() so // we have a local minimum of the start tick and repeat for the periodic // sync. - Tick firstAt = DistIface::sync->firstAt; repeat = DistIface::sync->nextRepeat; // Do a global barrier to agree on a common repeat value (the smallest - // one from all participating nodes - DistIface::sync->run(curTick() == 0); + // one from all participating nodes. + DistIface::sync->run(false); assert(!DistIface::sync->doCkpt); assert(!DistIface::sync->doExit); + assert(!DistIface::sync->doStopSync); assert(DistIface::sync->nextAt >= curTick()); assert(DistIface::sync->nextRepeat <= repeat); - // if this is called at tick 0 then we use the config start param otherwise - // the maximum of the current tick of all participating nodes - if (curTick() == 0) { + if (curTick() == 0) assert(!scheduled()); - assert(DistIface::sync->nextAt == 0); - schedule(firstAt); - } else { - if (scheduled()) - reschedule(DistIface::sync->nextAt); - else - schedule(DistIface::sync->nextAt); - } + + // Use the maximum of the current tick for all participating nodes or a + // user provided starting tick. + if (scheduled()) + reschedule(DistIface::sync->nextAt); + else + schedule(DistIface::sync->nextAt); + inform("Dist sync scheduled at %lu and repeats %lu\n", when(), DistIface::sync->nextRepeat); } @@ -352,7 +373,27 @@ DistIface::SyncEvent::process() exitSimLoop("checkpoint"); if (DistIface::sync->doExit) exitSimLoop("exit request from gem5 peers"); + if (DistIface::sync->doStopSync) { + DistIface::sync->doStopSync = false; + inform("synchronization disabled at %lu\n", curTick()); + // The switch node needs to wait for the next sync immediately. + if (DistIface::isSwitch) { + start(); + } else { + // Wake up thread contexts on non-switch nodes. + for (int i = 0; i < DistIface::master->sys->numContexts(); i++) { + ThreadContext *tc = + DistIface::master->sys->getThreadContext(i); + if (tc->status() == ThreadContext::Suspended) + tc->activate(); + else + warn_once("Tried to wake up thread in dist-gem5, but it " + "was already awake!\n"); + } + } + return; + } // schedule the next periodic sync repeat = DistIface::sync->nextRepeat; schedule(curTick() + repeat); @@ -537,9 +578,10 @@ DistIface::DistIface(unsigned dist_rank, Tick sync_start, Tick sync_repeat, EventManager *em, + bool use_pseudo_op, bool is_switch, int num_nodes) : syncStart(sync_start), syncRepeat(sync_repeat), - recvThread(nullptr), recvScheduler(em), + recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op), rank(dist_rank), size(dist_size) { DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank); @@ -547,6 +589,7 @@ DistIface::DistIface(unsigned dist_rank, if (master == nullptr) { assert(sync == nullptr); assert(syncEvent == nullptr); + isSwitch = is_switch; if (is_switch) sync = new SyncSwitch(num_nodes); else @@ -628,7 +671,8 @@ DistIface::recvThreadFunc(Event *recv_done, Tick link_delay) sync->progress(header.sendTick, header.syncRepeat, header.needCkpt, - header.needExit); + header.needExit, + header.needStopSync); } } } @@ -732,7 +776,8 @@ void DistIface::startup() { DPRINTF(DistEthernet, "DistIface::startup() started\n"); - if (this == master) + // Schedule synchronization unless we are not a switch in pseudo_op mode. + if (this == master && (!syncStartOnPseudoOp || isSwitch)) syncEvent->start(); DPRINTF(DistEthernet, "DistIface::startup() done\n"); } @@ -761,6 +806,52 @@ DistIface::readyToCkpt(Tick delay, Tick period) return ret; } +void +DistIface::SyncNode::requestStopSync(ReqType req) +{ + std::lock_guard<std::mutex> sync_lock(lock); + needStopSync = req; +} + +void +DistIface::toggleSync(ThreadContext *tc) +{ + // Unforunate that we have to populate the system pointer member this way. + master->sys = tc->getSystemPtr(); + + // The invariant for both syncing and "unsyncing" is that all threads will + // stop executing intructions until the desired sync state has been reached + // for all nodes. This is the easiest way to prevent deadlock (in the case + // of "unsyncing") and causality errors (in the case of syncing). + if (master->syncEvent->scheduled()) { + inform("Request toggling syncronization off\n"); + master->sync->requestStopSync(ReqType::collective); + + // At this point, we have no clue when everyone will reach the sync + // stop point. Suspend execution of all local thread contexts. + // Dist-gem5 will reactivate all thread contexts when everyone has + // reached the sync stop point. + for (int i = 0; i < master->sys->numContexts(); i++) { + ThreadContext *tc = master->sys->getThreadContext(i); + if (tc->status() == ThreadContext::Active) + tc->quiesce(); + } + } else { + inform("Request toggling syncronization on\n"); + master->syncEvent->start(); + + // We need to suspend all CPUs until the sync point is reached by all + // nodes to prevent causality errors. We can also schedule CPU + // activation here, since we know exactly when the next sync will + // occur. + for (int i = 0; i < master->sys->numContexts(); i++) { + ThreadContext *tc = master->sys->getThreadContext(i); + if (tc->status() == ThreadContext::Active) + tc->quiesceTick(master->syncEvent->when() + 1); + } + } +} + bool DistIface::readyToExit(Tick delay) { @@ -768,6 +859,10 @@ DistIface::readyToExit(Tick delay) DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n", delay); if (master) { + // To successfully coordinate an exit, all nodes must be synchronising + if (!master->syncEvent->scheduled()) + master->syncEvent->start(); + if (delay == 0) { inform("m5 exit called with zero delay => triggering collaborative " "exit\n"); diff --git a/src/dev/net/dist_iface.hh b/src/dev/net/dist_iface.hh index e69211fb8..20ac0989b 100644 --- a/src/dev/net/dist_iface.hh +++ b/src/dev/net/dist_iface.hh @@ -91,6 +91,8 @@ #include "sim/serialize.hh" class EventManager; +class System; +class ThreadContext; /** * The interface class to talk to peer gem5 processes. @@ -139,13 +141,13 @@ class DistIface : public Drainable, public Serializable */ bool doCkpt; /** - * The repeat value for the next periodic sync + * Flag is set if sync is to stop upon sync completion */ - Tick nextRepeat; + bool doStopSync; /** - * Tick for the very first periodic sync + * The repeat value for the next periodic sync */ - Tick firstAt; + Tick nextRepeat; /** * Tick for the next periodic sync (if the event is not scheduled yet) */ @@ -172,10 +174,12 @@ class DistIface : public Drainable, public Serializable virtual void progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, - ReqType do_exit) = 0; + ReqType do_exit, + ReqType do_stop_sync) = 0; virtual void requestCkpt(ReqType req) = 0; virtual void requestExit(ReqType req) = 0; + virtual void requestStopSync(ReqType req) = 0; void drainComplete(); @@ -194,6 +198,10 @@ class DistIface : public Drainable, public Serializable * Ckpt requested */ ReqType needCkpt; + /** + * Sync stop requested + */ + ReqType needStopSync; public: @@ -203,10 +211,12 @@ class DistIface : public Drainable, public Serializable void progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, - ReqType do_exit) override; + ReqType do_exit, + ReqType do_stop_sync) override; void requestCkpt(ReqType req) override; void requestExit(ReqType req) override; + void requestStopSync(ReqType req) override; void serialize(CheckpointOut &cp) const override; void unserialize(CheckpointIn &cp) override; @@ -224,6 +234,10 @@ class DistIface : public Drainable, public Serializable */ unsigned numCkptReq; /** + * Counter for recording stop sync requests + */ + unsigned numStopSyncReq; + /** * Number of connected simulated nodes */ unsigned numNodes; @@ -236,7 +250,8 @@ class DistIface : public Drainable, public Serializable void progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, - ReqType do_exit) override; + ReqType do_exit, + ReqType do_stop_sync) override; void requestCkpt(ReqType) override { panic("Switch requested checkpoint"); @@ -244,6 +259,9 @@ class DistIface : public Drainable, public Serializable void requestExit(ReqType) override { panic("Switch requested exit"); } + void requestStopSync(ReqType) override { + panic("Switch requested stop sync"); + } void serialize(CheckpointOut &cp) const override; void unserialize(CheckpointIn &cp) override; @@ -437,6 +455,10 @@ class DistIface : public Drainable, public Serializable * Meta information about data packets received. */ RecvScheduler recvScheduler; + /** + * Use pseudoOp to start synchronization. + */ + bool syncStartOnPseudoOp; protected: /** @@ -476,6 +498,14 @@ class DistIface : public Drainable, public Serializable * a master to co-ordinate the global synchronisation. */ static DistIface *master; + /** + * System pointer used to wakeup sleeping threads when stopping sync. + */ + static System *sys; + /** + * Is this node a switch? + */ + static bool isSwitch; private: /** @@ -533,6 +563,7 @@ class DistIface : public Drainable, public Serializable Tick sync_start, Tick sync_repeat, EventManager *em, + bool use_pseudo_op, bool is_switch, int num_nodes); @@ -590,6 +621,10 @@ class DistIface : public Drainable, public Serializable * Getter for the dist size param. */ static uint64_t sizeParam(); + /** + * Trigger the master to start/stop synchronization. + */ + static void toggleSync(ThreadContext *tc); }; #endif diff --git a/src/dev/net/dist_packet.hh b/src/dev/net/dist_packet.hh index b154ab4a7..6e30101d4 100644 --- a/src/dev/net/dist_packet.hh +++ b/src/dev/net/dist_packet.hh @@ -103,6 +103,7 @@ class DistHeaderPkt unsigned dataPacketLength; struct { ReqType needCkpt; + ReqType needStopSync; ReqType needExit; }; }; diff --git a/src/dev/net/tcp_iface.cc b/src/dev/net/tcp_iface.cc index fba069674..f9c927acb 100644 --- a/src/dev/net/tcp_iface.cc +++ b/src/dev/net/tcp_iface.cc @@ -82,8 +82,9 @@ bool TCPIface::anyListening = false; TCPIface::TCPIface(string server_name, unsigned server_port, unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, - EventManager *em, bool is_switch, int num_nodes) : - DistIface(dist_rank, dist_size, sync_start, sync_repeat, em, + EventManager *em, bool use_pseudo_op, bool is_switch, + int num_nodes) : + DistIface(dist_rank, dist_size, sync_start, sync_repeat, em, use_pseudo_op, is_switch, num_nodes), serverName(server_name), serverPort(server_port), isSwitch(is_switch), listening(false) { diff --git a/src/dev/net/tcp_iface.hh b/src/dev/net/tcp_iface.hh index 97ae9cde0..8ba5c7e81 100644 --- a/src/dev/net/tcp_iface.hh +++ b/src/dev/net/tcp_iface.hh @@ -148,7 +148,7 @@ class TCPIface : public DistIface TCPIface(std::string server_name, unsigned server_port, unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, - bool is_switch, int num_nodes); + bool use_pseudo_op, bool is_switch, int num_nodes); ~TCPIface() override; }; |