aboutsummaryrefslogtreecommitdiffstats
path: root/device/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/peer.go')
-rw-r--r--device/peer.go23
1 files changed, 13 insertions, 10 deletions
diff --git a/device/peer.go b/device/peer.go
index 3e4f4ec..49b9acb 100644
--- a/device/peer.go
+++ b/device/peer.go
@@ -51,8 +51,11 @@ type Peer struct {
sentLastMinuteHandshake AtomicBool
}
+ state struct {
+ mu sync.Mutex // protects against concurrent Start/Stop
+ }
+
queue struct {
- sync.RWMutex
staged chan *QueueOutboundElement // staged packets before a handshake is available
outbound chan *QueueOutboundElement // sequential ordering of udp transmission
inbound chan *QueueInboundElement // sequential ordering of tun writing
@@ -158,8 +161,8 @@ func (peer *Peer) Start() {
}
// prevent simultaneous start/stop operations
- peer.queue.Lock()
- defer peer.queue.Unlock()
+ peer.state.mu.Lock()
+ defer peer.state.mu.Unlock()
if peer.isRunning.Get() {
return
@@ -177,8 +180,8 @@ func (peer *Peer) Start() {
peer.handshake.mutex.Unlock()
// prepare queues
- peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize)
- peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize)
+ peer.queue.outbound = newAutodrainingOutboundQueue(device)
+ peer.queue.inbound = newAutodrainingInboundQueue(device)
if peer.queue.staged == nil {
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)
}
@@ -239,8 +242,8 @@ func (peer *Peer) ExpireCurrentKeypairs() {
}
func (peer *Peer) Stop() {
- peer.queue.Lock()
- defer peer.queue.Unlock()
+ peer.state.mu.Lock()
+ defer peer.state.mu.Unlock()
if !peer.isRunning.Swap(false) {
return
@@ -249,9 +252,9 @@ func (peer *Peer) Stop() {
peer.device.log.Verbosef("%v - Stopping...", peer)
peer.timersStop()
-
- close(peer.queue.inbound)
- close(peer.queue.outbound)
+ // Signal that RoutineSequentialSender and RoutineSequentialReceiver should exit.
+ peer.queue.inbound <- nil
+ peer.queue.outbound <- nil
peer.stopping.Wait()
peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us