@@ -89,7 +89,6 @@ type Options struct {
8989type Client interface {
9090 Manifest (ctx context.Context ) (agentsdk.Manifest , error )
9191 Listen (ctx context.Context ) (drpc.Conn , error )
92- DERPMapUpdates (ctx context.Context ) (<- chan agentsdk.DERPMapUpdate , io.Closer , error )
9392 ReportStats (ctx context.Context , log slog.Logger , statsChan <- chan * agentsdk.Stats , setInterval func (time.Duration )) (io.Closer , error )
9493 PostLifecycle (ctx context.Context , state agentsdk.PostLifecycleRequest ) error
9594 PostAppHealth (ctx context.Context , req agentsdk.PostAppHealthsRequest ) error
@@ -822,10 +821,22 @@ func (a *agent) run(ctx context.Context) error {
822821 network .SetBlockEndpoints (manifest .DisableDirectConnections )
823822 }
824823
824+ // Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
825+ conn , err := a .client .Listen (ctx )
826+ if err != nil {
827+ return err
828+ }
829+ defer func () {
830+ cErr := conn .Close ()
831+ if cErr != nil {
832+ a .logger .Debug (ctx , "error closing drpc connection" , slog .Error (err ))
833+ }
834+ }()
835+
825836 eg , egCtx := errgroup .WithContext (ctx )
826837 eg .Go (func () error {
827838 a .logger .Debug (egCtx , "running tailnet connection coordinator" )
828- err := a .runCoordinator (egCtx , network )
839+ err := a .runCoordinator (egCtx , conn , network )
829840 if err != nil {
830841 return xerrors .Errorf ("run coordinator: %w" , err )
831842 }
@@ -834,7 +845,7 @@ func (a *agent) run(ctx context.Context) error {
834845
835846 eg .Go (func () error {
836847 a .logger .Debug (egCtx , "running derp map subscriber" )
837- err := a .runDERPMapSubscriber (egCtx , network )
848+ err := a .runDERPMapSubscriber (egCtx , conn , network )
838849 if err != nil {
839850 return xerrors .Errorf ("run derp map subscriber: %w" , err )
840851 }
@@ -1056,21 +1067,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
10561067
10571068// runCoordinator runs a coordinator and returns whether a reconnect
10581069// should occur.
1059- func (a * agent ) runCoordinator (ctx context.Context , network * tailnet.Conn ) error {
1060- ctx , cancel := context .WithCancel (ctx )
1061- defer cancel ()
1062-
1063- conn , err := a .client .Listen (ctx )
1064- if err != nil {
1065- return err
1066- }
1067- defer func () {
1068- cErr := conn .Close ()
1069- if cErr != nil {
1070- a .logger .Debug (ctx , "error closing drpc connection" , slog .Error (err ))
1071- }
1072- }()
1073-
1070+ func (a * agent ) runCoordinator (ctx context.Context , conn drpc.Conn , network * tailnet.Conn ) error {
1071+ defer a .logger .Debug (ctx , "disconnected from coordination RPC" )
10741072 tClient := tailnetproto .NewDRPCTailnetClient (conn )
10751073 coordinate , err := tClient .Coordinate (ctx )
10761074 if err != nil {
@@ -1082,7 +1080,7 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10821080 a .logger .Debug (ctx , "error closing Coordinate client" , slog .Error (err ))
10831081 }
10841082 }()
1085- a .logger .Info (ctx , "connected to coordination endpoint " )
1083+ a .logger .Info (ctx , "connected to coordination RPC " )
10861084 coordination := tailnet .NewRemoteCoordination (a .logger , coordinate , network , uuid .Nil )
10871085 select {
10881086 case <- ctx .Done ():
@@ -1093,30 +1091,29 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10931091}
10941092
10951093// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
1096- func (a * agent ) runDERPMapSubscriber (ctx context.Context , network * tailnet.Conn ) error {
1094+ func (a * agent ) runDERPMapSubscriber (ctx context.Context , conn drpc.Conn , network * tailnet.Conn ) error {
1095+ defer a .logger .Debug (ctx , "disconnected from derp map RPC" )
10971096 ctx , cancel := context .WithCancel (ctx )
10981097 defer cancel ()
1099-
1100- updates , closer , err := a . client . DERPMapUpdates (ctx )
1098+ tClient := tailnetproto . NewDRPCTailnetClient ( conn )
1099+ stream , err := tClient . StreamDERPMaps (ctx , & tailnetproto. StreamDERPMapsRequest {} )
11011100 if err != nil {
1102- return err
1101+ return xerrors . Errorf ( "stream DERP Maps: %w" , err )
11031102 }
1104- defer closer .Close ()
1105-
1106- a .logger .Info (ctx , "connected to derp map endpoint" )
1103+ defer func () {
1104+ cErr := stream .Close ()
1105+ if cErr != nil {
1106+ a .logger .Debug (ctx , "error closing DERPMap stream" , slog .Error (err ))
1107+ }
1108+ }()
1109+ a .logger .Info (ctx , "connected to derp map RPC" )
11071110 for {
1108- select {
1109- case <- ctx .Done ():
1110- return ctx .Err ()
1111- case update := <- updates :
1112- if update .Err != nil {
1113- return update .Err
1114- }
1115- if update .DERPMap != nil && ! tailnet .CompareDERPMaps (network .DERPMap (), update .DERPMap ) {
1116- a .logger .Info (ctx , "updating derp map due to detected changes" )
1117- network .SetDERPMap (update .DERPMap )
1118- }
1111+ dmp , err := stream .Recv ()
1112+ if err != nil {
1113+ return xerrors .Errorf ("recv DERPMap error: %w" , err )
11191114 }
1115+ dm := tailnet .DERPMapFromProto (dmp )
1116+ network .SetDERPMap (dm )
11201117 }
11211118}
11221119
0 commit comments