1616package daemon
1717
1818import (
19- "fmt "
19+ "errors "
2020 "io"
21+ "sync/atomic"
2122
2223 "github.com/arduino/arduino-cli/arduino/monitors"
2324 rpc "github.com/arduino/arduino-cli/rpc/monitor"
@@ -39,7 +40,7 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
3940 // ensure it's a config message and not data
4041 config := msg .GetMonitorConfig ()
4142 if config == nil {
42- return fmt . Errorf ("first message must contain monitor configuration, not data" )
43+ return errors . New ("first message must contain monitor configuration, not data" )
4344 }
4445
4546 // select which type of monitor we need
@@ -61,13 +62,34 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
6162 if mon , err = monitors .OpenSerialMonitor (config .GetTarget (), int (baudRate )); err != nil {
6263 return err
6364 }
65+
66+ case rpc .MonitorConfig_NULL :
67+ if addCfg , ok := config .GetAdditionalConfig ().AsMap ()["OutputRate" ]; ! ok {
68+ mon = monitors .OpenNullMonitor (100.0 ) // 100 bytes per second as default
69+ } else if outputRate , ok := addCfg .(float64 ); ! ok {
70+ return errors .New ("OutputRate in Null monitor must be a float64" )
71+ } else {
72+ // get the Monitor instance
73+ mon = monitors .OpenNullMonitor (outputRate )
74+ }
6475 }
6576
6677 // we'll use these channels to communicate with the goroutines
6778 // handling the stream and the target respectively
6879 streamClosed := make (chan error )
6980 targetClosed := make (chan error )
7081
82+ // set rate limiting window
83+ bufferSize := int (config .GetRecvRateLimitBuffer ())
84+ rateLimitEnabled := (bufferSize > 0 )
85+ if ! rateLimitEnabled {
86+ bufferSize = 1024
87+ }
88+ buffer := make ([]byte , bufferSize )
89+ bufferUsed := 0
90+
91+ var writeSlots int32
92+
7193 // now we can read the other messages and re-route to the monitor...
7294 go func () {
7395 for {
@@ -84,6 +106,11 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
84106 break
85107 }
86108
109+ if rateLimitEnabled {
110+ // Increase rate limiter write slots
111+ atomic .AddInt32 (& writeSlots , msg .GetRecvAcknowledge ())
112+ }
113+
87114 if _ , err := mon .Write (msg .GetData ()); err != nil {
88115 // error writing to target
89116 targetClosed <- err
@@ -94,27 +121,58 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
94121
95122 // ...and read from the monitor and forward to the output stream
96123 go func () {
97- buf := make ([]byte , 8 )
124+ dropBuffer := make ([]byte , 10240 )
125+ dropped := 0
98126 for {
99- n , err := mon .Read (buf )
100- if err != nil {
101- // error reading from target
102- targetClosed <- err
103- break
127+ if bufferUsed < bufferSize {
128+ if n , err := mon .Read (buffer [bufferUsed :]); err != nil {
129+ // error reading from target
130+ targetClosed <- err
131+ break
132+ } else if n == 0 {
133+ // target was closed
134+ targetClosed <- nil
135+ break
136+ } else {
137+ bufferUsed += n
138+ }
139+ } else {
140+ // FIXME: a very rare condition but still...
141+ // we may be waiting here while, in the meantime, a transmit slot is
142+ // freed: in this case the (filled) buffer will stay in the server
143+ // until the following Read exits (-> the next char arrives from the
144+ // monitor).
145+
146+ if n , err := mon .Read (dropBuffer ); err != nil {
147+ // error reading from target
148+ targetClosed <- err
149+ break
150+ } else if n == 0 {
151+ // target was closed
152+ targetClosed <- nil
153+ break
154+ } else {
155+ dropped += n
156+ }
104157 }
105158
106- if n == 0 {
107- // target was closed
108- targetClosed <- nil
109- break
110- }
111-
112- if err = stream .Send (& rpc.StreamingOpenResp {
113- Data : buf [:n ],
114- }); err != nil {
115- // error sending to stream
116- streamClosed <- err
117- break
159+ slots := atomic .LoadInt32 (& writeSlots )
160+ if ! rateLimitEnabled || slots > 0 {
161+ if err = stream .Send (& rpc.StreamingOpenResp {
162+ Data : buffer [:bufferUsed ],
163+ Dropped : int32 (dropped ),
164+ }); err != nil {
165+ // error sending to stream
166+ streamClosed <- err
167+ break
168+ }
169+ bufferUsed = 0
170+ dropped = 0
171+
172+ // Rate limit, filling all the available window
173+ if rateLimitEnabled {
174+ slots = atomic .AddInt32 (& writeSlots , - 1 )
175+ }
118176 }
119177 }
120178 }()
0 commit comments