You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

116 lines
2.5 KiB

package tunnel
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
"github.com/xjasonlyu/tun2socks/v2/core/adapter"
"github.com/xjasonlyu/tun2socks/v2/proxy"
"github.com/xjasonlyu/tun2socks/v2/tunnel/statistic"
)
const (
// tcpConnectTimeout is the default timeout for TCP handshakes.
tcpConnectTimeout = 5 * time.Second
// tcpWaitTimeout implements a TCP half-close timeout.
tcpWaitTimeout = 60 * time.Second
// udpSessionTimeout is the default timeout for UDP sessions.
udpSessionTimeout = 60 * time.Second
)
var _ adapter.TransportHandler = (*Tunnel)(nil)
type Tunnel struct {
// Unbuffered TCP/UDP queues.
tcpQueue chan adapter.TCPConn
udpQueue chan adapter.UDPConn
// UDP session timeout.
udpTimeout *atomic.Duration
// Internal proxy.Dialer for Tunnel.
dialerMu sync.RWMutex
dialer proxy.Dialer
// Where the Tunnel statistics are sent to.
manager *statistic.Manager
procOnce sync.Once
procCancel context.CancelFunc
}
func New(dialer proxy.Dialer, manager *statistic.Manager) *Tunnel {
return &Tunnel{
tcpQueue: make(chan adapter.TCPConn),
udpQueue: make(chan adapter.UDPConn),
udpTimeout: atomic.NewDuration(udpSessionTimeout),
dialer: dialer,
manager: manager,
procCancel: func() { /* nop */ },
}
}
// TCPIn return fan-in TCP queue.
func (t *Tunnel) TCPIn() chan<- adapter.TCPConn {
return t.tcpQueue
}
// UDPIn return fan-in UDP queue.
func (t *Tunnel) UDPIn() chan<- adapter.UDPConn {
return t.udpQueue
}
func (t *Tunnel) HandleTCP(conn adapter.TCPConn) {
t.TCPIn() <- conn
}
func (t *Tunnel) HandleUDP(conn adapter.UDPConn) {
t.UDPIn() <- conn
}
func (t *Tunnel) process(ctx context.Context) {
for {
select {
case conn := <-t.tcpQueue:
go t.handleTCPConn(conn)
case conn := <-t.udpQueue:
go t.handleUDPConn(conn)
case <-ctx.Done():
return
}
}
}
// ProcessAsync can be safely called multiple times, but will only be effective once.
func (t *Tunnel) ProcessAsync() {
t.procOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
t.procCancel = cancel
go t.process(ctx)
})
}
// Close closes the Tunnel and releases its resources.
func (t *Tunnel) Close() {
t.procCancel()
}
func (t *Tunnel) Dialer() proxy.Dialer {
t.dialerMu.RLock()
d := t.dialer
t.dialerMu.RUnlock()
return d
}
func (t *Tunnel) SetDialer(dialer proxy.Dialer) {
t.dialerMu.Lock()
t.dialer = dialer
t.dialerMu.Unlock()
}
func (t *Tunnel) SetUDPTimeout(timeout time.Duration) {
t.udpTimeout.Store(timeout)
}