Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,6 @@ Usage of /home/user/go/bin/dumbproxy:
Unix domain socket to listen to, overrides bind-address if set.
-bw-limit uint
per-user bandwidth limit in bytes per second
-bw-limit-buckets uint
number of buckets of bandwidth limit (default 1048576)
-bw-limit-burst int
allowed burst size for bandwidth limit, how many "tokens" can fit into leaky bucket
-bw-limit-separate
Expand Down
159 changes: 94 additions & 65 deletions forward/bwlimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,54 @@ import (
"context"
"errors"
"io"
"math/rand/v2"
"sync"
"time"

"github.com/zeebo/xxh3"
"github.com/ajwerner/orderstat"

"github.com/SenseUnit/dumbproxy/rate"
)

const copyChunkSize = 128 * 1024

type treeItem struct {
key string
mux sync.RWMutex
ul *rate.Limiter
dl *rate.Limiter
}

func (i *treeItem) Less(other orderstat.Item) bool {
return other.(*treeItem).key > i.key
}

func (i *treeItem) rLock() {
i.mux.RLock()
}

func (i *treeItem) rUnlock() {
i.mux.RUnlock()
}

func (i *treeItem) tryLock() bool {
return i.mux.TryLock()
}

type BWLimit struct {
limit rate.Limit
burst int64
d []rate.Limiter
u []rate.Limiter
mux sync.Mutex
m *orderstat.Tree
bps float64
burst int64
separate bool
}

