1515package storage
1616
1717import (
18+ "container/list"
1819 "context"
1920 "errors"
2021 "fmt"
@@ -34,6 +35,10 @@ import (
3435const (
3536 mrdCommandChannelSize = 1
3637 mrdResponseChannelSize = 100
38+ // This should never be hit in practice, but is a safety valve to prevent
39+ // unbounded memory usage if the user is adding ranges faster than they
40+ // can be processed.
41+ mrdAddInternalQueueMaxSize = 50000
3742)
3843
3944// --- internalMultiRangeDownloader Interface ---
@@ -83,18 +88,19 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
8388
8489 // Create the manager
8590 manager := & multiRangeDownloaderManager {
86- ctx : mCtx ,
87- cancel : cancel ,
88- client : c ,
89- settings : s ,
90- params : params ,
91- cmds : make (chan mrdCommand , mrdCommandChannelSize ),
92- sessionResps : make (chan mrdSessionResult , mrdResponseChannelSize ),
93- pendingRanges : make (map [int64 ]* rangeRequest ),
94- readIDCounter : 1 ,
95- readSpec : readSpec ,
96- attrsReady : make (chan struct {}),
97- spanCtx : ctx ,
91+ ctx : mCtx ,
92+ cancel : cancel ,
93+ client : c ,
94+ settings : s ,
95+ params : params ,
96+ cmds : make (chan mrdCommand , mrdCommandChannelSize ),
97+ sessionResps : make (chan mrdSessionResult , mrdResponseChannelSize ),
98+ pendingRanges : make (map [int64 ]* rangeRequest ),
99+ readIDCounter : 1 ,
100+ readSpec : readSpec ,
101+ attrsReady : make (chan struct {}),
102+ spanCtx : ctx ,
103+ unsentRequests : newRequestQueue (),
98104 }
99105
100106 mrd := & MultiRangeDownloader {
@@ -227,6 +233,7 @@ type multiRangeDownloaderManager struct {
227233 attrsOnce sync.Once
228234 spanCtx context.Context
229235 callbackWg sync.WaitGroup
236+ unsentRequests * requestQueue
230237}
231238
232239type rangeRequest struct {
@@ -374,10 +381,29 @@ func (m *multiRangeDownloaderManager) eventLoop() {
374381 }
375382
376383 for {
384+ var nextReq * storagepb.BidiReadObjectRequest
385+ var targetChan chan <- * storagepb.BidiReadObjectRequest
386+
387+ // Only try to send if we have queued requests
388+ if m .unsentRequests .Len () > 0 && m .currentSession != nil {
389+ nextReq = m .unsentRequests .Front ()
390+ if nextReq != nil {
391+ targetChan = m .currentSession .reqC
392+ }
393+ }
394+ // Only read from cmds if we have space in the unsentRequests queue.
395+ var cmdsChan chan mrdCommand
396+ if m .unsentRequests .Len () < mrdAddInternalQueueMaxSize {
397+ cmdsChan = m .cmds
398+ }
377399 select {
378400 case <- m .ctx .Done ():
379401 return
380- case cmd := <- m .cmds :
402+ // This path only triggers if space is available in the channel.
403+ // It never blocks the eventLoop.
404+ case targetChan <- nextReq :
405+ m .unsentRequests .RemoveFront ()
406+ case cmd := <- cmdsChan :
381407 cmd .apply (m .ctx , m )
382408 if _ , ok := cmd .(* mrdCloseCmd ); ok {
383409 return
@@ -386,7 +412,7 @@ func (m *multiRangeDownloaderManager) eventLoop() {
386412 m .processSessionResult (result )
387413 }
388414
389- if len (m .pendingRanges ) == 0 {
415+ if len (m .pendingRanges ) == 0 && m . unsentRequests . Len () == 0 {
390416 for _ , waiter := range m .waiters {
391417 close (waiter )
392418 }
@@ -512,7 +538,7 @@ func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrd
512538 ReadId : req .readID ,
513539 }},
514540 }
515- m .currentSession . SendRequest (protoReq )
541+ m .unsentRequests . PushBack (protoReq )
516542}
517543
518544func (m * multiRangeDownloaderManager ) convertToPositiveOffset (req * rangeRequest ) error {
@@ -655,7 +681,8 @@ func (m *multiRangeDownloaderManager) ensureSession(ctx context.Context) error {
655681 }
656682 }
657683 if len (rangesToResend ) > 0 {
658- m .currentSession .SendRequest (& storagepb.BidiReadObjectRequest {ReadRanges : rangesToResend })
684+ retryReq := & storagepb.BidiReadObjectRequest {ReadRanges : rangesToResend }
685+ m .unsentRequests .PushFront (retryReq )
659686 }
660687 return nil
661688 }, m .settings .retry , true )
@@ -900,3 +927,28 @@ func readerAttrsFromObject(o *ObjectAttrs) ReaderObjectAttrs {
900927 CRC32C : o .CRC32C ,
901928 }
902929}
930+
931+ type requestQueue struct {
932+ l * list.List
933+ }
934+
935+ func newRequestQueue () * requestQueue {
936+ return & requestQueue {l : list .New ()}
937+ }
938+
939+ func (q * requestQueue ) PushBack (r * storagepb.BidiReadObjectRequest ) { q .l .PushBack (r ) }
940+ func (q * requestQueue ) PushFront (r * storagepb.BidiReadObjectRequest ) { q .l .PushFront (r ) }
941+ func (q * requestQueue ) Len () int { return q .l .Len () }
942+
943+ func (q * requestQueue ) Front () * storagepb.BidiReadObjectRequest {
944+ if f := q .l .Front (); f != nil {
945+ return f .Value .(* storagepb.BidiReadObjectRequest )
946+ }
947+ return nil
948+ }
949+
950+ func (q * requestQueue ) RemoveFront () {
951+ if f := q .l .Front (); f != nil {
952+ q .l .Remove (f )
953+ }
954+ }
0 commit comments