Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6
github.com/skycoin/skycoin v0.28.1-0.20250823221707-c533551dfabd //DO NOT MODIFY OR UPDATE v0.28.1-0.20241105130348-39b49a2d0a7f
github.com/skycoin/skywire v1.3.31-0.20250724153549-ec7ca3554d42
github.com/skycoin/skywire v1.3.31-0.20250810155428-30d83a379b39
github.com/spf13/cobra v1.9.1
github.com/stretchr/testify v1.10.0
golang.org/x/net v0.43.0
golang.org/x/sys v0.35.0
golang.org/x/term v0.34.0
)

require github.com/xtaci/smux v1.5.34

require (
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/bytedance/sonic v1.14.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0
github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:UXghlricA7J3aRD/k7p/zBObQfmBawwCxIVPVjz2Q3o=
github.com/skycoin/skycoin v0.28.1-0.20250823221707-c533551dfabd h1:yKo1t3+P78TcCZvWqEJDV7DAB162C3qVHDKLjB8b2hA=
github.com/skycoin/skycoin v0.28.1-0.20250823221707-c533551dfabd/go.mod h1:9w5J+CJ7fWwkmpttrQ2SFksiSPc0t0DtwsCdXLdl4Qg=
github.com/skycoin/skywire v1.3.31-0.20250724153549-ec7ca3554d42 h1:9Hr/ht404g8fDo80Bw9YIPwu0IuDKrG3mRkZeH6y/Vc=
github.com/skycoin/skywire v1.3.31-0.20250724153549-ec7ca3554d42/go.mod h1:JnR5EJHpryaFFILpPpFJybtUT+0+2/aQxSxgjKvsPZs=
github.com/skycoin/skywire v1.3.31-0.20250810155428-30d83a379b39 h1:6+YIdW2rrU9ZDCvigTP9j/oUGcWgEzPPNFM0yo7Z2F0=
github.com/skycoin/skywire v1.3.31-0.20250810155428-30d83a379b39/go.mod h1:8fUvhqqo54SR0lMlUpGX/qnXSjKZEQw6TFYsXzwBAqg=
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo=
github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0=
Expand All @@ -145,6 +145,8 @@ github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
github.com/xtaci/smux v1.5.34 h1:OUA9JaDFHJDT8ZT3ebwLWPAgEfE6sWo2LaTy3anXqwg=
github.com/xtaci/smux v1.5.34/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY=
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
Expand Down
3 changes: 3 additions & 0 deletions pkg/disc/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ type Entry struct {

// Signature for proving authenticity of an Entry.
Signature string `json:"signature,omitempty"`

// Protocol is the lib that use for multiplexing.
Protocol string `json:"protocol,omitempty"`
}

func (e *Entry) String() string {
Expand Down
32 changes: 31 additions & 1 deletion pkg/dmsg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"sync"
"time"

"github.com/hashicorp/yamux"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/netutil"
"github.com/xtaci/smux"
"golang.org/x/net/proxy"

"github.com/skycoin/dmsg/pkg/disc"
Expand Down Expand Up @@ -47,6 +49,7 @@ type Config struct {
Callbacks *ClientCallbacks
ClientType string
ConnectedServersType string
Protocol string
}

// Ensure ensures all config values are set.
Expand Down Expand Up @@ -156,6 +159,8 @@ func (ce *Client) Serve(ctx context.Context) {

updateEntryLoopOnce := new(sync.Once)

needInitialPost := true

for {
if isClosed(ce.done) {
return
Expand Down Expand Up @@ -208,6 +213,18 @@ func (ce *Client) Serve(ctx context.Context) {
rand.Shuffle(len(entries), func(i, j int) {
entries[i], entries[j] = entries[j], entries[i]
})

if needInitialPost {
// use this for put protocol type of client to disc, for dicision part of dmsg-server
err = ce.initilizeClientEntry(cancellabelCtx, ce.conf.ClientType, ce.conf.Protocol)
if err != nil {
ce.log.WithError(err).Warn("Initial post entry failed")
} else {
ce.log.WithError(err).Info("Initial post entry successed")
}
needInitialPost = false
}

for n, entry := range entries {
if isClosed(ce.done) {
return
Expand Down Expand Up @@ -490,7 +507,7 @@ func (ce *Client) EnsureSession(ctx context.Context, entry *disc.Entry) error {
ce.log.WithField("remote_pk", entry.Static).Debug("Session already exists...")
return nil
}

entry.Protocol = ce.conf.Protocol
// Dial session.
_, err := ce.dialSession(ctx, entry)
return err
Expand Down Expand Up @@ -537,6 +554,19 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client
if err != nil {
return ClientSession{}, err
}
if entry.Protocol == "smux" {
dSes.sm.smux, err = smux.Client(conn, smux.DefaultConfig())
if err != nil {
return ClientSession{}, err
}
ce.log.Infof("smux stream session initial for %s", dSes.RemotePK().String())
} else {
dSes.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig())
if err != nil {
return ClientSession{}, err
}
ce.log.Infof("yamux stream session initial for %s", dSes.RemotePK().String())
}

if !ce.setSession(ctx, dSes.SessionCommon) {
_ = dSes.Close() //nolint:errcheck
Expand Down
34 changes: 34 additions & 0 deletions pkg/dmsg/entity_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,29 @@ func (c *EntityCommon) updateServerEntryLoop(ctx context.Context, addr string, m
}
}

func (c *EntityCommon) initilizeClientEntry(ctx context.Context, clientType string, protocol string) (err error) {
// Record last update on success.
defer func() {
if err == nil {
c.recordUpdate()
}
}()

srvPKs := make([]cipher.PubKey, 0, len(c.sessions))

_, err = c.dc.Entry(ctx, c.pk)
if err != nil {
entry := disc.NewClientEntry(c.pk, 0, srvPKs)
entry.ClientType = clientType
entry.Protocol = protocol
if err := entry.Sign(c.sk); err != nil {
return err
}
return c.dc.PostEntry(ctx, entry)
}
return nil
}

func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{}, clientType string) (err error) {
if isClosed(done) {
return nil
Expand Down Expand Up @@ -295,6 +318,17 @@ func (c *EntityCommon) updateClientEntryLoop(ctx context.Context, done chan stru
}
}

func (c *EntityCommon) entryProtocol(ctx context.Context, pk cipher.PubKey) string {
entry, err := c.dc.Entry(ctx, pk)
if err != nil {
c.log.WithField("entry", entry).WithError(err).Warn("Entry not found, so return empty as protocol.\n")
return ""
}

c.log.WithField("entry", entry).Debug("Entry's protocol fetch.\n")
return entry.Protocol
}

func (c *EntityCommon) delEntry(ctx context.Context) (err error) {

entry, err := c.dc.Entry(ctx, c.pk)
Expand Down
24 changes: 23 additions & 1 deletion pkg/dmsg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"sync"
"time"

"github.com/hashicorp/yamux"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/netutil"
"github.com/xtaci/smux"

"github.com/skycoin/dmsg/internal/servermetrics"
"github.com/skycoin/dmsg/pkg/disc"
Expand Down Expand Up @@ -214,7 +216,6 @@ func (s *Server) handleSession(conn net.Conn) {
}
return
}

log = log.WithField("remote_pk", dSes.RemotePK())
log.Info("Started session.")

Expand All @@ -223,6 +224,27 @@ func (s *Server) handleSession(conn net.Conn) {
awaitDone(ctx, s.done)
log.WithError(dSes.Close()).Info("Stopped session.")
}()
// detect visor protocol for dmsg
protocol := s.entryProtocol(ctx, dSes.RemotePK())

// based on protocol, create smux or yamux stream session
if protocol == "smux" {
dSes.sm.smux, err = smux.Server(conn, smux.DefaultConfig())
if err != nil {
cancel()
return
}
dSes.sm.addr = dSes.sm.smux.RemoteAddr()
log.Infof("smux stream session initial for %s", dSes.RemotePK().String())
} else {
dSes.sm.yamux, err = yamux.Server(conn, yamux.DefaultConfig())
if err != nil {
cancel()
return
}
dSes.sm.addr = dSes.sm.yamux.RemoteAddr()
log.Infof("yamux stream session initial for %s", dSes.RemotePK().String())
}

if s.setSession(ctx, dSes.SessionCommon) {
dSes.Serve()
Expand Down
84 changes: 57 additions & 27 deletions pkg/dmsg/server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/hashicorp/yamux"
"github.com/sirupsen/logrus"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/netutil"
"github.com/xtaci/smux"

"github.com/skycoin/dmsg/internal/servermetrics"
"github.com/skycoin/dmsg/pkg/noise"
Expand Down Expand Up @@ -44,30 +45,54 @@ func (ss *ServerSession) Close() error {
func (ss *ServerSession) Serve() {
ss.m.RecordSession(servermetrics.DeltaConnect) // record successful connection
defer ss.m.RecordSession(servermetrics.DeltaDisconnect) // record disconnection

for {
yStr, err := ss.ys.AcceptStream()
if err != nil {
switch err {
case yamux.ErrSessionShutdown, io.EOF:
ss.log.WithError(err).Info("Stopping session...")
default:
ss.log.WithError(err).Warn("Failed to accept stream, stopping session...")
if ss.sm.smux != nil {
for {
sStr, err := ss.sm.smux.AcceptStream()
if err != nil {
switch err {
case io.EOF:
ss.log.WithError(err).Info("Stopping session...")
default:
ss.log.WithError(err).Warn("Failed to accept stream, stopping session...")
}
return
}
return

log := ss.log.WithField("smux_id", sStr.ID())
log.Info("Initiating stream.")

go func(sStr *smux.Stream) {
err := ss.serveStream(log, sStr, ss.sm.addr)
log.WithError(err).Info("Stopped stream.")
}(sStr)
}
} else {
for {
yStr, err := ss.sm.yamux.AcceptStream()
if err != nil {
switch err {
case yamux.ErrSessionShutdown, io.EOF:
ss.log.WithError(err).Info("Stopping session...")
default:
ss.log.WithError(err).Warn("Failed to accept stream, stopping session...")
}
return
}

log := ss.log.WithField("yamux_id", yStr.StreamID())
log.Info("Initiating stream.")
log := ss.log.WithField("yamux_id", yStr.StreamID())
log.Info("Initiating stream.")

go func(yStr *yamux.Stream) {
err := ss.serveStream(log, yStr)
log.WithError(err).Info("Stopped stream.")
}(yStr)
go func(yStr *yamux.Stream) {
err := ss.serveStream(log, yStr, ss.sm.addr)
log.WithError(err).Info("Stopped stream.")
}(yStr)
}
}
}

func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr *yamux.Stream) error {
// struct

func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCloser, addr net.Addr) error {
readRequest := func() (StreamRequest, error) {
obj, err := ss.readObject(yStr)
if err != nil {
Expand Down Expand Up @@ -102,7 +127,7 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr *yamux.Stream)
if req.IPinfo && req.DstAddr.PK == ss.entity.LocalPK() {
log.Debug("Received IP stream request.")

ip, err := addrToIP(yStr.RemoteAddr())
ip, err := addrToIP(addr)
if err != nil {
ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream
return err
Expand Down Expand Up @@ -164,22 +189,27 @@ func addrToIP(addr net.Addr) (net.IP, error) {
}
}

func (ss *ServerSession) forwardRequest(req StreamRequest) (yStr *yamux.Stream, respObj SignedObject, err error) {
func (ss *ServerSession) forwardRequest(req StreamRequest) (mStr io.ReadWriteCloser, respObj SignedObject, err error) {
defer func() {
if err != nil && yStr != nil {
if err != nil && mStr != nil {
ss.log.
WithError(yStr.Close()).
WithError(mStr.Close()).
Debugf("After forwardRequest failed, the yamux stream is closed.")
}
}()

if yStr, err = ss.ys.OpenStream(); err != nil {
return nil, nil, err
if ss.sm.smux != nil {
if mStr, err = ss.sm.smux.OpenStream(); err != nil {
return nil, nil, err
}
} else {
if mStr, err = ss.sm.yamux.OpenStream(); err != nil {
return nil, nil, err
}
}
if err = ss.writeObject(yStr, req.raw); err != nil {
if err = ss.writeObject(mStr, req.raw); err != nil {
return nil, nil, err
}
if respObj, err = ss.readObject(yStr); err != nil {
if respObj, err = ss.readObject(mStr); err != nil {
return nil, nil, err
}
var resp StreamResponse
Expand All @@ -189,5 +219,5 @@ func (ss *ServerSession) forwardRequest(req StreamRequest) (yStr *yamux.Stream,
if err = resp.Verify(req); err != nil {
return nil, nil, err
}
return yStr, respObj, nil
return mStr, respObj, nil
}
Loading
Loading