aboutsummaryrefslogtreecommitdiffstats
path: root/device/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/send.go')
-rw-r--r--device/send.go652
1 files changed, 292 insertions, 360 deletions
diff --git a/device/send.go b/device/send.go
index 72633be..769720a 100644
--- a/device/send.go
+++ b/device/send.go
@@ -1,6 +1,6 @@
/* SPDX-License-Identifier: MIT
*
- * Copyright (C) 2017-2019 WireGuard LLC. All Rights Reserved.
+ * Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
*/
package device
@@ -8,14 +8,17 @@ package device
import (
"bytes"
"encoding/binary"
+ "errors"
"net"
+ "os"
"sync"
- "sync/atomic"
"time"
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
+ "golang.zx2c4.com/wireguard/conn"
+ "golang.zx2c4.com/wireguard/tun"
)
/* Outbound flow
@@ -43,8 +46,6 @@ import (
*/
type QueueOutboundElement struct {
- dropped int32
- sync.Mutex
buffer *[MaxMessageSize]byte // slice holding the packet data
packet []byte // slice of "buffer" (always!)
nonce uint64 // nonce for encryption
@@ -52,80 +53,52 @@ type QueueOutboundElement struct {
peer *Peer // related peer
}
+type QueueOutboundElementsContainer struct {
+ sync.Mutex
+ elems []*QueueOutboundElement
+}
+
func (device *Device) NewOutboundElement() *QueueOutboundElement {
elem := device.GetOutboundElement()
- elem.dropped = AtomicFalse
elem.buffer = device.GetMessageBuffer()
- elem.Mutex = sync.Mutex{}
elem.nonce = 0
- elem.keypair = nil
- elem.peer = nil
+ // keypair and peer were cleared (if necessary) by clearPointers.
return elem
}
-func (elem *QueueOutboundElement) Drop() {
- atomic.StoreInt32(&elem.dropped, AtomicTrue)
-}
-
-func (elem *QueueOutboundElement) IsDropped() bool {
- return atomic.LoadInt32(&elem.dropped) == AtomicTrue
-}
-
-func addToNonceQueue(queue chan *QueueOutboundElement, element *QueueOutboundElement, device *Device) {
- for {
- select {
- case queue <- element:
- return
- default:
- select {
- case old := <-queue:
- device.PutMessageBuffer(old.buffer)
- device.PutOutboundElement(old)
- default:
- }
- }
- }
+// clearPointers clears elem fields that contain pointers.
+// This makes the garbage collector's life easier and
+// avoids accidentally keeping other objects around unnecessarily.
+// It also reduces the possible collateral damage from use-after-free bugs.
+func (elem *QueueOutboundElement) clearPointers() {
+ elem.buffer = nil
+ elem.packet = nil
+ elem.keypair = nil
+ elem.peer = nil
}
-func addToOutboundAndEncryptionQueues(outboundQueue chan *QueueOutboundElement, encryptionQueue chan *QueueOutboundElement, element *QueueOutboundElement) {
- select {
- case outboundQueue <- element:
+/* Queues a keepalive if no packets are queued for peer
+ */
+func (peer *Peer) SendKeepalive() {
+ if len(peer.queue.staged) == 0 && peer.isRunning.Load() {
+ elem := peer.device.NewOutboundElement()
+ elemsContainer := peer.device.GetOutboundElementsContainer()
+ elemsContainer.elems = append(elemsContainer.elems, elem)
select {
- case encryptionQueue <- element:
- return
+ case peer.queue.staged <- elemsContainer:
+ peer.device.log.Verbosef("%v - Sending keepalive packet", peer)
default:
- element.Drop()
- element.peer.device.PutMessageBuffer(element.buffer)
- element.Unlock()
+ peer.device.PutMessageBuffer(elem.buffer)
+ peer.device.PutOutboundElement(elem)
+ peer.device.PutOutboundElementsContainer(elemsContainer)
}
- default:
- element.peer.device.PutMessageBuffer(element.buffer)
- element.peer.device.PutOutboundElement(element)
- }
-}
-
-/* Queues a keepalive if no packets are queued for peer
- */
-func (peer *Peer) SendKeepalive() bool {
- if len(peer.queue.nonce) != 0 || peer.queue.packetInNonceQueueIsAwaitingKey.Get() || !peer.isRunning.Get() {
- return false
- }
- elem := peer.device.NewOutboundElement()
- elem.packet = nil
- select {
- case peer.queue.nonce <- elem:
- peer.device.log.Debug.Println(peer, "- Sending keepalive packet")
- return true
- default:
- peer.device.PutMessageBuffer(elem.buffer)
- peer.device.PutOutboundElement(elem)
- return false
}
+ peer.SendStagedPackets()
}
func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
if !isRetry {
- atomic.StoreUint32(&peer.timers.handshakeAttempts, 0)
+ peer.timers.handshakeAttempts.Store(0)
}
peer.handshake.mutex.RLock()
@@ -143,16 +116,16 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
peer.handshake.lastSentHandshake = time.Now()
peer.handshake.mutex.Unlock()
- peer.device.log.Debug.Println(peer, "- Sending handshake initiation")
+ peer.device.log.Verbosef("%v - Sending handshake initiation", peer)
msg, err := peer.device.CreateMessageInitiation(peer)
if err != nil {
- peer.device.log.Error.Println(peer, "- Failed to create initiation message:", err)
+ peer.device.log.Errorf("%v - Failed to create initiation message: %v", peer, err)
return err
}
- var buff [MessageInitiationSize]byte
- writer := bytes.NewBuffer(buff[:0])
+ var buf [MessageInitiationSize]byte
+ writer := bytes.NewBuffer(buf[:0])
binary.Write(writer, binary.LittleEndian, msg)
packet := writer.Bytes()
peer.cookieGenerator.AddMacs(packet)
@@ -160,9 +133,9 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
- err = peer.SendBuffer(packet)
+ err = peer.SendBuffers([][]byte{packet})
if err != nil {
- peer.device.log.Error.Println(peer, "- Failed to send handshake initiation", err)
+ peer.device.log.Errorf("%v - Failed to send handshake initiation: %v", peer, err)
}
peer.timersHandshakeInitiated()
@@ -174,23 +147,23 @@ func (peer *Peer) SendHandshakeResponse() error {
peer.handshake.lastSentHandshake = time.Now()
peer.handshake.mutex.Unlock()
- peer.device.log.Debug.Println(peer, "- Sending handshake response")
+ peer.device.log.Verbosef("%v - Sending handshake response", peer)
response, err := peer.device.CreateMessageResponse(peer)
if err != nil {
- peer.device.log.Error.Println(peer, "- Failed to create response message:", err)
+ peer.device.log.Errorf("%v - Failed to create response message: %v", peer, err)
return err
}
- var buff [MessageResponseSize]byte
- writer := bytes.NewBuffer(buff[:0])
+ var buf [MessageResponseSize]byte
+ writer := bytes.NewBuffer(buf[:0])
binary.Write(writer, binary.LittleEndian, response)
packet := writer.Bytes()
peer.cookieGenerator.AddMacs(packet)
err = peer.BeginSymmetricSession()
if err != nil {
- peer.device.log.Error.Println(peer, "- Failed to derive keypair:", err)
+ peer.device.log.Errorf("%v - Failed to derive keypair: %v", peer, err)
return err
}
@@ -198,28 +171,29 @@ func (peer *Peer) SendHandshakeResponse() error {
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
- err = peer.SendBuffer(packet)
+ // TODO: allocation could be avoided
+ err = peer.SendBuffers([][]byte{packet})
if err != nil {
- peer.device.log.Error.Println(peer, "- Failed to send handshake response", err)
+ peer.device.log.Errorf("%v - Failed to send handshake response: %v", peer, err)
}
return err
}
func (device *Device) SendHandshakeCookie(initiatingElem *QueueHandshakeElement) error {
-
- device.log.Debug.Println("Sending cookie response for denied handshake message for", initiatingElem.endpoint.DstToString())
+ device.log.Verbosef("Sending cookie response for denied handshake message for %v", initiatingElem.endpoint.DstToString())
sender := binary.LittleEndian.Uint32(initiatingElem.packet[4:8])
reply, err := device.cookieChecker.CreateReply(initiatingElem.packet, sender, initiatingElem.endpoint.DstToBytes())
if err != nil {
- device.log.Error.Println("Failed to create cookie reply:", err)
+ device.log.Errorf("Failed to create cookie reply: %v", err)
return err
}
- var buff [MessageCookieReplySize]byte
- writer := bytes.NewBuffer(buff[:0])
+ var buf [MessageCookieReplySize]byte
+ writer := bytes.NewBuffer(buf[:0])
binary.Write(writer, binary.LittleEndian, reply)
- device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint)
+ // TODO: allocation could be avoided
+ device.net.bind.Send([][]byte{writer.Bytes()}, initiatingElem.endpoint)
return nil
}
@@ -228,280 +202,255 @@ func (peer *Peer) keepKeyFreshSending() {
if keypair == nil {
return
}
- nonce := atomic.LoadUint64(&keypair.sendNonce)
+ nonce := keypair.sendNonce.Load()
if nonce > RekeyAfterMessages || (keypair.isInitiator && time.Since(keypair.created) > RekeyAfterTime) {
peer.SendHandshakeInitiation(false)
}
}
-/* Reads packets from the TUN and inserts
- * into nonce queue for peer
- *
- * Obs. Single instance per TUN device
- */
func (device *Device) RoutineReadFromTUN() {
-
- logDebug := device.log.Debug
- logError := device.log.Error
-
defer func() {
- logDebug.Println("Routine: TUN reader - stopped")
+ device.log.Verbosef("Routine: TUN reader - stopped")
device.state.stopping.Done()
+ device.queue.encryption.wg.Done()
}()
- logDebug.Println("Routine: TUN reader - started")
- device.state.starting.Done()
-
- var elem *QueueOutboundElement
+ device.log.Verbosef("Routine: TUN reader - started")
+
+ var (
+ batchSize = device.BatchSize()
+ readErr error
+ elems = make([]*QueueOutboundElement, batchSize)
+ bufs = make([][]byte, batchSize)
+ elemsByPeer = make(map[*Peer]*QueueOutboundElementsContainer, batchSize)
+ count = 0
+ sizes = make([]int, batchSize)
+ offset = MessageTransportHeaderSize
+ )
+
+ for i := range elems {
+ elems[i] = device.NewOutboundElement()
+ bufs[i] = elems[i].buffer[:]
+ }
- for {
- if elem != nil {
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
+ defer func() {
+ for _, elem := range elems {
+ if elem != nil {
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ }
}
- elem = device.NewOutboundElement()
-
- // read packet
-
- offset := MessageTransportHeaderSize
- size, err := device.tun.device.Read(elem.buffer[:], offset)
+ }()
- if err != nil {
- if !device.isClosed.Get() {
- logError.Println("Failed to read packet from TUN device:", err)
- device.Close()
+ for {
+ // read packets
+ count, readErr = device.tun.device.Read(bufs, sizes, offset)
+ for i := 0; i < count; i++ {
+ if sizes[i] < 1 {
+ continue
}
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- return
- }
- if size == 0 || size > MaxContentSize {
- continue
- }
+ elem := elems[i]
+ elem.packet = bufs[i][offset : offset+sizes[i]]
- elem.packet = elem.buffer[offset : offset+size]
+ // lookup peer
+ var peer *Peer
+ switch elem.packet[0] >> 4 {
+ case 4:
+ if len(elem.packet) < ipv4.HeaderLen {
+ continue
+ }
+ dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
+ peer = device.allowedips.Lookup(dst)
- // lookup peer
+ case 6:
+ if len(elem.packet) < ipv6.HeaderLen {
+ continue
+ }
+ dst := elem.packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len]
+ peer = device.allowedips.Lookup(dst)
- var peer *Peer
- switch elem.packet[0] >> 4 {
- case ipv4.Version:
- if len(elem.packet) < ipv4.HeaderLen {
- continue
+ default:
+ device.log.Verbosef("Received packet with unknown IP version")
}
- dst := elem.packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
- peer = device.allowedips.LookupIPv4(dst)
- case ipv6.Version:
- if len(elem.packet) < ipv6.HeaderLen {
+ if peer == nil {
continue
}
- dst := elem.packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len]
- peer = device.allowedips.LookupIPv6(dst)
-
- default:
- logDebug.Println("Received packet with unknown IP version")
+ elemsForPeer, ok := elemsByPeer[peer]
+ if !ok {
+ elemsForPeer = device.GetOutboundElementsContainer()
+ elemsByPeer[peer] = elemsForPeer
+ }
+ elemsForPeer.elems = append(elemsForPeer.elems, elem)
+ elems[i] = device.NewOutboundElement()
+ bufs[i] = elems[i].buffer[:]
}
- if peer == nil {
- continue
+ for peer, elemsForPeer := range elemsByPeer {
+ if peer.isRunning.Load() {
+ peer.StagePackets(elemsForPeer)
+ peer.SendStagedPackets()
+ } else {
+ for _, elem := range elemsForPeer.elems {
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ }
+ device.PutOutboundElementsContainer(elemsForPeer)
+ }
+ delete(elemsByPeer, peer)
}
- // insert into nonce/pre-handshake queue
-
- if peer.isRunning.Get() {
- if peer.queue.packetInNonceQueueIsAwaitingKey.Get() {
- peer.SendHandshakeInitiation(false)
+ if readErr != nil {
+ if errors.Is(readErr, tun.ErrTooManySegments) {
+ // TODO: record stat for this
+ // This will happen if MSS is surprisingly small (< 576)
+ // coincident with reasonably high throughput.
+ device.log.Verbosef("Dropped some packets from multi-segment read: %v", readErr)
+ continue
}
- addToNonceQueue(peer.queue.nonce, elem, device)
- elem = nil
+ if !device.isClosed() {
+ if !errors.Is(readErr, os.ErrClosed) {
+ device.log.Errorf("Failed to read packet from TUN device: %v", readErr)
+ }
+ go device.Close()
+ }
+ return
}
}
}
-func (peer *Peer) FlushNonceQueue() {
- select {
- case peer.signals.flushNonceQueue <- struct{}{}:
- default:
- }
-}
-
-/* Queues packets when there is no handshake.
- * Then assigns nonces to packets sequentially
- * and creates "work" structs for workers
- *
- * Obs. A single instance per peer
- */
-func (peer *Peer) RoutineNonce() {
- var keypair *Keypair
-
- device := peer.device
- logDebug := device.log.Debug
-
- flush := func() {
- for {
- select {
- case elem := <-peer.queue.nonce:
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- default:
- return
+func (peer *Peer) StagePackets(elems *QueueOutboundElementsContainer) {
+ for {
+ select {
+ case peer.queue.staged <- elems:
+ return
+ default:
+ }
+ select {
+ case tooOld := <-peer.queue.staged:
+ for _, elem := range tooOld.elems {
+ peer.device.PutMessageBuffer(elem.buffer)
+ peer.device.PutOutboundElement(elem)
}
+ peer.device.PutOutboundElementsContainer(tooOld)
+ default:
}
}
+}
- defer func() {
- flush()
- logDebug.Println(peer, "- Routine: nonce worker - stopped")
- peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
- peer.routines.stopping.Done()
- }()
+func (peer *Peer) SendStagedPackets() {
+top:
+ if len(peer.queue.staged) == 0 || !peer.device.isUp() {
+ return
+ }
- peer.routines.starting.Done()
- logDebug.Println(peer, "- Routine: nonce worker - started")
+ keypair := peer.keypairs.Current()
+ if keypair == nil || keypair.sendNonce.Load() >= RejectAfterMessages || time.Since(keypair.created) >= RejectAfterTime {
+ peer.SendHandshakeInitiation(false)
+ return
+ }
for {
- NextPacket:
- peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
-
+ var elemsContainerOOO *QueueOutboundElementsContainer
select {
- case <-peer.routines.stop:
- return
-
- case <-peer.signals.flushNonceQueue:
- flush()
- goto NextPacket
-
- case elem, ok := <-peer.queue.nonce:
-
- if !ok {
- return
- }
-
- // make sure to always pick the newest key
-
- for {
-
- // check validity of newest key pair
-
- keypair = peer.keypairs.Current()
- if keypair != nil && keypair.sendNonce < RejectAfterMessages {
- if time.Since(keypair.created) < RejectAfterTime {
- break
+ case elemsContainer := <-peer.queue.staged:
+ i := 0
+ for _, elem := range elemsContainer.elems {
+ elem.peer = peer
+ elem.nonce = keypair.sendNonce.Add(1) - 1
+ if elem.nonce >= RejectAfterMessages {
+ keypair.sendNonce.Store(RejectAfterMessages)
+ if elemsContainerOOO == nil {
+ elemsContainerOOO = peer.device.GetOutboundElementsContainer()
}
- }
- peer.queue.packetInNonceQueueIsAwaitingKey.Set(true)
-
- // no suitable key pair, request for new handshake
-
- select {
- case <-peer.signals.newKeypairArrived:
- default:
+ elemsContainerOOO.elems = append(elemsContainerOOO.elems, elem)
+ continue
+ } else {
+ elemsContainer.elems[i] = elem
+ i++
}
- peer.SendHandshakeInitiation(false)
-
- // wait for key to be established
-
- logDebug.Println(peer, "- Awaiting keypair")
+ elem.keypair = keypair
+ }
+ elemsContainer.Lock()
+ elemsContainer.elems = elemsContainer.elems[:i]
- select {
- case <-peer.signals.newKeypairArrived:
- logDebug.Println(peer, "- Obtained awaited keypair")
+ if elemsContainerOOO != nil {
+ peer.StagePackets(elemsContainerOOO) // XXX: Out of order, but we can't front-load go chans
+ }
- case <-peer.signals.flushNonceQueue:
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- flush()
- goto NextPacket
+ if len(elemsContainer.elems) == 0 {
+ peer.device.PutOutboundElementsContainer(elemsContainer)
+ goto top
+ }
- case <-peer.routines.stop:
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- return
+ // add to parallel and sequential queue
+ if peer.isRunning.Load() {
+ peer.queue.outbound.c <- elemsContainer
+ peer.device.queue.encryption.c <- elemsContainer
+ } else {
+ for _, elem := range elemsContainer.elems {
+ peer.device.PutMessageBuffer(elem.buffer)
+ peer.device.PutOutboundElement(elem)
}
+ peer.device.PutOutboundElementsContainer(elemsContainer)
}
- peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
-
- // populate work element
- elem.peer = peer
- elem.nonce = atomic.AddUint64(&keypair.sendNonce, 1) - 1
-
- // double check in case of race condition added by future code
-
- if elem.nonce >= RejectAfterMessages {
- atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages)
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- goto NextPacket
+ if elemsContainerOOO != nil {
+ goto top
}
+ default:
+ return
+ }
+ }
+}
- elem.keypair = keypair
- elem.dropped = AtomicFalse
- elem.Lock()
-
- // add to parallel and sequential queue
- addToOutboundAndEncryptionQueues(peer.queue.outbound, device.queue.encryption, elem)
+func (peer *Peer) FlushStagedPackets() {
+ for {
+ select {
+ case elemsContainer := <-peer.queue.staged:
+ for _, elem := range elemsContainer.elems {
+ peer.device.PutMessageBuffer(elem.buffer)
+ peer.device.PutOutboundElement(elem)
+ }
+ peer.device.PutOutboundElementsContainer(elemsContainer)
+ default:
+ return
}
}
}
+func calculatePaddingSize(packetSize, mtu int) int {
+ lastUnit := packetSize
+ if mtu == 0 {
+ return ((lastUnit + PaddingMultiple - 1) & ^(PaddingMultiple - 1)) - lastUnit
+ }
+ if lastUnit > mtu {
+ lastUnit %= mtu
+ }
+ paddedSize := ((lastUnit + PaddingMultiple - 1) & ^(PaddingMultiple - 1))
+ if paddedSize > mtu {
+ paddedSize = mtu
+ }
+ return paddedSize - lastUnit
+}
+
/* Encrypts the elements in the queue
* and marks them for sequential consumption (by releasing the mutex)
*
* Obs. One instance per core
*/
-func (device *Device) RoutineEncryption() {
-
+func (device *Device) RoutineEncryption(id int) {
+ var paddingZeros [PaddingMultiple]byte
var nonce [chacha20poly1305.NonceSize]byte
- logDebug := device.log.Debug
-
- defer func() {
- for {
- select {
- case elem, ok := <-device.queue.encryption:
- if ok && !elem.IsDropped() {
- elem.Drop()
- device.PutMessageBuffer(elem.buffer)
- elem.Unlock()
- }
- default:
- goto out
- }
- }
- out:
- logDebug.Println("Routine: encryption worker - stopped")
- device.state.stopping.Done()
- }()
-
- logDebug.Println("Routine: encryption worker - started")
- device.state.starting.Done()
-
- for {
-
- // fetch next element
-
- select {
- case <-device.signals.stop:
- return
-
- case elem, ok := <-device.queue.encryption:
-
- if !ok {
- return
- }
-
- // check if dropped
-
- if elem.IsDropped() {
- continue
- }
+ defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
+ device.log.Verbosef("Routine: encryption worker %d - started", id)
+ for elemsContainer := range device.queue.encryption.c {
+ for _, elem := range elemsContainer.elems {
// populate header fields
-
header := elem.buffer[:MessageTransportHeaderSize]
fieldType := header[0:4]
@@ -513,16 +462,8 @@ func (device *Device) RoutineEncryption() {
binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
// pad content to multiple of 16
-
- mtu := int(atomic.LoadInt32(&device.tun.mtu))
- lastUnit := len(elem.packet) % mtu
- paddedSize := (lastUnit + PaddingMultiple - 1) & ^(PaddingMultiple - 1)
- if paddedSize > mtu {
- paddedSize = mtu
- }
- for i := len(elem.packet); i < paddedSize; i++ {
- elem.packet = append(elem.packet, 0)
- }
+ paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load()))
+ elem.packet = append(elem.packet, paddingZeros[:paddingSize]...)
// encrypt content and release to consumer
@@ -533,82 +474,73 @@ func (device *Device) RoutineEncryption() {
elem.packet,
nil,
)
- elem.Unlock()
}
+ elemsContainer.Unlock()
}
}
-/* Sequentially reads packets from queue and sends to endpoint
- *
- * Obs. Single instance per peer.
- * The routine terminates then the outbound queue is closed.
- */
-func (peer *Peer) RoutineSequentialSender() {
-
+func (peer *Peer) RoutineSequentialSender(maxBatchSize int) {
device := peer.device
-
- logDebug := device.log.Debug
- logError := device.log.Error
-
defer func() {
- for {
- select {
- case elem, ok := <-peer.queue.outbound:
- if ok {
- if !elem.IsDropped() {
- device.PutMessageBuffer(elem.buffer)
- elem.Drop()
- }
- device.PutOutboundElement(elem)
- }
- default:
- goto out
- }
- }
- out:
- logDebug.Println(peer, "- Routine: sequential sender - stopped")
- peer.routines.stopping.Done()
+ defer device.log.Verbosef("%v - Routine: sequential sender - stopped", peer)
+ peer.stopping.Done()
}()
+ device.log.Verbosef("%v - Routine: sequential sender - started", peer)
- logDebug.Println(peer, "- Routine: sequential sender - started")
-
- peer.routines.starting.Done()
-
- for {
- select {
+ bufs := make([][]byte, 0, maxBatchSize)
- case <-peer.routines.stop:
+ for elemsContainer := range peer.queue.outbound.c {
+ bufs = bufs[:0]
+ if elemsContainer == nil {
return
-
- case elem, ok := <-peer.queue.outbound:
-
- if !ok {
- return
- }
-
- elem.Lock()
- if elem.IsDropped() {
+ }
+ if !peer.isRunning.Load() {
+ // peer has been stopped; return re-usable elems to the shared pool.
+ // This is an optimization only. It is possible for the peer to be stopped
+ // immediately after this check, in which case, elem will get processed.
+ // The timers and SendBuffers code are resilient to a few stragglers.
+ // TODO: rework peer shutdown order to ensure
+ // that we never accidentally keep timers alive longer than necessary.
+ elemsContainer.Lock()
+ for _, elem := range elemsContainer.elems {
+ device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
- continue
}
-
- peer.timersAnyAuthenticatedPacketTraversal()
- peer.timersAnyAuthenticatedPacketSent()
-
- // send message and return buffer to pool
-
- err := peer.SendBuffer(elem.packet)
+ continue
+ }
+ dataSent := false
+ elemsContainer.Lock()
+ for _, elem := range elemsContainer.elems {
if len(elem.packet) != MessageKeepaliveSize {
- peer.timersDataSent()
+ dataSent = true
}
+ bufs = append(bufs, elem.packet)
+ }
+
+ peer.timersAnyAuthenticatedPacketTraversal()
+ peer.timersAnyAuthenticatedPacketSent()
+
+ err := peer.SendBuffers(bufs)
+ if dataSent {
+ peer.timersDataSent()
+ }
+ for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
- if err != nil {
- logError.Println(peer, "- Failed to send data packet", err)
- continue
+ }
+ device.PutOutboundElementsContainer(elemsContainer)
+ if err != nil {
+ var errGSO conn.ErrUDPGSODisabled
+ if errors.As(err, &errGSO) {
+ device.log.Verbosef(err.Error())
+ err = errGSO.RetryErr
}
-
- peer.keepKeyFreshSending()
}
+ if err != nil {
+ device.log.Errorf("%v - Failed to send data packets: %v", peer, err)
+ continue
+ }
+
+ peer.keepKeyFreshSending()
}
}