From 47e06b525c3fbae3ec843d2513368df47ebfcf1b Mon Sep 17 00:00:00 2001 From: Git Sagar Date: Sat, 6 Jun 2026 23:16:27 +0530 Subject: [PATCH] tunnel: replace write mutex with channel-based single writer All writes (frames, keepalive, DHCP renewal) are queued to a buffered channel and drained by a single writer goroutine. Eliminates mutex contention on the data path entirely. Co-Authored-By: Claude Opus 4.6 (1M context) --- pkg/client/tunnel.go | 63 ++++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/pkg/client/tunnel.go b/pkg/client/tunnel.go index 5a36c44..1332204 100644 --- a/pkg/client/tunnel.go +++ b/pkg/client/tunnel.go @@ -22,13 +22,15 @@ const ( keepAliveMagic uint32 = 0xFFFFFFFF // Magic value indicating a keepalive packet maxKeepaliveSize uint32 = 512 // Max random data in keepalive keepAliveInterval = 3 * time.Second + writeChanSize = 128 // Buffered write queue depth ) // Tunnel handles bidirectional TCP block framing for Ethernet frames over a // SoftEther VPN session. Each "block" is one Ethernet frame. type Tunnel struct { sess *Session - writeMu sync.Mutex + writeCh chan []byte // serialized messages queued for the single writer + writeErr error // last write error stopCh chan struct{} stopped sync.Once } @@ -36,15 +38,32 @@ type Tunnel struct { // NewTunnel creates a tunnel from an established session. // Call StartKeepalive() before reading/writing frames. func NewTunnel(sess *Session) *Tunnel { - return &Tunnel{ - sess: sess, - stopCh: make(chan struct{}), + t := &Tunnel{ + sess: sess, + writeCh: make(chan []byte, writeChanSize), + stopCh: make(chan struct{}), + } + go t.writeLoop() + return t +} + +// writeLoop is the single goroutine that writes to the connection. +// All writes are serialized through writeCh — no mutex needed. +func (t *Tunnel) writeLoop() { + for buf := range t.writeCh { + if _, err := t.sess.Conn.Write(buf); err != nil { + t.writeErr = err + return + } } } // Close stops the keepalive goroutine and closes the underlying connection. func (t *Tunnel) Close() error { - t.stopped.Do(func() { close(t.stopCh) }) + t.stopped.Do(func() { + close(t.stopCh) + close(t.writeCh) + }) return t.sess.Conn.Close() } @@ -84,8 +103,7 @@ func (t *Tunnel) ReadFrames() ([][]byte, error) { } // WriteFrames sends a batch of Ethernet frames to the server. -// Safe for concurrent use from multiple goroutines. -// Assembles the entire message into one buffer for a single TLS write. +// Safe for concurrent use — messages are queued for the single writer goroutine. func (t *Tunnel) WriteFrames(frames [][]byte) error { // Calculate total size: 4 (numBlocks) + per frame: 4 (size) + len(data) total := 4 @@ -104,14 +122,12 @@ func (t *Tunnel) WriteFrames(frames [][]byte) error { off += len(f) } - t.writeMu.Lock() - _, err := t.sess.Conn.Write(buf) - t.writeMu.Unlock() - - if err != nil { - return fmt.Errorf("write frames: %w", err) + select { + case t.writeCh <- buf: + return t.writeErr + case <-t.stopCh: + return fmt.Errorf("tunnel closed") } - return nil } // FrameHandler is called for each Ethernet frame received from the server. @@ -188,18 +204,15 @@ func (t *Tunnel) StartKeepalive() { size := uint32(rng.Intn(int(maxKeepaliveSize))) + 1 rng.Read(randBuf[:size]) - t.writeMu.Lock() - err1 := binary.Write(t.sess.Conn, binary.BigEndian, keepAliveMagic) - var err2, err3 error - if err1 == nil { - err2 = binary.Write(t.sess.Conn, binary.BigEndian, size) - } - if err2 == nil { - _, err3 = t.sess.Conn.Write(randBuf[:size]) - } - t.writeMu.Unlock() + // Assemble keepalive into one buffer + buf := make([]byte, 8+size) + binary.BigEndian.PutUint32(buf[0:4], keepAliveMagic) + binary.BigEndian.PutUint32(buf[4:8], size) + copy(buf[8:], randBuf[:size]) - if err1 != nil || err2 != nil || err3 != nil { + select { + case t.writeCh <- buf: + case <-t.stopCh: return } }