aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--device/peer.go3
-rw-r--r--device/send.go86
2 files changed, 34 insertions, 55 deletions
diff --git a/device/peer.go b/device/peer.go
index 31b75c7..c094160 100644
--- a/device/peer.go
+++ b/device/peer.go
@@ -17,7 +17,7 @@ import (
)
const (
- PeerRoutineNumber = 3
+ PeerRoutineNumber = 2
)
type Peer struct {
@@ -287,7 +287,6 @@ func (peer *Peer) Stop() {
peer.queue.Lock()
close(peer.queue.nonce)
- close(peer.queue.outbound)
close(peer.queue.inbound)
peer.queue.Unlock()
diff --git a/device/send.go b/device/send.go
index 1b16edd..1f71f79 100644
--- a/device/send.go
+++ b/device/send.go
@@ -372,6 +372,7 @@ func (peer *Peer) RoutineNonce() {
logDebug.Println(peer, "- Routine: nonce worker - stopped")
peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
device.queue.encryption.wg.Done() // no more writes from us
+ close(peer.queue.outbound) // no more writes to this channel
peer.routines.stopping.Done()
}()
@@ -545,64 +546,43 @@ func (peer *Peer) RoutineSequentialSender() {
logDebug := device.log.Debug
logError := device.log.Error
- defer func() {
- for {
- select {
- case elem, ok := <-peer.queue.outbound:
- if ok {
- elem.Lock()
- if !elem.IsDropped() {
- device.PutMessageBuffer(elem.buffer)
- elem.Drop()
- }
- device.PutOutboundElement(elem)
- }
- default:
- goto out
- }
- }
- out:
- logDebug.Println(peer, "- Routine: sequential sender - stopped")
- peer.routines.stopping.Done()
- }()
-
+ defer logDebug.Println(peer, "- Routine: sequential sender - stopped")
logDebug.Println(peer, "- Routine: sequential sender - started")
- for {
- select {
-
- case <-peer.routines.stop:
- return
-
- case elem, ok := <-peer.queue.outbound:
-
- if !ok {
- return
- }
-
- elem.Lock()
- if elem.IsDropped() {
- device.PutOutboundElement(elem)
- continue
- }
-
- peer.timersAnyAuthenticatedPacketTraversal()
- peer.timersAnyAuthenticatedPacketSent()
-
- // send message and return buffer to pool
-
- err := peer.SendBuffer(elem.packet)
- if len(elem.packet) != MessageKeepaliveSize {
- peer.timersDataSent()
- }
+ for elem := range peer.queue.outbound {
+ elem.Lock()
+ if elem.IsDropped() {
+ device.PutOutboundElement(elem)
+ continue
+ }
+ if !peer.isRunning.Get() {
+ // peer has been stopped; return re-usable elems to the shared pool.
+ // This is an optimization only. It is possible for the peer to be stopped
+ // immediately after this check, in which case, elem will get processed.
+ // The timers and SendBuffer code are resilient to a few stragglers.
+ // TODO(josharian): rework peer shutdown order to ensure
+ // that we never accidentally keep timers alive longer than necessary.
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
- if err != nil {
- logError.Println(peer, "- Failed to send data packet", err)
- continue
- }
+ continue
+ }
+
+ peer.timersAnyAuthenticatedPacketTraversal()
+ peer.timersAnyAuthenticatedPacketSent()
- peer.keepKeyFreshSending()
+ // send message and return buffer to pool
+
+ err := peer.SendBuffer(elem.packet)
+ if len(elem.packet) != MessageKeepaliveSize {
+ peer.timersDataSent()
}
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ if err != nil {
+ logError.Println(peer, "- Failed to send data packet", err)
+ continue
+ }
+
+ peer.keepKeyFreshSending()
}
}