aboutsummaryrefslogtreecommitdiffstats
path: root/src/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/send.go')
-rw-r--r--src/send.go206
1 files changed, 140 insertions, 66 deletions
diff --git a/src/send.go b/src/send.go
index ab75750..d4f9342 100644
--- a/src/send.go
+++ b/src/send.go
@@ -5,6 +5,8 @@ import (
"golang.org/x/crypto/chacha20poly1305"
"net"
"sync"
+ "sync/atomic"
+ "time"
)
/* Handles outbound flow
@@ -29,6 +31,7 @@ type QueueOutboundElement struct {
packet []byte
nonce uint64
keyPair *KeyPair
+ peer *Peer
}
func (peer *Peer) FlushNonceQueue() {
@@ -46,6 +49,7 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) {
for {
select {
case peer.queue.outbound <- elem:
+ return
default:
select {
case <-peer.queue.outbound:
@@ -61,11 +65,15 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) {
* Obs. Single instance per TUN device
*/
func (device *Device) RoutineReadFromTUN(tun TUNDevice) {
+ if tun.MTU() == 0 {
+ // Dummy
+ return
+ }
+
device.log.Debug.Println("Routine, TUN Reader: started")
for {
// read packet
- device.log.Debug.Println("Read")
packet := make([]byte, 1<<16) // TODO: Fix & avoid dynamic allocation
size, err := tun.Read(packet)
if err != nil {
@@ -94,13 +102,16 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) {
default:
device.log.Debug.Println("Receieved packet with unknown IP version")
- return
}
if peer == nil {
device.log.Debug.Println("No peer configured for IP")
continue
}
+ if peer.endpoint == nil {
+ device.log.Debug.Println("No known endpoint for peer", peer.id)
+ continue
+ }
// insert into nonce/pre-handshake queue
@@ -131,69 +142,95 @@ func (peer *Peer) RoutineNonce() {
var packet []byte
var keyPair *KeyPair
- for {
+ device := peer.device
+ logger := device.log.Debug
- // wait for packet
+ logger.Println("Routine, nonce worker, started for peer", peer.id)
- if packet == nil {
- select {
- case packet = <-peer.queue.nonce:
- case <-peer.signal.stopSending:
- close(peer.queue.outbound)
- return
+ func() {
+
+ for {
+ NextPacket:
+
+ // wait for packet
+
+ if packet == nil {
+ select {
+ case packet = <-peer.queue.nonce:
+ case <-peer.signal.stop:
+ return
+ }
}
- }
- // wait for key pair
+ // wait for key pair
+
+ for {
+ select {
+ case <-peer.signal.newKeyPair:
+ default:
+ }
- for keyPair == nil {
- peer.signal.newHandshake <- true
- select {
- case <-peer.keyPairs.newKeyPair:
keyPair = peer.keyPairs.Current()
- continue
- case <-peer.signal.flushNonceQueue:
- peer.FlushNonceQueue()
- packet = nil
- continue
- case <-peer.signal.stopSending:
- close(peer.queue.outbound)
- return
- }
- }
+ if keyPair != nil && keyPair.sendNonce < RejectAfterMessages {
+ if time.Now().Sub(keyPair.created) < RejectAfterTime {
+ break
+ }
+ }
- // process current packet
+ sendSignal(peer.signal.handshakeBegin)
+ logger.Println("Waiting for key-pair, peer", peer.id)
- if packet != nil {
+ select {
+ case <-peer.signal.newKeyPair:
+ logger.Println("Key-pair negotiated for peer", peer.id)
+ goto NextPacket
+
+ case <-peer.signal.flushNonceQueue:
+ logger.Println("Clearing queue for peer", peer.id)
+ peer.FlushNonceQueue()
+ packet = nil
+ goto NextPacket
+
+ case <-peer.signal.stop:
+ return
+ }
+ }
- // create work element
+ // process current packet
- work := new(QueueOutboundElement) // TODO: profile, maybe use pool
- work.keyPair = keyPair
- work.packet = packet
- work.nonce = keyPair.sendNonce
- work.mutex.Lock()
+ if packet != nil {
- packet = nil
- keyPair.sendNonce += 1
+ // create work element
- // drop packets until there is space
+ work := new(QueueOutboundElement) // TODO: profile, maybe use pool
+ work.keyPair = keyPair
+ work.packet = packet
+ work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1)
+ work.peer = peer
+ work.mutex.Lock()
- func() {
- for {
- select {
- case peer.device.queue.encryption <- work:
- return
- default:
- drop := <-peer.device.queue.encryption
- drop.packet = nil
- drop.mutex.Unlock()
+ packet = nil
+
+ // drop packets until there is space
+
+ func() {
+ for {
+ select {
+ case peer.device.queue.encryption <- work:
+ return
+ default:
+ drop := <-peer.device.queue.encryption
+ drop.packet = nil
+ drop.mutex.Unlock()
+ }
}
- }
- }()
- peer.queue.outbound <- work
+ }()
+ peer.queue.outbound <- work
+ }
}
- }
+ }()
+
+ logger.Println("Routine, nonce worker, stopped for peer", peer.id)
}
/* Encrypts the elements in the queue
@@ -227,6 +264,10 @@ func (device *Device) RoutineEncryption() {
nil,
)
work.mutex.Unlock()
+
+ // initiate new handshake
+
+ work.peer.KeepKeyFreshSending()
}
}
@@ -235,21 +276,54 @@ func (device *Device) RoutineEncryption() {
* Obs. Single instance per peer.
* The routine terminates then the outbound queue is closed.
*/
-func (peer *Peer) RoutineSequential() {
- for work := range peer.queue.outbound {
- work.mutex.Lock()
- func() {
- peer.mutex.RLock()
- defer peer.mutex.RUnlock()
- if work.packet == nil {
- return
- }
- if peer.endpoint == nil {
- return
- }
- peer.device.conn.WriteToUDP(work.packet, peer.endpoint)
- peer.timer.sendKeepalive.Reset(peer.persistentKeepaliveInterval)
- }()
- work.mutex.Unlock()
+func (peer *Peer) RoutineSequentialSender() {
+ logger := peer.device.log.Debug
+ logger.Println("Routine, sequential sender, started for peer", peer.id)
+
+ device := peer.device
+
+ for {
+ select {
+ case <-peer.signal.stop:
+ logger.Println("Routine, sequential sender, stopped for peer", peer.id)
+ return
+ case work := <-peer.queue.outbound:
+ work.mutex.Lock()
+ func() {
+ if work.packet == nil {
+ return
+ }
+
+ peer.mutex.RLock()
+ defer peer.mutex.RUnlock()
+
+ if peer.endpoint == nil {
+ logger.Println("No endpoint for peer:", peer.id)
+ return
+ }
+
+ device.net.mutex.RLock()
+ defer device.net.mutex.RUnlock()
+
+ if device.net.conn == nil {
+ logger.Println("No source for device")
+ return
+ }
+
+ logger.Println("Sending packet for peer", peer.id, work.packet)
+
+ _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint)
+ logger.Println("SEND:", peer.endpoint, err)
+ atomic.AddUint64(&peer.tx_bytes, uint64(len(work.packet)))
+
+ // shift keep-alive timer
+
+ if peer.persistentKeepaliveInterval != 0 {
+ interval := time.Duration(peer.persistentKeepaliveInterval) * time.Second
+ peer.timer.sendKeepalive.Reset(interval)
+ }
+ }()
+ work.mutex.Unlock()
+ }
}
}