diff options
-rw-r--r-- | src/dev/net/dist_iface.cc | 76 | ||||
-rw-r--r-- | src/dev/net/dist_iface.hh | 28 |
2 files changed, 78 insertions, 26 deletions
diff --git a/src/dev/net/dist_iface.cc b/src/dev/net/dist_iface.cc index 79408c304..7eef5d841 100644 --- a/src/dev/net/dist_iface.cc +++ b/src/dev/net/dist_iface.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 ARM Limited + * Copyright (c) 2015-2016 ARM Limited * All rights reserved * * The license below extends only to copyright in the software and shall @@ -83,6 +83,16 @@ DistIface::Sync::init(Tick start_tick, Tick repeat_tick) } } +void +DistIface::Sync::abort() +{ + std::unique_lock<std::mutex> sync_lock(lock); + waitNum = 0; + isAbort = true; + sync_lock.unlock(); + cv.notify_one(); +} + DistIface::SyncSwitch::SyncSwitch(int num_nodes) { numNodes = num_nodes; @@ -95,6 +105,7 @@ DistIface::SyncSwitch::SyncSwitch(int num_nodes) doStopSync = false; nextAt = std::numeric_limits<Tick>::max(); nextRepeat = std::numeric_limits<Tick>::max(); + isAbort = false; } DistIface::SyncNode::SyncNode() @@ -108,15 +119,17 @@ DistIface::SyncNode::SyncNode() doStopSync = false; nextAt = std::numeric_limits<Tick>::max(); nextRepeat = std::numeric_limits<Tick>::max(); + isAbort = false; } -void +bool DistIface::SyncNode::run(bool same_tick) { std::unique_lock<std::mutex> sync_lock(lock); Header header; assert(waitNum == 0); + assert(!isAbort); waitNum = DistIface::recvThreadsNum; // initiate the global synchronisation header.msgType = MsgType::cmdSyncReq; @@ -135,12 +148,13 @@ DistIface::SyncNode::run(bool same_tick) // now wait until all receiver threads complete the synchronisation auto lf = [this]{ return waitNum == 0; }; cv.wait(sync_lock, lf); - // global synchronisation is done - assert(!same_tick || (nextAt == curTick())); + // global synchronisation is done. + assert(isAbort || !same_tick || (nextAt == curTick())); + return !isAbort; } -void +bool DistIface::SyncSwitch::run(bool same_tick) { std::unique_lock<std::mutex> sync_lock(lock); @@ -151,6 +165,8 @@ DistIface::SyncSwitch::run(bool same_tick) cv.wait(sync_lock, lf); } assert(waitNum == 0); + if (isAbort) // sync aborted + return false; assert(!same_tick || (nextAt == curTick())); waitNum = numNodes; // Complete the global synchronisation @@ -178,9 +194,10 @@ DistIface::SyncSwitch::run(bool same_tick) header.needStopSync = ReqType::none; } DistIface::master->sendCmd(header); + return true; } -void +bool DistIface::SyncSwitch::progress(Tick send_tick, Tick sync_repeat, ReqType need_ckpt, @@ -188,6 +205,8 @@ DistIface::SyncSwitch::progress(Tick send_tick, ReqType need_stop_sync) { std::unique_lock<std::mutex> sync_lock(lock); + if (isAbort) // sync aborted + return false; assert(waitNum > 0); if (send_tick > nextAt) @@ -214,9 +233,12 @@ DistIface::SyncSwitch::progress(Tick send_tick, sync_lock.unlock(); cv.notify_one(); } + // The receive thread must keep alive in the switch until the node + // closes the connection. Thus, we always return true here. + return true; } -void +bool DistIface::SyncNode::progress(Tick max_send_tick, Tick next_repeat, ReqType do_ckpt, @@ -224,6 +246,8 @@ DistIface::SyncNode::progress(Tick max_send_tick, ReqType do_stop_sync) { std::unique_lock<std::mutex> sync_lock(lock); + if (isAbort) // sync aborted + return false; assert(waitNum > 0); nextAt = max_send_tick; @@ -238,6 +262,8 @@ DistIface::SyncNode::progress(Tick max_send_tick, sync_lock.unlock(); cv.notify_one(); } + // The receive thread must finish when simulation is about to exit + return !doExit; } void @@ -314,7 +340,8 @@ DistIface::SyncEvent::start() repeat = DistIface::sync->nextRepeat; // Do a global barrier to agree on a common repeat value (the smallest // one from all participating nodes. - DistIface::sync->run(false); + if (!DistIface::sync->run(false)) + panic("DistIface::SyncEvent::start() aborted\n"); assert(!DistIface::sync->doCkpt); assert(!DistIface::sync->doExit); @@ -366,13 +393,16 @@ DistIface::SyncEvent::process() EventQueue::ScopedRelease sr(curEventQueue()); // we do a global sync here that is supposed to happen at the same // tick in all gem5 peers - DistIface::sync->run(true); + if (!DistIface::sync->run(true)) + return; // global sync aborted // global sync completed } if (DistIface::sync->doCkpt) exitSimLoop("checkpoint"); - if (DistIface::sync->doExit) + if (DistIface::sync->doExit) { exitSimLoop("exit request from gem5 peers"); + return; + } if (DistIface::sync->doStopSync) { DistIface::sync->doStopSync = false; inform("synchronization disabled at %lu\n", curTick()); @@ -605,14 +635,16 @@ DistIface::DistIface(unsigned dist_rank, DistIface::~DistIface() { assert(recvThread); + recvThread->join(); delete recvThread; - if (this == master) { + if (distIfaceNum-- == 0) { assert(syncEvent); delete syncEvent; assert(sync); delete sync; - master = nullptr; } + if (this == master) + master = nullptr; } void @@ -654,9 +686,13 @@ DistIface::recvThreadFunc(Event *recv_done, Tick link_delay) // because one of them called m5 exit. So we stop here. // Grab the eventq lock to stop the simulation thread curEventQueue()->lock(); - exitSimLoop("Message server closed connection, simulator " - "is exiting"); + exitSimLoop("connection to gem5 peer got closed"); curEventQueue()->unlock(); + // The simulation thread may be blocked in processing an on-going + // global synchronisation. Abort the sync to give the simulation + // thread a chance to make progress and process the exit event. + sync->abort(); + // Finish receiver thread break; } @@ -668,11 +704,13 @@ DistIface::recvThreadFunc(Event *recv_done, Tick link_delay) header.sendDelay); } else { // everything else must be synchronisation related command - sync->progress(header.sendTick, - header.syncRepeat, - header.needCkpt, - header.needExit, - header.needStopSync); + if (!sync->progress(header.sendTick, + header.syncRepeat, + header.needCkpt, + header.needExit, + header.needStopSync)) + // Finish receiver thread if simulation is about to exit + break; } } } diff --git a/src/dev/net/dist_iface.hh b/src/dev/net/dist_iface.hh index 20ac0989b..a56b9a3bb 100644 --- a/src/dev/net/dist_iface.hh +++ b/src/dev/net/dist_iface.hh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 ARM Limited + * Copyright (c) 2015-2016 ARM Limited * All rights reserved * * The license below extends only to copyright in the software and shall @@ -152,6 +152,10 @@ class DistIface : public Drainable, public Serializable * Tick for the next periodic sync (if the event is not scheduled yet) */ Tick nextAt; + /** + * Flag is set if the sync is aborted (e.g. due to connection lost) + */ + bool isAbort; friend class SyncEvent; @@ -166,16 +170,26 @@ class DistIface : public Drainable, public Serializable void init(Tick start, Tick repeat); /** * Core method to perform a full dist sync. + * + * @return true if the sync completes, false if it gets aborted */ - virtual void run(bool same_tick) = 0; + virtual bool run(bool same_tick) = 0; /** * Callback when the receiver thread gets a sync ack message. + * + * @return false if the receiver thread needs to stop (e.g. + * simulation is to exit) */ - virtual void progress(Tick send_tick, + virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) = 0; + /** + * Abort processing an on-going sync event (in case of an error, e.g. + * lost connection to a peer gem5) + */ + void abort(); virtual void requestCkpt(ReqType req) = 0; virtual void requestExit(ReqType req) = 0; @@ -207,8 +221,8 @@ class DistIface : public Drainable, public Serializable SyncNode(); ~SyncNode() {} - void run(bool same_tick) override; - void progress(Tick max_req_tick, + bool run(bool same_tick) override; + bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, @@ -246,8 +260,8 @@ class DistIface : public Drainable, public Serializable SyncSwitch(int num_nodes); ~SyncSwitch() {} - void run(bool same_tick) override; - void progress(Tick max_req_tick, + bool run(bool same_tick) override; + bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, |