Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,43 @@ function getProxy(req, dst, username) {
>
> This shouldn't be much of concern, though, if `getProxy` function doesn't use dst.resolvedHost and returns consistent values across invocations with the rest of inputs having same values.

### Bandwidth limit definition by JS script

dumbproxy can retrieve bandwidth limit parameters defined by result of `bwLimit` JS function from file specified by `-js-bw-limit` option.

`bwLimit` function is invoked with the [same parameters](#access-filter-by-js-script) as the `access` function. But unlike `access` function it must return object defining bandwidth limit parameters.

Example, equivalent to `-bw-limit 100000 -bw-limit-burst 1000000`:

```js
function bwLimit(req, dst, username) {
return {
uploadBPS: 100000,
uploadBurst: 1000000,
downloadBPS: 100000,
downloadBurst: 1000000,
};
}
```

Following properties of returned object are recognized:

* **uploadBPS** *(Number)* - upload limit rate value.
* **uploadBurst** *(Number)* - upload limit burst value.
* **downloadBPS** *(Number)* - download limit rate value. Ignored if `separate` property is false.
* **downloadBurst** *(Number)* - download limit burst value. Ignored if `separate` property is false.
* **groupKey** *(String)* - aggregation key for limits. All connections with the same `groupKey` value are accounted by one exact rate limit. If value is `null` or not set, username will be used as aggregation key.
* **separate** *(Boolean)* - account upload and download in separate limiters.

> [!NOTE]
> `bwLimit` can be invoked more than once per request.

> [!NOTE]
> Numeric values of limit parameters are used to (re-)create rate limit object, but do not update existing object unless it is evicted from cache naturally (which happens only if it has full bucket and no connections holding a shared lock on it).

> [!NOTE]
> `resolvedHost` property of destination address object is always null for `bwLimit` invocations because this information is not available in this context. Only `originalHost` and `port` are available.

### Scripting functions

Following builtin functions are addionally available within JS scripts:
Expand Down Expand Up @@ -515,7 +552,7 @@ Usage of /home/user/go/bin/dumbproxy:
allow multiple server instances on the same port
-bind-unix-socket value
Unix domain socket to listen to, overrides bind-address if set.
-bw-limit uint
-bw-limit value
per-user bandwidth limit in bytes per second
-bw-limit-burst int
allowed burst size for bandwidth limit, how many "tokens" can fit into leaky bucket
Expand Down Expand Up @@ -555,6 +592,10 @@ Usage of /home/user/go/bin/dumbproxy:
path to JS script file with the "access" filter function
-js-access-filter-instances int
number of JS VM instances to handle access filter requests (default 4)
-js-bw-limit value
path to JS script with "bwLimit" function. Overrides every other BW limit option
-js-bw-limit-instances int
number of JS VM instances to handle requests for BW limit parameters (default 4)
-js-proxy-router value
path to JS script file with the "getProxy" function
-js-proxy-router-instances int
Expand All @@ -570,7 +611,7 @@ Usage of /home/user/go/bin/dumbproxy:
-min-tls-version value
minimum TLS version accepted by server (default TLS12)
-mode value
proxy operation mode (http/socks5/stdio) (default http)
proxy operation mode (http/socks5/stdio/port-forward) (default http)
-passwd string
update given htpasswd file and add/set password for username. Username and password can be passed as positional arguments or requested interactively
-passwd-cost int
Expand Down
111 changes: 93 additions & 18 deletions forward/bwlimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,86 @@ package forward
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/Snawoot/secache"
"github.com/Snawoot/secache/randmap"

clog "github.com/SenseUnit/dumbproxy/log"
"github.com/SenseUnit/dumbproxy/rate"
)

const copyChunkSize = 128 * 1024

type LimitKind int

const (
LimitKindNone LimitKind = iota
LimitKindStatic
LimitKindJS
)

type LimitSpec struct {
Kind LimitKind
Spec any
}

type StaticLimitSpec struct {
BPS uint64
Burst int64
Separate bool
}

type JSLimitSpec struct {
Filename string
Instances int
}

type LimitParameters struct {
UploadBPS float64 `json:"uploadBPS"`
UploadBurst int64 `json:"uploadBurst"`
DownloadBPS float64 `json:"downloadBPS"`
DownloadBurst int64 `json:"downloadBurst"`
GroupKey *string `json:"groupKey"`
Separate bool `json:"separate"`
}

type LimitProvider = func(context.Context, string, string, string) (*LimitParameters, error)

func ProviderFromSpec(spec LimitSpec, logger *clog.CondLogger) (LimitProvider, error) {
switch spec.Kind {
case LimitKindStatic:
staticSpec, ok := spec.Spec.(StaticLimitSpec)
if !ok {
return nil, fmt.Errorf("incorrect payload type in BW limit spec: %T", spec)
}
return func(_ context.Context, username, _, _ string) (*LimitParameters, error) {
return &LimitParameters{
UploadBPS: float64(staticSpec.BPS),
UploadBurst: staticSpec.Burst,
DownloadBPS: float64(staticSpec.BPS),
DownloadBurst: staticSpec.Burst,
GroupKey: &username,
Separate: staticSpec.Separate,
}, nil
}, nil
case LimitKindJS:
jsSpec, ok := spec.Spec.(JSLimitSpec)
if !ok {
return nil, fmt.Errorf("incorrect payload type in BW limit spec: %T", spec)
}
j, err := NewJSLimitProvider(jsSpec.Filename, jsSpec.Instances, logger)
if err != nil {
return nil, err
}
return j.Parameters, nil
}
return nil, fmt.Errorf("unsupported BW limit kind %d", int(spec.Kind))
}

type cacheItem struct {
mux sync.RWMutex
ul *rate.Limiter
Expand All @@ -38,17 +106,13 @@ func (i *cacheItem) unlock() {
}

type BWLimit struct {
bps float64
burst int64
separate bool
cache secache.Cache[string, *cacheItem]
paramFn LimitProvider
cache secache.Cache[string, *cacheItem]
}

func NewBWLimit(bytesPerSecond float64, burst int64, separate bool) *BWLimit {
func NewBWLimit(p LimitProvider) *BWLimit {
return &BWLimit{
bps: bytesPerSecond,
burst: burst,
separate: separate,
paramFn: p,
cache: *(secache.New[string, *cacheItem](3, func(_ string, item *cacheItem) bool {
if item.tryLock() {
if item.ul.Tokens() >= float64(item.ul.Burst()) && item.dl.Tokens() >= float64(item.dl.Burst()) {
Expand Down Expand Up @@ -120,35 +184,46 @@ func (l *BWLimit) futureCopyAndCloseWrite(ctx context.Context, c chan<- error, r
close(c)
}

func (l *BWLimit) getRatelimiters(username string) (res *cacheItem) {
func (l *BWLimit) getRatelimiters(ctx context.Context, username, network, address string) (*cacheItem, error) {
params, err := l.paramFn(ctx, username, network, address)
if err != nil {
return nil, err
}
groupKey := username
if params.GroupKey != nil {
groupKey = *params.GroupKey
}
var res *cacheItem
l.cache.Do(func(m *randmap.RandMap[string, *cacheItem]) {
var ok bool
res, ok = m.Get(username)
res, ok = m.Get(groupKey)
if ok {
res.rLock()
} else {
ul := rate.NewLimiter(rate.Limit(l.bps), max(copyChunkSize, l.burst))
ul := rate.NewLimiter(rate.Limit(params.UploadBPS), max(copyChunkSize, params.UploadBurst))
dl := ul
if l.separate {
dl = rate.NewLimiter(rate.Limit(l.bps), max(copyChunkSize, l.burst))
if params.Separate {
dl = rate.NewLimiter(rate.Limit(params.DownloadBPS), max(copyChunkSize, params.DownloadBurst))
}
res = &cacheItem{
ul: ul,
dl: dl,
}
res.rLock()
l.cache.SetLocked(m, username, res)
l.cache.SetLocked(m, groupKey, res)
}
return
})
return
return res, nil
}

func (l *BWLimit) PairConnections(ctx context.Context, username string, incoming, outgoing io.ReadWriteCloser) error {
ci := l.getRatelimiters(username)
func (l *BWLimit) PairConnections(ctx context.Context, username string, incoming, outgoing io.ReadWriteCloser, network, address string) error {
ci, err := l.getRatelimiters(ctx, username, network, address)
if err != nil {
return fmt.Errorf("ratelimit parameter computarion failed for user %q: %w", username, err)
}
defer ci.rUnlock()

var err error
i2oErr := make(chan error, 1)
o2iErr := make(chan error, 1)
ctxErr := ctx.Done()
Expand Down
2 changes: 1 addition & 1 deletion forward/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func futureCopyAndCloseWrite(c chan<- error, dst io.WriteCloser, src io.ReadClos
close(c)
}

func PairConnections(ctx context.Context, username string, incoming, outgoing io.ReadWriteCloser) error {
func PairConnections(ctx context.Context, username string, incoming, outgoing io.ReadWriteCloser, _, _ string) error {
var err error
i2oErr := make(chan error, 1)
o2iErr := make(chan error, 1)
Expand Down
107 changes: 107 additions & 0 deletions forward/jslimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package forward

import (
"context"
"errors"
"fmt"
"os"

"github.com/dop251/goja"
"golang.org/x/sync/errgroup"

"github.com/SenseUnit/dumbproxy/dialer/dto"
"github.com/SenseUnit/dumbproxy/jsext"
clog "github.com/SenseUnit/dumbproxy/log"
)

type JSLimitFunc = func(req *jsext.JSRequestInfo, dst *jsext.JSDstInfo, username string) (*LimitParameters, error)

type JSLimitProvider struct {
funcPool chan JSLimitFunc
logger *clog.CondLogger
}

func NewJSLimitProvider(filename string, instances int, logger *clog.CondLogger) (*JSLimitProvider, error) {
script, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("unable to load JS script file %q: %w", filename, err)
}

instances = max(1, instances)
pool := make(chan JSLimitFunc, instances)
initGroup, _ := errgroup.WithContext(context.Background())

for i := 0; i < instances; i++ {
initGroup.Go(func() error {
vm := goja.New()
err := jsext.AddPrinter(vm, logger)
if err != nil {
return fmt.Errorf("can't add print function to runtime: %w", err)
}
err = jsext.ConfigureRuntime(vm)
if err != nil {
return fmt.Errorf("can't configure runtime: %w", err)
}
_, err = vm.RunString(string(script))
if err != nil {
return fmt.Errorf("script run failed: %w", err)
}

var f JSLimitFunc
var limitFnJSVal goja.Value
if ex := vm.Try(func() {
limitFnJSVal = vm.Get("bwLimit")
}); ex != nil {
return fmt.Errorf("\"bwLimit\" function cannot be located in VM context: %w", err)
}
if limitFnJSVal == nil {
return errors.New("\"bwLimit\" function is not defined")
}
err = vm.ExportTo(limitFnJSVal, &f)
if err != nil {
return fmt.Errorf("can't export \"bwLimit\" function from JS VM: %w", err)
}

pool <- f
return nil
})
}

err = initGroup.Wait()
if err != nil {
return nil, err
}

return &JSLimitProvider{
funcPool: pool,
logger: logger,
}, nil
}

func (j *JSLimitProvider) Parameters(ctx context.Context, username, network, address string) (res *LimitParameters, err error) {
defer func() {
if err != nil {
j.logger.Error("%v", err)
}
}()
req, _ := dto.FilterParamsFromContext(ctx)
ri := jsext.JSRequestInfoFromRequest(req)
di, err := jsext.JSDstInfoFromContext(ctx, network, address)
if err != nil {
return nil, fmt.Errorf("unable to construct dst info: %w", err)
}
func() {
f := <-j.funcPool
defer func(pool chan JSLimitFunc, f JSLimitFunc) {
pool <- f
}(j.funcPool, f)
res, err = f(ri, di, username)
}()
if err != nil {
return nil, fmt.Errorf("JS limit script exception: %w", err)
}
if res == nil {
return nil, fmt.Errorf("JS limit script returned null object")
}
return res, nil
}
5 changes: 1 addition & 4 deletions handler/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package handler

import (
"context"
"io"

"github.com/SenseUnit/dumbproxy/auth"
clog "github.com/SenseUnit/dumbproxy/log"
)
Expand All @@ -19,7 +16,7 @@ type Config struct {
Logger *clog.CondLogger
// Forward optionally specifies custom connection pairing function
// which does actual data forwarding.
Forward func(ctx context.Context, username string, incoming, outgoing io.ReadWriteCloser) error
Forward ForwardFunc
// UserIPHints specifies whether allow IP hints set by user or not
UserIPHints bool
}
Loading