From 8993b3927cf66517e2884b181d6b71d4c6599b7a Mon Sep 17 00:00:00 2001 From: Mathias Hall-Andersen Date: Fri, 14 Jul 2017 14:25:18 +0200 Subject: Improved throughput - Improved performance by adding the message buffers to a sync.Pool. - Fixed issue with computing "next" key-pair upon receiving a response message. --- src/send.go | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) (limited to 'src/send.go') diff --git a/src/send.go b/src/send.go index d8ddc82..7a2fe44 100644 --- a/src/send.go +++ b/src/send.go @@ -33,11 +33,11 @@ import ( type QueueOutboundElement struct { dropped int32 mutex sync.Mutex - data [MaxMessageSize]byte // slice holding the packet data - packet []byte // slice of "data" (always!) - nonce uint64 // nonce for encryption - keyPair *KeyPair // key-pair for encryption - peer *Peer // related peer + buffer *[MaxMessageSize]byte // slice holding the packet data + packet []byte // slice of "data" (always!) + nonce uint64 // nonce for encryption + keyPair *KeyPair // key-pair for encryption + peer *Peer // related peer } func (peer *Peer) FlushNonceQueue() { @@ -51,13 +51,11 @@ func (peer *Peer) FlushNonceQueue() { } } -/* - * Assumption: The mutex of the returned element is released - */ func (device *Device) NewOutboundElement() *QueueOutboundElement { - // TODO: profile, consider sync.Pool - elem := new(QueueOutboundElement) - return elem + return &QueueOutboundElement{ + dropped: AtomicFalse, + buffer: device.pool.messageBuffers.Get().(*[MaxMessageSize]byte), + } } func (elem *QueueOutboundElement) Drop() { @@ -130,7 +128,7 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) { elem = device.NewOutboundElement() } - elem.packet = elem.data[MessageTransportHeaderSize:] + elem.packet = elem.buffer[MessageTransportHeaderSize:] size, err := tun.Read(elem.packet) if err != nil { @@ -284,7 +282,7 @@ func (device *Device) RoutineEncryption() { // populate header fields func() { - header := work.data[:MessageTransportHeaderSize] + header := work.buffer[:MessageTransportHeaderSize] fieldType := header[0:4] fieldReceiver := header[4:8] @@ -305,7 +303,7 @@ func (device *Device) RoutineEncryption() { nil, ) length := MessageTransportHeaderSize + len(work.packet) - work.packet = work.data[:length] + work.packet = work.buffer[:length] work.mutex.Unlock() // refresh key if necessary @@ -333,12 +331,16 @@ func (peer *Peer) RoutineSequentialSender() { case work := <-peer.queue.outbound: work.mutex.Lock() - if work.IsDropped() { - continue - } func() { + // return buffer to pool after processing + + defer device.PutMessageBuffer(work.buffer) + if work.IsDropped() { + return + } + // send to endpoint peer.mutex.RLock() @@ -357,10 +359,13 @@ func (peer *Peer) RoutineSequentialSender() { return } + // send message and return buffer to pool + _, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint) if err != nil { return } + atomic.AddUint64(&peer.txBytes, uint64(len(work.packet))) // reset keep-alive -- cgit v1.2.3-59-g8ed1b