@@ -66,7 +66,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) {
66
66
67
67
func testStreamingPullIteration (t * testing.T , client * Client , server * mockServer , msgs []* pb.ReceivedMessage ) {
68
68
sub := client .Subscription ("S" )
69
- gotMsgs , err := pullN (context .Background (), sub , len (msgs ), func (_ context.Context , m * Message ) {
69
+ gotMsgs , err := pullN (context .Background (), sub , len (msgs ), 0 , func (_ context.Context , m * Message ) {
70
70
id , err := strconv .Atoi (msgAckID (m ))
71
71
if err != nil {
72
72
t .Fatalf ("pullN err: %v" , err )
@@ -196,7 +196,7 @@ func TestStreamingPullRetry(t *testing.T) {
196
196
197
197
sub := client .Subscription ("S" )
198
198
sub .ReceiveSettings .NumGoroutines = 1
199
- gotMsgs , err := pullN (context .Background (), sub , len (testMessages ), func (_ context.Context , m * Message ) {
199
+ gotMsgs , err := pullN (context .Background (), sub , len (testMessages ), 0 , func (_ context.Context , m * Message ) {
200
200
id , err := strconv .Atoi (msgAckID (m ))
201
201
if err != nil {
202
202
t .Fatalf ("pullN err: %v" , err )
@@ -297,7 +297,7 @@ func TestStreamingPullConcurrent(t *testing.T) {
297
297
sub := client .Subscription ("S" )
298
298
ctx , cancel := context .WithTimeout (context .Background (), time .Second )
299
299
defer cancel ()
300
- gotMsgs , err := pullN (ctx , sub , nMessages , func (ctx context.Context , m * Message ) {
300
+ gotMsgs , err := pullN (ctx , sub , nMessages , 0 , func (ctx context.Context , m * Message ) {
301
301
m .Ack ()
302
302
})
303
303
if c := status .Convert (err ); err != nil && c .Code () != codes .Canceled {
@@ -513,7 +513,8 @@ func newMock(t *testing.T) (*Client, *mockServer) {
513
513
}
514
514
515
515
// pullN calls sub.Receive until at least n messages are received.
516
- func pullN (ctx context.Context , sub * Subscription , n int , f func (context.Context , * Message )) ([]* Message , error ) {
516
+ // Wait a provided duration before cancelling.
517
+ func pullN (ctx context.Context , sub * Subscription , n int , wait time.Duration , f func (context.Context , * Message )) ([]* Message , error ) {
517
518
var (
518
519
mu sync.Mutex
519
520
msgs []* Message
@@ -526,6 +527,9 @@ func pullN(ctx context.Context, sub *Subscription, n int, f func(context.Context
526
527
mu .Unlock ()
527
528
f (ctx , m )
528
529
if nSeen >= n {
530
+ // Wait a specified amount of time so that for exactly once delivery,
531
+ // Acks aren't cancelled immediately.
532
+ time .Sleep (wait )
529
533
cancel ()
530
534
}
531
535
})
0 commit comments