aboutsummaryrefslogblamecommitdiffstats
path: root/send.go
blob: f99bdcd3bb4a7f3929aaa08c930641f4905bedd1 (plain) (tree)
1
2
3
4
5
6
7
8
9
10




                                                                                     


            

                                              

                               

              

                     

 
                

               



                                   
  








                                                             
                                                                                 

                                                          
                                                                           
                                                             
   
 
                                  
                     
                          
                                                                      
                                                                    


                                                                

 
                                     
                                      
                                    





                                        

 
                                                                  



                                                                                  


                                          
                                                    


                                                    
                                                            





                                         

                        
                                      
                              

                                

                                            




                                
 










                                            
                                                                       







                                                  




                                         
                                            
 
                                           
 


                                    




                                                                 
 
             
 
                              
 


                                                                           
                               


                                                                                       
                 
 
                                                       

                                
 
                                                               
 
                              
 
                              
                                            
                                  


                                                              
                                                                                     
                                                                   
 
                                  


                                                              
                                                                                     
                                                                   
 
                        
                                                                                   


                                
                                
                 
 

                                                        
                                         
                                                               


                                                                  


         


                                              
  
                                  
   
                                  
                            
 
                             
                                    


                                             
                                                                           
           
 
                                     
                                                                   
 


                        
                                                 
                              
 




                                                    
 


                                            
                                                                 




                                                                                              
 

                                                                 
                                                                             
 
                                        
                                                                     

                                                                                             
                                                       
                                                                 


                                              
 
                                                
 




                                                                                
 
                                                               
 

                                                                           
                 
         

 

                                                                     
  
                             
   
                                           
 
                                                  

                                    















                                                                        





                                     
                                                 
                              
 




                                                           
 




                                             
 
                                                 
 
                                                                          
 


                                                    
 


                                                                                              
 
                                                        
 





                                                                                                    
                         
 
                                                                  

                                                                            





                                                             
                                           
                 







                                                              
                                             
 

                             
                                    
 

                                             
                                                                                

           
                                                                        
 

                                     

                        
 
                                                 
                              
 





                                                       
                                         


                                             
 
                                                                 
 
                                                          
                                                           

                                                            
                                                                                                     


                                                                     
 
                                        
 
                                                                   



                                                                     
                 

         
/* SPDX-License-Identifier: GPL-2.0
 *
 * Copyright (C) 2017-2018 Jason A. Donenfeld <Jason@zx2c4.com>. All Rights Reserved.
 */

package main

