summary | refs | log | tree | commit | diff
path: root/src/dev/net/dist_iface.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/dev/net/dist_iface.cc')
-rw-r--r-- src/dev/net/dist_iface.cc 76
1 files changed, 57 insertions, 19 deletions
diff --git a/src/dev/net/dist_iface.cc b/src/dev/net/dist_iface.cc
index 79408c304..7eef5d841 100644
--- a/src/dev/net/dist_iface.cc
+++ b/src/dev/net/dist_iface.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015 ARM Limited
+ * Copyright (c) 2015-2016 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
@@ -83,6 +83,16 @@ DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
}
}
+void
+DistIface::Sync::abort() // Abort a pending global sync and wake the thread waiting on cv.
+{
+ std::unique_lock<std::mutex> sync_lock(lock); // run() and progress() take this lock as well
+ waitNum = 0; // satisfies the "waitNum == 0" wait predicate used in run()
+ isAbort = true; // run() and progress() check this flag and return false once it is set
+ sync_lock.unlock(); // drop the lock before notifying so the woken thread can re-acquire it
+ cv.notify_one();
+}
+
DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
numNodes = num_nodes;
@@ -95,6 +105,7 @@ DistIface::SyncSwitch::SyncSwitch(int num_nodes)
doStopSync = false;
nextAt = std::numeric_limits<Tick>::max();
nextRepeat = std::numeric_limits<Tick>::max();
+ isAbort = false;
}
DistIface::SyncNode::SyncNode()
@@ -108,15 +119,17 @@ DistIface::SyncNode::SyncNode()
doStopSync = false;
nextAt = std::numeric_limits<Tick>::max();
nextRepeat = std::numeric_limits<Tick>::max();
+ isAbort = false;
}
-void
+bool
DistIface::SyncNode::run(bool same_tick)
{
std::unique_lock<std::mutex> sync_lock(lock);
Header header;
assert(waitNum == 0);
+ assert(!isAbort);
waitNum = DistIface::recvThreadsNum;
// initiate the global synchronisation
header.msgType = MsgType::cmdSyncReq;
@@ -135,12 +148,13 @@ DistIface::SyncNode::run(bool same_tick)
// now wait until all receiver threads complete the synchronisation
auto lf = [this]{ return waitNum == 0; };
cv.wait(sync_lock, lf);
- // global synchronisation is done
- assert(!same_tick || (nextAt == curTick()));
+ // global synchronisation is done.
+ assert(isAbort || !same_tick || (nextAt == curTick()));
+ return !isAbort;
}
-void
+bool
DistIface::SyncSwitch::run(bool same_tick)
{
std::unique_lock<std::mutex> sync_lock(lock);
@@ -151,6 +165,8 @@ DistIface::SyncSwitch::run(bool same_tick)
cv.wait(sync_lock, lf);
}
assert(waitNum == 0);
+ if (isAbort) // sync aborted
+ return false;
assert(!same_tick || (nextAt == curTick()));
waitNum = numNodes;
// Complete the global synchronisation
@@ -178,9 +194,10 @@ DistIface::SyncSwitch::run(bool same_tick)
header.needStopSync = ReqType::none;
}
DistIface::master->sendCmd(header);
+ return true;
}
-void
+bool
DistIface::SyncSwitch::progress(Tick send_tick,
Tick sync_repeat,
ReqType need_ckpt,
@@ -188,6 +205,8 @@ DistIface::SyncSwitch::progress(Tick send_tick,
ReqType need_stop_sync)
{
std::unique_lock<std::mutex> sync_lock(lock);
+ if (isAbort) // sync aborted
+ return false;
assert(waitNum > 0);
if (send_tick > nextAt)
@@ -214,9 +233,12 @@ DistIface::SyncSwitch::progress(Tick send_tick,
sync_lock.unlock();
cv.notify_one();
}
+ // The receive thread must stay alive in the switch until the node
+ // closes the connection. Thus, we always return true here.
+ return true;
}
-void
+bool
DistIface::SyncNode::progress(Tick max_send_tick,
Tick next_repeat,
ReqType do_ckpt,
@@ -224,6 +246,8 @@ DistIface::SyncNode::progress(Tick max_send_tick,
ReqType do_stop_sync)
{
std::unique_lock<std::mutex> sync_lock(lock);
+ if (isAbort) // sync aborted
+ return false;
assert(waitNum > 0);
nextAt = max_send_tick;
@@ -238,6 +262,8 @@ DistIface::SyncNode::progress(Tick max_send_tick,
sync_lock.unlock();
cv.notify_one();
}
+ // The receive thread must finish when simulation is about to exit
+ return !doExit;
}
void
@@ -314,7 +340,8 @@ DistIface::SyncEvent::start()
repeat = DistIface::sync->nextRepeat;
// Do a global barrier to agree on a common repeat value (the smallest
// one from all participating nodes).
- DistIface::sync->run(false);
+ if (!DistIface::sync->run(false))
+ panic("DistIface::SyncEvent::start() aborted\n");
assert(!DistIface::sync->doCkpt);
assert(!DistIface::sync->doExit);
@@ -366,13 +393,16 @@ DistIface::SyncEvent::process()
EventQueue::ScopedRelease sr(curEventQueue());
// we do a global sync here that is supposed to happen at the same
// tick in all gem5 peers
- DistIface::sync->run(true);
+ if (!DistIface::sync->run(true))
+ return; // global sync aborted
// global sync completed
}
if (DistIface::sync->doCkpt)
exitSimLoop("checkpoint");
- if (DistIface::sync->doExit)
+ if (DistIface::sync->doExit) {
exitSimLoop("exit request from gem5 peers");
+ return;
+ }
if (DistIface::sync->doStopSync) {
DistIface::sync->doStopSync = false;
inform("synchronization disabled at %lu\n", curTick());
@@ -605,14 +635,16 @@ DistIface::DistIface(unsigned dist_rank,
DistIface::~DistIface() // Joins and deletes the receiver thread; conditionally deletes syncEvent and sync.
{
assert(recvThread);
+ recvThread->join(); // wait for the receiver thread to return before its object is deleted
delete recvThread;
- if (this == master) {
+ if (distIfaceNum-- == 0) { // NOTE(review): post-decrement compares the value *before* the decrement - confirm 0 (not 1) marks the last interface
assert(syncEvent);
delete syncEvent;
assert(sync);
delete sync;
- master = nullptr;
}
+ if (this == master) // the master pointer is now cleared independently of the sync teardown above
+ master = nullptr;
}
void
@@ -654,9 +686,13 @@ DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
// because one of them called m5 exit. So we stop here.
// Grab the eventq lock to stop the simulation thread
curEventQueue()->lock();
- exitSimLoop("Message server closed connection, simulator "
- "is exiting");
+ exitSimLoop("connection to gem5 peer got closed");
curEventQueue()->unlock();
+ // The simulation thread may be blocked in processing an on-going
+ // global synchronisation. Abort the sync to give the simulation
+ // thread a chance to make progress and process the exit event.
+ sync->abort();
+ // Finish receiver thread
break;
}
@@ -668,11 +704,13 @@ DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
header.sendDelay);
} else {
// everything else must be synchronisation related command
- sync->progress(header.sendTick,
- header.syncRepeat,
- header.needCkpt,
- header.needExit,
- header.needStopSync);
+ if (!sync->progress(header.sendTick,
+ header.syncRepeat,
+ header.needCkpt,
+ header.needExit,
+ header.needStopSync))
+ // Finish receiver thread if simulation is about to exit
+ break;
}
}
}