aboutsummaryrefslogtreecommitdiffstats
path: root/device/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/send.go')
-rw-r--r--device/send.go87
1 files changed, 55 insertions, 32 deletions
diff --git a/device/send.go b/device/send.go
index c4aa5b9..edc58c0 100644
--- a/device/send.go
+++ b/device/send.go
@@ -160,7 +160,7 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
- err = peer.SendBuffer(packet)
+ err = peer.SendBuffer(packet, true)
if err != nil {
peer.device.log.Error.Println(peer, "- Failed to send handshake initiation", err)
}
@@ -198,7 +198,7 @@ func (peer *Peer) SendHandshakeResponse() error {
peer.timersAnyAuthenticatedPacketTraversal()
peer.timersAnyAuthenticatedPacketSent()
- err = peer.SendBuffer(packet)
+ err = peer.SendBuffer(packet, true)
if err != nil {
peer.device.log.Error.Println(peer, "- Failed to send handshake response", err)
}
@@ -219,7 +219,7 @@ func (device *Device) SendHandshakeCookie(initiatingElem *QueueHandshakeElement)
var buff [MessageCookieReplySize]byte
writer := bytes.NewBuffer(buff[:0])
binary.Write(writer, binary.LittleEndian, reply)
- device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint)
+ device.net.bind.Send(writer.Bytes(), initiatingElem.endpoint, true)
if err != nil {
device.log.Error.Println("Failed to send cookie reply:", err)
}
@@ -541,6 +541,33 @@ func (device *Device) RoutineEncryption() {
}
}
+func (peer *Peer) sendElementStopOrFlush(shouldFlush *bool) (stop bool, elemOk bool, elem *QueueOutboundElement) {
+ if !*shouldFlush {
+ select {
+ case <-peer.routines.stop:
+ stop = true
+ return
+ case elem, elemOk = <-peer.queue.outbound:
+ return
+ }
+ } else {
+ select {
+ case <-peer.routines.stop:
+ stop = true
+ return
+ case elem, elemOk = <-peer.queue.outbound:
+ return
+ default:
+ *shouldFlush = false
+ err := peer.device.net.bind.Flush()
+ if err != nil {
+ peer.device.log.Error.Printf("Unable to flush send packets: %v", err)
+ }
+ return peer.sendElementStopOrFlush(shouldFlush)
+ }
+ }
+}
+
/* Sequentially reads packets from queue and sends to endpoint
*
* Obs. Single instance per peer.
@@ -577,41 +604,37 @@ func (peer *Peer) RoutineSequentialSender() {
peer.routines.starting.Done()
+ shouldFlush := false
for {
- select {
-
- case <-peer.routines.stop:
+ stop, ok, elem := peer.sendElementStopOrFlush(&shouldFlush)
+ if stop || !ok {
return
+ }
- case elem, ok := <-peer.queue.outbound:
-
- if !ok {
- return
- }
-
- elem.Lock()
- if elem.IsDropped() {
- device.PutOutboundElement(elem)
- continue
- }
-
- peer.timersAnyAuthenticatedPacketTraversal()
- peer.timersAnyAuthenticatedPacketSent()
+ elem.Lock()
+ if elem.IsDropped() {
+ device.PutOutboundElement(elem)
+ continue
+ }
- // send message and return buffer to pool
+ peer.timersAnyAuthenticatedPacketTraversal()
+ peer.timersAnyAuthenticatedPacketSent()
- err := peer.SendBuffer(elem.packet)
- if len(elem.packet) != MessageKeepaliveSize {
- peer.timersDataSent()
- }
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- if err != nil {
- logError.Println(peer, "- Failed to send data packet", err)
- continue
- }
+ // send message and return buffer to pool
- peer.keepKeyFreshSending()
+ err := peer.SendBuffer(elem.packet, false)
+ if len(elem.packet) != MessageKeepaliveSize {
+ peer.timersDataSent()
}
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ if err != nil {
+ logError.Println(peer, "- Failed to send data packet", err)
+ continue
+ } else {
+ shouldFlush = true
+ }
+
+ peer.keepKeyFreshSending()
}
}