import (
	"encoding/binary"
	"golang.org/x/crypto/chacha20poly1305"
	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

/* Outbound flow
 *
 * 1. TUN queue
 * 2. Routing (sequential)
 * 3. Nonce assignment (sequential)
 * 4. Encryption (parallel)
 * 5. Transmission (sequential)
 *
 * The functions in this file occur (roughly) in the order in
 * which the packets are processed.
 *
 * Locking, Producers and Consumers
 *
 * The order of packets (per peer) must be maintained,
 * but encryption of packets happens out-of-order:
 *
 * The sequential consumers will attempt to take the lock;
 * workers release the lock when they have completed work (encryption) on the packet.
 *
 * If the element is inserted into the "encryption queue",
 * the content is preceded by enough "junk" to contain the transport header
 * (to allow the construction of transport messages in-place)
 */

/* A single outbound packet together with the state it carries
 * through the nonce, encryption and outbound queues of this file.
 */
type QueueOutboundElement struct {
	dropped int32                 // drop flag, read and written with sync/atomic (see Drop / IsDropped)
	mutex   sync.Mutex            // taken in RoutineNonce, released once the element is encrypted or evicted from the encryption queue
	buffer  *[MaxMessageSize]byte // pointer to the fixed-size array backing the packet data (obtained from the device's message buffer pool)
	packet  []byte                // slice of "buffer" (always!)
	nonce   uint64                // nonce for encryption
	keyPair *KeyPair              // key-pair for encryption
	peer    *Peer                 // related peer
}

/* Discards the elements waiting in the nonce queue of the peer
 *
 * At most as many elements as were queued on entry are removed.
 * The function never blocks: it returns early if the queue runs empty.
 */
func (peer *Peer) flushNonceQueue() {
	for pending := len(peer.queue.nonce); pending > 0; pending-- {
		select {
		case <-peer.queue.nonce:
			// discarded, try the next one
			continue
		default:
		}

		// somebody else emptied the queue in the meantime
		return
	}
}

/* Creates a fresh outbound element
 *
 * The element starts out as "not dropped" and owns a message buffer
 * taken from the message buffer pool of the device.
 */
func (device *Device) NewOutboundElement() *QueueOutboundElement {
	elem := new(QueueOutboundElement)
	elem.dropped = AtomicFalse
	elem.buffer = device.pool.messageBuffers.Get().(*[MaxMessageSize]byte)
	return elem
}

/* Marks the element as dropped
 *
 * The flag is written atomically, so this may race with IsDropped.
 */
func (elem *QueueOutboundElement) Drop() {
	flag := &elem.dropped
	atomic.StoreInt32(flag, AtomicTrue)
}

/* Reports whether the element has been marked as dropped
 *
 * The flag is read atomically, so this may race with Drop.
 */
func (elem *QueueOutboundElement) IsDropped() bool {
	state := atomic.LoadInt32(&elem.dropped)
	return state == AtomicTrue
}

/* Inserts the element into the queue without ever blocking
 *
 * If the queue is full, the oldest queued element is removed
 * and marked as dropped to make room, then the insert is retried.
 */
func addToOutboundQueue(queue chan *QueueOutboundElement, element *QueueOutboundElement) {
	for {
		// fast path: there is room in the queue
		select {
		case queue <- element:
			return
		default:
		}

		// queue is full: evict the oldest element (if it is still there)
		select {
		case evicted := <-queue:
			evicted.Drop()
		default:
		}
	}
}

/* Inserts the element into the encryption queue without ever blocking
 *
 * If the queue is full, the oldest queued element is removed, marked as
 * dropped and its mutex is released (so that a consumer waiting on that
 * element can proceed), then the insert is retried.
 */
func addToEncryptionQueue(queue chan *QueueOutboundElement, element *QueueOutboundElement) {
	for {
		// fast path: there is room in the queue
		select {
		case queue <- element:
			return
		default:
		}

		// queue is full: evict the oldest element (if it is still there)
		select {
		case evicted := <-queue:
			// mark as dropped before releasing it to a potential consumer
			evicted.Drop()
			evicted.mutex.Unlock()
		default:
		}
	}
}

/* Reads packets from the TUN device, looks up the peer responsible
 * for the destination address and inserts the packet into the
 * nonce queue of that peer
 *
 * A read error closes the device and terminates the routine.
 * Empty, oversized and truncated packets, packets of unknown IP version
 * and packets without a matching (running) peer are skipped.
 *
 * Obs. Single instance per TUN device
 */
func (device *Device) RoutineReadFromTUN() {

	logDebug := device.log.Debug
	logError := device.log.Error

	// the element is reused until it has actually been handed to a peer
	elem := device.NewOutboundElement()

	defer func() {
		logDebug.Println("Routine: TUN reader - stopped")
	}()

	logDebug.Println("Routine: TUN reader - started")

	for {

		// read packet, leaving room in front of it for the transport header

		offset := MessageTransportHeaderSize
		size, err := device.tun.device.Read(elem.buffer[:], offset)

		if err != nil {
			logError.Println("Failed to read packet from TUN device:", err)
			device.Close()
			return
		}

		if size > MaxContentSize || size == 0 {
			continue
		}

		packet := elem.buffer[offset : offset+size]
		elem.packet = packet

		// lookup peer (stays nil when the IP header is truncated)

		var peer *Peer
		switch packet[0] >> 4 {
		case ipv4.Version:
			if len(packet) >= ipv4.HeaderLen {
				dst := packet[IPv4offsetDst : IPv4offsetDst+net.IPv4len]
				peer = device.routing.table.LookupIPv4(dst)
			}

		case ipv6.Version:
			if len(packet) >= ipv6.HeaderLen {
				dst := packet[IPv6offsetDst : IPv6offsetDst+net.IPv6len]
				peer = device.routing.table.LookupIPv6(dst)
			}

		default:
			logDebug.Println("Received packet with unknown IP version")
		}

		if peer == nil {
			continue
		}

		if !peer.isRunning.Get() {
			continue
		}

		// insert into nonce/pre-handshake queue and start over with a fresh element

		peer.event.handshakePushDeadline.Fire()
		addToOutboundQueue(peer.queue.nonce, elem)
		elem = device.NewOutboundElement()
	}
}

/* Holds packets back while there is no usable key pair (requesting a
 * handshake in the meantime), then assigns nonces to packets sequentially
 * and hands them to the encryption workers and to the outbound queue
 *
 * The routine terminates when the peer is stopped
 * or when the nonce queue is closed.
 *
 * Obs. A single instance per peer
 */
func (peer *Peer) RoutineNonce() {
	device := peer.device
	logDebug := device.log.Debug

	defer func() {
		peer.routines.stopping.Done()
		logDebug.Println(peer, ": Routine: nonce worker - stopped")
	}()

	peer.routines.starting.Done()
	logDebug.Println(peer, ": Routine: nonce worker - started")

NextPacket:
	for {
		select {
		case <-peer.routines.stop.Wait():
			return

		case elem, ok := <-peer.queue.nonce:

			if !ok {
				return
			}

			// wait for a key pair that is neither exhausted nor expired

			var keyPair *KeyPair
			for {
				keyPair = peer.keyPairs.Current()
				usable := keyPair != nil &&
					keyPair.sendNonce < RejectAfterMessages &&
					time.Since(keyPair.created) < RejectAfterTime
				if usable {
					break
				}

				peer.signal.handshakeBegin.Send()

				logDebug.Println(peer, ": Awaiting key-pair")

				select {
				case <-peer.signal.newKeyPair.Wait():
					logDebug.Println(peer, ": Obtained awaited key-pair")
				case <-peer.signal.flushNonceQueue:
					// abandon the packet in hand and fetch the next one
					continue NextPacket
				case <-peer.routines.stop.Wait():
					return
				}
			}

			// populate work element; the mutex stays locked until
			// the element has been encrypted (or evicted)

			elem.peer = peer
			elem.keyPair = keyPair
			elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
			elem.dropped = AtomicFalse
			elem.mutex.Lock()

			// add to parallel and sequential queue

			addToEncryptionQueue(device.queue.encryption, elem)
			addToOutboundQueue(peer.queue.outbound, elem)
		}
	}
}

/* Encrypts the elements in the queue
 * and marks them for sequential consumption (by releasing the mutex)
 *
 * The routine terminates when the device is stopped or when the
 * encryption queue is closed. On exit, whatever is still queued is
 * drained without blocking: every remaining element is marked as dropped
 * and its mutex is released, so that a sequential consumer waiting
 * on the element can proceed.
 *
 * Obs. One instance per core
 */
func (device *Device) RoutineEncryption() {

	var nonce [chacha20poly1305.NonceSize]byte

	logDebug := device.log.Debug

	defer func() {
		// A bare "break" inside the select would only leave the select
		// and spin in the for loop forever, hence the labeled break.
	Drain:
		for {
			select {
			case elem, ok := <-device.queue.encryption:
				if !ok {
					// queue is closed and empty; further receives
					// would keep succeeding immediately
					break Drain
				}

				// drop & release to potential consumer
				elem.Drop()
				elem.mutex.Unlock()

			default:
				// nothing left in the queue
				break Drain
			}
		}
		logDebug.Println("Routine: encryption worker - stopped")
	}()

	logDebug.Println("Routine: encryption worker - started")

	for {

		// fetch next element

		select {
		case <-device.signal.stop.Wait():
			return

		case elem, ok := <-device.queue.encryption:

			if !ok {
				return
			}

			// check if dropped

			if elem.IsDropped() {
				continue
			}

			// populate header fields
			// (the header lives in the buffer, directly in front of the content)

			header := elem.buffer[:MessageTransportHeaderSize]

			fieldType := header[0:4]
			fieldReceiver := header[4:8]
			fieldNonce := header[8:16]

			binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
			binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex)
			binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)

			// pad content to multiple of PaddingMultiple,
			// but never grow the content beyond the MTU

			mtu := int(atomic.LoadInt32(&device.tun.mtu))
			rem := len(elem.packet) % PaddingMultiple
			if rem > 0 {
				for i := 0; i < PaddingMultiple-rem && len(elem.packet) < mtu; i++ {
					elem.packet = append(elem.packet, 0)
				}
			}

			// encrypt content and release to consumer
			// (the ciphertext is appended to the header, i.e. the
			// transport message is constructed in-place in the buffer)

			binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
			elem.packet = elem.keyPair.send.Seal(
				header,
				nonce[:],
				elem.packet,
				nil,
			)
			elem.mutex.Unlock()
		}
	}
}