func NewBWLimit(bytesPerSecond float64, burst int64, buckets uint, separate bool) *BWLimit {
if buckets == 0 {
buckets = 1
}
burst = max(copyChunkSize, burst)
lim := *(rate.NewLimiter(burst))
d := make([]rate.Limiter, buckets)
for i := range d {
d[i] = lim
}
u := d
if separate {
u = make([]rate.Limiter, buckets)
for i := range u {
u[i] = lim
}
}
func NewBWLimit(bytesPerSecond float64, burst int64, separate bool) *BWLimit {
return &BWLimit{
limit: rate.Limit(bytesPerSecond),
burst: burst,
d: d,
u: u,
m: orderstat.NewTree(),
bps: bytesPerSecond,
burst: burst,
separate: separate,
}
}

Expand All @@ -53,7 +63,7 @@ func (l *BWLimit) copy(ctx context.Context, rl *rate.Limiter, dst io.Writer, src
var n int64
for {
t := time.Now()
r := rl.ReserveN(l.limit, l.burst, t, copyChunkSize)
r := rl.ReserveN(t, copyChunkSize)
if !r.OK() {
err = errors.New("can't get rate limit reservation")
return
Expand All @@ -70,9 +80,9 @@ func (l *BWLimit) copy(ctx context.Context, rl *rate.Limiter, dst io.Writer, src
n, err = io.Copy(dst, lim)
written += n
if n < copyChunkSize {
r.CancelAt(l.limit, l.burst, t)
r.CancelAt(t)
if n > 0 {
rl.ReserveN(l.limit, l.burst, t, n)
rl.ReserveN(t, n)
}
}
if err != nil {
Expand Down Expand Up @@ -104,21 +114,70 @@ func (l *BWLimit) futureCopyAndCloseWrite(ctx context.Context, c chan<- error, r
close(c)
}

func (l *BWLimit) getRatelimiters(username string) (*rate.Limiter, *rate.Limiter) {
idx := int(hashUsername(username, uint64(len(l.d))))
return &(l.d[idx]), &(l.u[idx])
func (l *BWLimit) newTreeItem(username string) *treeItem {
ul := rate.NewLimiter(rate.Limit(l.bps), max(copyChunkSize, l.burst))
dl := ul
if l.separate {
dl = rate.NewLimiter(rate.Limit(l.bps), max(copyChunkSize, l.burst))
}
return &treeItem{
key: username,
ul: ul,
dl: dl,
}
}

const randomEvictions = 2

func (l *BWLimit) evictRandom() {
for _ = range randomEvictions {
n := l.m.Len()
if n == 0 {
return
}
item := l.m.Select(rand.IntN(n))
if item == nil {
panic("random tree sampling failed")
}
ti := item.(*treeItem)
if ti.tryLock() {
if ti.ul.Tokens() >= float64(ti.ul.Burst()) && ti.dl.Tokens() >= float64(ti.dl.Burst()) {
// RL is full and nobody touches it. Removing...
l.m.Delete(item)
}
}
}
}

func (l *BWLimit) getRatelimiters(username string) *treeItem {
l.mux.Lock()
defer l.mux.Unlock()
item := l.m.Get(&treeItem{
key: username,
})
if item == nil {
ti := l.newTreeItem(username)
ti.rLock()
l.m.ReplaceOrInsert(ti)
l.evictRandom()
return ti
}
ti := item.(*treeItem)
ti.rLock()
return ti
}

func (l *BWLimit) PairConnections(ctx context.Context, username string, incoming, outgoing io.ReadWriteCloser) error {
dl, ul := l.getRatelimiters(username)
ti := l.getRatelimiters(username)
defer ti.rUnlock()

var err error
i2oErr := make(chan error, 1)
o2iErr := make(chan error, 1)
ctxErr := ctx.Done()

go l.futureCopyAndCloseWrite(ctx, i2oErr, ul, outgoing, incoming)
go l.futureCopyAndCloseWrite(ctx, o2iErr, dl, incoming, outgoing)
go l.futureCopyAndCloseWrite(ctx, i2oErr, ti.ul, outgoing, incoming)
go l.futureCopyAndCloseWrite(ctx, o2iErr, ti.dl, incoming, outgoing)

// do while we're listening to children channels
for i2oErr != nil || o2iErr != nil {
Expand All @@ -145,33 +204,3 @@ func (l *BWLimit) PairConnections(ctx context.Context, username string, incoming

return err
}

func hashUsername(s string, nslots uint64) uint64 {
if nslots == 0 {
panic("number of slots can't be zero")
}

hash := xxh3.New()
iv := []byte{0}

if nslots&(nslots-1) == 0 {
hash.Write(iv)
hash.Write([]byte(s))
return hash.Sum64() & (nslots - 1)
}

minBiased := -((-nslots) % nslots) // == 2**64 - (2**64%nslots)

var hv uint64
for {
hash.Write(iv)
hash.Write([]byte(s))
hv = hash.Sum64()
if hv < minBiased {
break
}
iv[0]++
hash.Reset()
}
return hv % nslots
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.24.6

require (
github.com/Snawoot/uniqueslice v0.1.1
github.com/ajwerner/orderstat v0.0.0-20200914031159-0ebfd67afbea
github.com/coreos/go-systemd/v22 v22.6.0
github.com/dop251/goja v0.0.0-20251103141225-af2ceb9156d7
github.com/hashicorp/go-multierror v1.1.1
Expand All @@ -17,7 +18,6 @@ require (
github.com/refraction-networking/utls v1.8.1
github.com/tg123/go-htpasswd v1.2.4
github.com/things-go/go-socks5 v0.1.0
github.com/zeebo/xxh3 v1.0.2
golang.org/x/crypto v0.44.0
golang.org/x/crypto/x509roots/fallback v0.0.0-20251112184832-bcf6a849efcf
golang.org/x/net v0.47.0
Expand All @@ -34,7 +34,6 @@ require (
github.com/google/pprof v0.0.0-20251114195745-4902fdda35c8 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/text v0.31.0 // indirect
Expand Down
13 changes: 7 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/Snawoot/uniqueslice v0.1.1 h1:KEfv3FtAXiNEoxvcc79pFQDhnqwYXQyZIkxOM4e/qpw=
github.com/Snawoot/uniqueslice v0.1.1/go.mod h1:K9zIaHO43FGLHbqm6WCDFeY6+CN/du5eiio/vxvDVC8=
github.com/ajwerner/orderstat v0.0.0-20200914031159-0ebfd67afbea h1:eCQW3axgFSgzerNCCUc9E3W8sHIsol3D84SoxqLtRd0=
github.com/ajwerner/orderstat v0.0.0-20200914031159-0ebfd67afbea/go.mod h1:lBcZZIGVJZdjAenFzeOISQY+Gr2lTavurpq4QuJCdog=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand All @@ -14,6 +16,7 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.6.0 h1:aGVa/v8B7hpb0TKl0MWoAavPDmHvobFe5R5zn0bCJWo=
github.com/coreos/go-systemd/v22 v22.6.0/go.mod h1:iG+pp635Fo7ZmV/j14KUcmEyWF+0X7Lua8rrTWzYgWU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
Expand All @@ -24,6 +27,7 @@ github.com/dop251/goja v0.0.0-20251103141225-af2ceb9156d7 h1:jxmXU5V9tXxJnydU5v/
github.com/dop251/goja v0.0.0-20251103141225-af2ceb9156d7/go.mod h1:MxLav0peU43GgvwVgNbLAj1s/bSGboKkhuULvq/7hx4=
github.com/go-sourcemap/sourcemap v2.1.4+incompatible h1:a+iTbH5auLKxaNwQFg0B+TCYl6lbukKPc7b5x0n1s6Q=
github.com/go-sourcemap/sourcemap v2.1.4+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/pprof v0.0.0-20251114195745-4902fdda35c8 h1:3DsUAV+VNEQa2CUVLxCY3f87278uWfIDhJnbdvDjvmE=
github.com/google/pprof v0.0.0-20251114195745-4902fdda35c8/go.mod h1:I6V7YzU0XDpsHqbsyrghnFZLO1gwK6NPTNvmetQIk9U=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand All @@ -35,12 +39,11 @@ github.com/jellydator/ttlcache/v3 v3.4.0 h1:YS4P125qQS0tNhtL6aeYkheEaB/m8HCqdMMP
github.com/jellydator/ttlcache/v3 v3.4.0/go.mod h1:Hw9EgjymziQD3yGsQdf1FqFdpp7YjFMd4Srg5EJlgD4=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/ncruces/go-dns v1.2.7 h1:NMA7vFqXUl+nBhGFlleLyo2ni3Lqv3v+qFWZidzRemI=
github.com/ncruces/go-dns v1.2.7/go.mod h1:SqmhVMBd8Wr7hsu3q6yTt6/Jno/xLMrbse/JLOMBo1Y=
github.com/petar/GoLLRB v0.0.0-20130427215148-53be0d36a84c/go.mod h1:HUpKUBZnpzkdx0kD/+Yfuft+uD3zHGtXF/XJB14TUr4=
github.com/pires/go-proxyproto v0.8.1 h1:9KEixbdJfhrbtjpz/ZwCdWDD2Xem0NZ38qMYaASJgp0=
github.com/pires/go-proxyproto v0.8.1/go.mod h1:ZKAAyp3cgy5Y5Mo4n9AlScrkCZwUy0g3Jf+slqQVcuU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -49,6 +52,8 @@ github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERS
github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/refraction-networking/utls v1.8.1 h1:yNY1kapmQU8JeM1sSw2H2asfTIwWxIkrMJI0pRUOCAo=
github.com/refraction-networking/utls v1.8.1/go.mod h1:jkSOEkLqn+S/jtpEHPOsVv/4V4EVnelwbMQl4vCWXAM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.11.0 h1:ib4sjIrwZKxE5u/Japgo/7SJV3PvgjGiRNAvTVGqQl8=
github.com/stretchr/testify v1.11.0/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tg123/go-htpasswd v1.2.4 h1:HgH8KKCjdmo7jjXWN9k1nefPBd7Be3tFCTjc2jPraPU=
Expand All @@ -57,10 +62,6 @@ github.com/things-go/go-socks5 v0.1.0 h1:4f5dz0iMQ6cA4wseFmyLmCHmg3SWJTW92ndrKS6
github.com/things-go/go-socks5 v0.1.0/go.mod h1:Riabiyu52kLsla0YmJqunt1c1JEl6iXSr4bRd7swFEA=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU=
Expand Down
3 changes: 0 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ type CLIArgs struct {
tlsALPNEnabled bool
bwLimit uint64
bwBurst int64
bwBuckets uint
bwSeparate bool
dnsServers []string
dnsPreferAddress dnsPreferenceArg
Expand Down Expand Up @@ -447,7 +446,6 @@ func parse_args() *CLIArgs {
flag.BoolVar(&args.tlsALPNEnabled, "tls-alpn-enabled", true, "enable application protocol negotiation with TLS ALPN extension")
flag.Uint64Var(&args.bwLimit, "bw-limit", 0, "per-user bandwidth limit in bytes per second")
flag.Int64Var(&args.bwBurst, "bw-limit-burst", 0, "allowed burst size for bandwidth limit, how many \"tokens\" can fit into leaky bucket")
flag.UintVar(&args.bwBuckets, "bw-limit-buckets", 1024*1024, "number of buckets of bandwidth limit")
flag.BoolVar(&args.bwSeparate, "bw-limit-separate", false, "separate upload and download bandwidth limits")
flag.Func("dns-server", "nameserver specification (udp://..., tcp://..., https://..., tls://..., doh://..., dot://..., default://). Option can be used multiple times for parallel use of multiple nameservers. Empty string resets the list", func(p string) error {
if p == "" {
Expand Down Expand Up @@ -646,7 +644,6 @@ func run() int {
forwarder = forward.NewBWLimit(
float64(args.bwLimit),
args.bwBurst,
args.bwBuckets,
args.bwSeparate,
).PairConnections
}
Expand Down
Loading
Loading