aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJason A. Donenfeld <Jason@zx2c4.com>2019-07-03 16:48:15 +0200
committerJason A. Donenfeld <Jason@zx2c4.com>2019-07-03 16:48:15 +0200
commit2b2b15dd315116deed14622a61ebd8f1b83ad5e1 (patch)
tree09de2cc5058afe60170c1843e5d64306cfb54297
parentdevice: receive: uniform message for source address check (diff)
downloadwireguard-go-jd/multiflush.tar.xz
wireguard-go-jd/multiflush.zip
tun: windows: use multiple flush buffers in paralleljd/multiflush
-rw-r--r--tun/tun_windows.go198
1 files changed, 109 insertions, 89 deletions
diff --git a/tun/tun_windows.go b/tun/tun_windows.go
index 543482e..0ce5ba9 100644
--- a/tun/tun_windows.go
+++ b/tun/tun_windows.go
@@ -15,39 +15,43 @@ import (
"unsafe"
"golang.org/x/sys/windows"
+
"golang.zx2c4.com/wireguard/tun/wintun"
)
const (
- packetExchangeAlignment uint32 = 16 // Number of bytes packets are aligned to in exchange buffers
- packetSizeMax uint32 = 0xf000 - packetExchangeAlignment // Maximum packet size
- packetExchangeSize uint32 = 0x100000 // Exchange buffer size (defaults to 1MiB)
- retryRate = 4 // Number of retries per second to reopen device pipe
- retryTimeout = 30 // Number of seconds to tolerate adapter unavailable
+ packetExchangeAlignment = 16 // Number of bytes packets are aligned to in exchange buffers
+ packetSizeMax = 0xf000 - packetExchangeAlignment // Maximum packet size
+ packetExchangeSize = 0x100000 // Exchange buffer size (1 MB)
+ writeExchanges = 128 // Number of simultaneous flushes
+ retryRate = 4 // Number of retries per second to reopen device pipe
+ retryTimeout = 30 // Number of seconds to tolerate adapter unavailable
)
type exchgBufRead struct {
+ file *os.File
data [packetExchangeSize]byte
offset uint32
avail uint32
}
type exchgBufWrite struct {
- data [packetExchangeSize]byte
- offset uint32
+ fd windows.Handle
+ data [packetExchangeSize]byte
+ offset uint32
+ flushing sync.Mutex
}
type NativeTun struct {
- wt *wintun.Wintun
- tunFileRead *os.File
- tunFileWrite *os.File
- tunLock sync.Mutex
- close bool
- rdBuff *exchgBufRead
- wrBuff *exchgBufWrite
- events chan Event
- errors chan error
- forcedMTU int
+ wt *wintun.Wintun
+ tunLock sync.Mutex
+ close bool
+ events chan Event
+ errors chan error
+ forcedMTU int
+ currentWrBuff int
+ rdBuff exchgBufRead
+ wrBuffs [writeExchanges]exchgBufWrite
}
func packetAlign(size uint32) uint32 {
@@ -103,8 +107,6 @@ func CreateTUNWithRequestedGUID(ifname string, requestedGUID *windows.GUID) (Dev
return &NativeTun{
wt: wt,
- rdBuff: &exchgBufRead{},
- wrBuff: &exchgBufWrite{},
events: make(chan Event, 10),
errors: make(chan error, 1),
forcedMTU: 1500,
@@ -119,8 +121,8 @@ func (tun *NativeTun) openTUN() error {
var err error
name := tun.wt.DataFileName()
- for tun.tunFileRead == nil {
- tun.tunFileRead, err = os.OpenFile(name, os.O_RDONLY, 0)
+ for tun.rdBuff.file == nil {
+ tun.rdBuff.file, err = os.OpenFile(name, os.O_RDONLY, 0)
if err != nil {
if retries > 0 && !tun.close {
time.Sleep(time.Second / retryRate)
@@ -130,66 +132,77 @@ func (tun *NativeTun) openTUN() error {
return err
}
}
- for tun.tunFileWrite == nil {
- tun.tunFileWrite, err = os.OpenFile(name, os.O_WRONLY, 0)
- if err != nil {
- if retries > 0 && !tun.close {
- time.Sleep(time.Second / retryRate)
- retries--
- continue
+ for i := range tun.wrBuffs {
+ for tun.wrBuffs[i].fd == 0 {
+ tun.wrBuffs[i].fd, err = windows.CreateFile(windows.StringToUTF16Ptr(name), windows.GENERIC_WRITE, windows.FILE_SHARE_READ|windows.FILE_SHARE_WRITE|windows.FILE_SHARE_DELETE, nil, windows.OPEN_EXISTING, windows.FILE_ATTRIBUTE_NORMAL|windows.FILE_FLAG_OVERLAPPED, 0)
+ if err != nil {
+ if retries > 0 && !tun.close {
+ time.Sleep(time.Second / retryRate)
+ retries--
+ continue
+ }
+ return err
}
- return err
+ firstSize := (*uint32)(unsafe.Pointer(&tun.wrBuffs[i].data[0]))
+ saved := *firstSize
+ *firstSize = 0
+ // Set the maximum buffer length with an invalid write.
+ var o windows.Overlapped
+ if windows.WriteFile(tun.wrBuffs[i].fd, tun.wrBuffs[i].data[:], nil, &o) == windows.ERROR_IO_PENDING {
+ var done uint32
+ windows.GetOverlappedResult(tun.wrBuffs[i].fd, &o, &done, true)
+ }
+ *firstSize = saved
}
- firstSize := (*uint32)(unsafe.Pointer(&tun.wrBuff.data[0]))
- saved := *firstSize
- *firstSize = 0
- // Set the maximum buffer length with an invalid write.
- tun.tunFileWrite.Write(tun.wrBuff.data[:])
- *firstSize = saved
}
+
return nil
}
func (tun *NativeTun) closeTUN() (err error) {
- for tun.tunFileRead != nil {
+ for tun.rdBuff.file != nil {
tun.tunLock.Lock()
- if tun.tunFileRead == nil {
+ if tun.rdBuff.file == nil {
tun.tunLock.Unlock()
break
}
- t := tun.tunFileRead
- tun.tunFileRead = nil
+ t := tun.rdBuff.file
+ tun.rdBuff.file = nil
windows.CancelIoEx(windows.Handle(t.Fd()), nil)
err = t.Close()
tun.tunLock.Unlock()
break
}
- for tun.tunFileWrite != nil {
- tun.tunLock.Lock()
- if tun.tunFileWrite == nil {
+ for i := range tun.wrBuffs {
+ for tun.wrBuffs[i].fd != 0 {
+ tun.tunLock.Lock()
+ if tun.wrBuffs[i].fd == 0 {
+ tun.tunLock.Unlock()
+ break
+ }
+ t := tun.wrBuffs[i].fd
+ tun.wrBuffs[i].fd = 0
+ windows.CancelIoEx(t, nil)
+ err2 := windows.Close(t)
tun.tunLock.Unlock()
+ if err == nil {
+ err = err2
+ }
break
}
- t := tun.tunFileWrite
- tun.tunFileWrite = nil
- windows.CancelIoEx(windows.Handle(t.Fd()), nil)
- err2 := t.Close()
- tun.tunLock.Unlock()
- if err == nil {
- err = err2
- }
- break
}
return
}
-func (tun *NativeTun) getTUN() (read *os.File, write *os.File, err error) {
- read, write = tun.tunFileRead, tun.tunFileWrite
- if read == nil || write == nil {
- read, write = nil, nil
+func (tun *NativeTun) getTUN() (read *os.File, write windows.Handle, err error) {
+ wrBuff := &tun.wrBuffs[tun.currentWrBuff]
+ read, write = tun.rdBuff.file, wrBuff.fd
+ if read == nil || write == 0 {
+ read, write = nil, 0
tun.tunLock.Lock()
- if tun.tunFileRead != nil && tun.tunFileWrite != nil {
- read, write = tun.tunFileRead, tun.tunFileWrite
+ wrBuff = &tun.wrBuffs[tun.currentWrBuff]
+ if tun.rdBuff.file != nil && wrBuff.fd != 0 {
+ read, write = tun.rdBuff.file, wrBuff.fd
tun.tunLock.Unlock()
return
}
@@ -200,7 +213,7 @@ func (tun *NativeTun) getTUN() (read *os.File, write *os.File, err error) {
}
err = tun.openTUN()
if err == nil {
- read, write = tun.tunFileRead, tun.tunFileWrite
+ read, write = tun.rdBuff.file, wrBuff.fd
}
tun.tunLock.Unlock()
return
@@ -309,44 +322,50 @@ func (tun *NativeTun) Read(buff []byte, offset int) (int, error) {
// Note: flush() and putTunPacket() assume the caller comes only from a single thread; there's no locking.
func (tun *NativeTun) Flush() error {
- if tun.wrBuff.offset == 0 {
+ wrBuff := &tun.wrBuffs[tun.currentWrBuff]
+ if wrBuff.offset == 0 {
return nil
}
- defer func() {
- tun.wrBuff.offset = 0
- }()
- retries := maybeRetry(1000)
-
- for {
- // Get TUN data pipe.
- _, file, err := tun.getTUN()
- if err != nil {
- return err
- }
-
- for {
- _, err = file.Write(tun.wrBuff.data[:tun.wrBuff.offset])
+ _, _, err := tun.getTUN()
+ if err != nil {
+ return err
+ }
+ wrBuff.flushing.Lock()
+ o := &windows.Overlapped{}
+ err = windows.WriteFile(wrBuff.fd, wrBuff.data[:wrBuff.offset], nil, o)
+ if err == windows.ERROR_IO_PENDING {
+ go func(o *windows.Overlapped, wrBuff *exchgBufWrite) {
+ var done uint32
+ err := windows.GetOverlappedResult(wrBuff.fd, o, &done, true)
+ wrBuff.offset = 0
+ wrBuff.flushing.Unlock()
if err != nil {
- pe, ok := err.(*os.PathError)
+ retries := maybeRetry(1000)
if tun.close {
- return os.ErrClosed
+ tun.errors <- os.ErrClosed
}
- if retries > 0 && ok && pe.Err == windows.ERROR_OPERATION_ABORTED { // Adapter is paused or in low-power state.
- retries--
- time.Sleep(time.Millisecond * 2)
- continue
+ if retries > 0 && err == windows.ERROR_OPERATION_ABORTED { // Adapter is paused or in low-power state.
+ return
}
- if retries > 0 && ok && pe.Err == windows.ERROR_HANDLE_EOF { // Adapter is going down.
- retries--
+ if retries > 0 && err == windows.ERROR_HANDLE_EOF { // Adapter is going down.
tun.closeTUN()
- time.Sleep(time.Millisecond * 2)
- break
+ return
}
- return err
+ tun.errors <- err
}
- return nil
- }
+ }(o, wrBuff)
+ } else if err == nil {
+ wrBuff.offset = 0
+ wrBuff.flushing.Unlock()
+ return nil
+ } else {
+ return err
}
+ tun.currentWrBuff = (tun.currentWrBuff + 1) % writeExchanges
+ wrBuff = &tun.wrBuffs[tun.currentWrBuff]
+ wrBuff.flushing.Lock()
+ wrBuff.flushing.Unlock()
+ return nil
}
func (tun *NativeTun) putTunPacket(buff []byte) error {
@@ -358,8 +377,9 @@ func (tun *NativeTun) putTunPacket(buff []byte) error {
return errors.New("Packet too big")
}
pSize := packetAlign(packetExchangeAlignment + size)
+ wrBuff := &tun.wrBuffs[tun.currentWrBuff]
- if tun.wrBuff.offset+pSize >= packetExchangeSize {
+ if wrBuff.offset+pSize >= packetExchangeSize {
// Exchange buffer is full -> flush first.
err := tun.Flush()
if err != nil {
@@ -368,12 +388,12 @@ func (tun *NativeTun) putTunPacket(buff []byte) error {
}
// Write packet to the exchange buffer.
- packet := tun.wrBuff.data[tun.wrBuff.offset : tun.wrBuff.offset+pSize]
+ packet := wrBuff.data[wrBuff.offset : wrBuff.offset+pSize]
*(*uint32)(unsafe.Pointer(&packet[0])) = size
packet = packet[packetExchangeAlignment : packetExchangeAlignment+size]
copy(packet, buff)
- tun.wrBuff.offset += pSize
+ wrBuff.offset += pSize
return nil
}