/* Sequentially reads packets from the outbound queue
 * and sends them to the endpoint of the peer
 *
 * Obs. Single instance per peer.
 * The routine terminates when the peer is stopped
 * or when the outbound queue is closed.
 */
func (peer *Peer) RoutineSequentialSender() {

	device := peer.device
	logDebug := device.log.Debug

	defer func() {
		peer.routines.stopping.Done()
		logDebug.Println(peer, ": Routine: sequential sender - stopped")
	}()

	logDebug.Println(peer, ": Routine: sequential sender - started")

	peer.routines.starting.Done()

	for {
		select {

		case <-peer.routines.stop.Wait():
			return

		case elem, ok := <-peer.queue.outbound:

			if !ok {
				return
			}

			// wait until the element has been released
			// (encrypted, or evicted from the encryption queue)

			elem.mutex.Lock()
			if elem.IsDropped() {
				continue
			}

			// send message and return buffer to pool

			packet := elem.packet
			sendErr := peer.SendBuffer(packet)
			device.PutMessageBuffer(elem.buffer)
			if sendErr != nil {
				logDebug.Println("Failed to send authenticated packet to peer", peer)
				continue
			}
			atomic.AddUint64(&peer.stats.txBytes, uint64(len(packet)))

			// update timers (keepalives do not count as sent data)

			peer.TimerAnyAuthenticatedPacketTraversal()
			if len(packet) != MessageKeepaliveSize {
				peer.TimerDataSent()
			}
			peer.KeepKeyFreshSending()
		}
	}
}