summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/dev/net/dist_iface.cc76
-rw-r--r--src/dev/net/dist_iface.hh28
2 files changed, 78 insertions, 26 deletions
diff --git a/src/dev/net/dist_iface.cc b/src/dev/net/dist_iface.cc
index 79408c304..7eef5d841 100644
--- a/src/dev/net/dist_iface.cc
+++ b/src/dev/net/dist_iface.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015 ARM Limited
+ * Copyright (c) 2015-2016 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
@@ -83,6 +83,16 @@ DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
}
}
+void
+DistIface::Sync::abort()
+{
+ std::unique_lock<std::mutex> sync_lock(lock);
+ waitNum = 0;
+ isAbort = true;
+ sync_lock.unlock();
+ cv.notify_one();
+}
+
DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
numNodes = num_nodes;
@@ -95,6 +105,7 @@ DistIface::SyncSwitch::SyncSwitch(int num_nodes)
doStopSync = false;
nextAt = std::numeric_limits<Tick>::max();
nextRepeat = std::numeric_limits<Tick>::max();
+ isAbort = false;
}
DistIface::SyncNode::SyncNode()
@@ -108,15 +119,17 @@ DistIface::SyncNode::SyncNode()
doStopSync = false;
nextAt = std::numeric_limits<Tick>::max();
nextRepeat = std::numeric_limits<Tick>::max();
+ isAbort = false;
}
-void
+bool
DistIface::SyncNode::run(bool same_tick)
{
std::unique_lock<std::mutex> sync_lock(lock);
Header header;
assert(waitNum == 0);
+ assert(!isAbort);
waitNum = DistIface::recvThreadsNum;
// initiate the global synchronisation
header.msgType = MsgType::cmdSyncReq;
@@ -135,12 +148,13 @@ DistIface::SyncNode::run(bool same_tick)
// now wait until all receiver threads complete the synchronisation
auto lf = [this]{ return waitNum == 0; };
cv.wait(sync_lock, lf);
- // global synchronisation is done
- assert(!same_tick || (nextAt == curTick()));
+ // global synchronisation is done.
+ assert(isAbort || !same_tick || (nextAt == curTick()));
+ return !isAbort;
}
-void
+bool
DistIface::SyncSwitch::run(bool same_tick)
{
std::unique_lock<std::mutex> sync_lock(lock);
@@ -151,6 +165,8 @@ DistIface::SyncSwitch::run(bool same_tick)
cv.wait(sync_lock, lf);
}
assert(waitNum == 0);
+ if (isAbort) // sync aborted
+ return false;
assert(!same_tick || (nextAt == curTick()));
waitNum = numNodes;
// Complete the global synchronisation
@@ -178,9 +194,10 @@ DistIface::SyncSwitch::run(bool same_tick)
header.needStopSync = ReqType::none;
}
DistIface::master->sendCmd(header);
+ return true;
}
-void
+bool
DistIface::SyncSwitch::progress(Tick send_tick,
Tick sync_repeat,
ReqType need_ckpt,
@@ -188,6 +205,8 @@ DistIface::SyncSwitch::progress(Tick send_tick,
ReqType need_stop_sync)
{
std::unique_lock<std::mutex> sync_lock(lock);
+ if (isAbort) // sync aborted
+ return false;
assert(waitNum > 0);
if (send_tick > nextAt)
@@ -214,9 +233,12 @@ DistIface::SyncSwitch::progress(Tick send_tick,
sync_lock.unlock();
cv.notify_one();
}
+ // The receive thread must keep alive in the switch until the node
+ // closes the connection. Thus, we always return true here.
+ return true;
}
-void
+bool
DistIface::SyncNode::progress(Tick max_send_tick,
Tick next_repeat,
ReqType do_ckpt,
@@ -224,6 +246,8 @@ DistIface::SyncNode::progress(Tick max_send_tick,
ReqType do_stop_sync)
{
std::unique_lock<std::mutex> sync_lock(lock);
+ if (isAbort) // sync aborted
+ return false;
assert(waitNum > 0);
nextAt = max_send_tick;
@@ -238,6 +262,8 @@ DistIface::SyncNode::progress(Tick max_send_tick,
sync_lock.unlock();
cv.notify_one();
}
+ // The receive thread must finish when simulation is about to exit
+ return !doExit;
}
void
@@ -314,7 +340,8 @@ DistIface::SyncEvent::start()
repeat = DistIface::sync->nextRepeat;
// Do a global barrier to agree on a common repeat value (the smallest
// one from all participating nodes.
- DistIface::sync->run(false);
+ if (!DistIface::sync->run(false))
+ panic("DistIface::SyncEvent::start() aborted\n");
assert(!DistIface::sync->doCkpt);
assert(!DistIface::sync->doExit);
@@ -366,13 +393,16 @@ DistIface::SyncEvent::process()
EventQueue::ScopedRelease sr(curEventQueue());
// we do a global sync here that is supposed to happen at the same
// tick in all gem5 peers
- DistIface::sync->run(true);
+ if (!DistIface::sync->run(true))
+ return; // global sync aborted
// global sync completed
}
if (DistIface::sync->doCkpt)
exitSimLoop("checkpoint");
- if (DistIface::sync->doExit)
+ if (DistIface::sync->doExit) {
exitSimLoop("exit request from gem5 peers");
+ return;
+ }
if (DistIface::sync->doStopSync) {
DistIface::sync->doStopSync = false;
inform("synchronization disabled at %lu\n", curTick());
@@ -605,14 +635,16 @@ DistIface::DistIface(unsigned dist_rank,
DistIface::~DistIface()
{
assert(recvThread);
+ recvThread->join();
delete recvThread;
- if (this == master) {
+ if (distIfaceNum-- == 0) {
assert(syncEvent);
delete syncEvent;
assert(sync);
delete sync;
- master = nullptr;
}
+ if (this == master)
+ master = nullptr;
}
void
@@ -654,9 +686,13 @@ DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
// because one of them called m5 exit. So we stop here.
// Grab the eventq lock to stop the simulation thread
curEventQueue()->lock();
- exitSimLoop("Message server closed connection, simulator "
- "is exiting");
+ exitSimLoop("connection to gem5 peer got closed");
curEventQueue()->unlock();
+ // The simulation thread may be blocked in processing an on-going
+ // global synchronisation. Abort the sync to give the simulation
+ // thread a chance to make progress and process the exit event.
+ sync->abort();
+ // Finish receiver thread
break;
}
@@ -668,11 +704,13 @@ DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
header.sendDelay);
} else {
// everything else must be synchronisation related command
- sync->progress(header.sendTick,
- header.syncRepeat,
- header.needCkpt,
- header.needExit,
- header.needStopSync);
+ if (!sync->progress(header.sendTick,
+ header.syncRepeat,
+ header.needCkpt,
+ header.needExit,
+ header.needStopSync))
+ // Finish receiver thread if simulation is about to exit
+ break;
}
}
}
diff --git a/src/dev/net/dist_iface.hh b/src/dev/net/dist_iface.hh
index 20ac0989b..a56b9a3bb 100644
--- a/src/dev/net/dist_iface.hh
+++ b/src/dev/net/dist_iface.hh
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015 ARM Limited
+ * Copyright (c) 2015-2016 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
@@ -152,6 +152,10 @@ class DistIface : public Drainable, public Serializable
* Tick for the next periodic sync (if the event is not scheduled yet)
*/
Tick nextAt;
+ /**
+ * Flag is set if the sync is aborted (e.g. due to connection lost)
+ */
+ bool isAbort;
friend class SyncEvent;
@@ -166,16 +170,26 @@ class DistIface : public Drainable, public Serializable
void init(Tick start, Tick repeat);
/**
* Core method to perform a full dist sync.
+ *
+ * @return true if the sync completes, false if it gets aborted
*/
- virtual void run(bool same_tick) = 0;
+ virtual bool run(bool same_tick) = 0;
/**
* Callback when the receiver thread gets a sync ack message.
+ *
+ * @return false if the receiver thread needs to stop (e.g.
+ * simulation is to exit)
*/
- virtual void progress(Tick send_tick,
+ virtual bool progress(Tick send_tick,
Tick next_repeat,
ReqType do_ckpt,
ReqType do_exit,
ReqType do_stop_sync) = 0;
+ /**
+ * Abort processing an on-going sync event (in case of an error, e.g.
+ * lost connection to a peer gem5)
+ */
+ void abort();
virtual void requestCkpt(ReqType req) = 0;
virtual void requestExit(ReqType req) = 0;
@@ -207,8 +221,8 @@ class DistIface : public Drainable, public Serializable
SyncNode();
~SyncNode() {}
- void run(bool same_tick) override;
- void progress(Tick max_req_tick,
+ bool run(bool same_tick) override;
+ bool progress(Tick max_req_tick,
Tick next_repeat,
ReqType do_ckpt,
ReqType do_exit,
@@ -246,8 +260,8 @@ class DistIface : public Drainable, public Serializable
SyncSwitch(int num_nodes);
~SyncSwitch() {}
- void run(bool same_tick) override;
- void progress(Tick max_req_tick,
+ bool run(bool same_tick) override;
+ bool progress(Tick max_req_tick,
Tick next_repeat,
ReqType do_ckpt,
ReqType do_exit,