Skip to content

Commit 4444129

Browse files
authored
fix: set stream_ack_deadline to max_duration_per_lease_extension or 60 s, set ack_deadline to min_duration_per_lease_extension or 10 s (#760)
1 parent b986393 commit 4444129

File tree

2 files changed

+106
-14
lines changed

2 files changed

+106
-14
lines changed

‎google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py‎

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import logging
2121
import threading
2222
import typing
23-
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union
23+
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple
2424
import uuid
2525

2626
import grpc # type: ignore
@@ -74,6 +74,15 @@
7474
a subscription. We do this to reduce premature ack expiration.
7575
"""
7676

77+
_DEFAULT_STREAM_ACK_DEADLINE: float = 60
78+
"""The default stream ack deadline in seconds."""
79+
80+
_MAX_STREAM_ACK_DEADLINE: float = 600
81+
"""The maximum stream ack deadline in seconds."""
82+
83+
_MIN_STREAM_ACK_DEADLINE: float = 10
84+
"""The minimum stream ack deadline in seconds."""
85+
7786
_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
7887
code_pb2.DEADLINE_EXCEEDED,
7988
code_pb2.RESOURCE_EXHAUSTED,
@@ -270,7 +279,36 @@ def __init__(
270279
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
271280
self._ack_histogram = histogram.Histogram()
272281
self._last_histogram_size = 0
273-
self._ack_deadline: Union[int, float] = histogram.MIN_ACK_DEADLINE
282+
283+
# If max_duration_per_lease_extension is the default
284+
# we set the stream_ack_deadline to the default of 60
285+
if self._flow_control.max_duration_per_lease_extension == 0:
286+
self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
287+
# We will not be able to extend more than the default minimum
288+
elif (
289+
self._flow_control.max_duration_per_lease_extension
290+
< _MIN_STREAM_ACK_DEADLINE
291+
):
292+
self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
293+
# Will not be able to extend past the max
294+
elif (
295+
self._flow_control.max_duration_per_lease_extension
296+
> _MAX_STREAM_ACK_DEADLINE
297+
):
298+
self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
299+
else:
300+
self._stream_ack_deadline = (
301+
self._flow_control.max_duration_per_lease_extension
302+
)
303+
304+
self._ack_deadline = max(
305+
min(
306+
self._flow_control.min_duration_per_lease_extension,
307+
histogram.MAX_ACK_DEADLINE,
308+
),
309+
histogram.MIN_ACK_DEADLINE,
310+
)
311+
274312
self._rpc: Optional[bidi.ResumableBidiRpc] = None
275313
self._callback: Optional[functools.partial] = None
276314
self._closing = threading.Lock()
@@ -741,10 +779,10 @@ def heartbeat(self) -> bool:
741779

742780
if send_new_ack_deadline:
743781
request = gapic_types.StreamingPullRequest(
744-
stream_ack_deadline_seconds=self.ack_deadline
782+
stream_ack_deadline_seconds=self._stream_ack_deadline
745783
)
746784
_LOGGER.debug(
747-
"Sending new ack_deadline of %d seconds.", self.ack_deadline
785+
"Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
748786
)
749787
else:
750788
request = gapic_types.StreamingPullRequest()
@@ -796,7 +834,7 @@ def open(
796834

797835
_LOGGER.debug(
798836
"Creating a stream, default ACK deadline set to {} seconds.".format(
799-
stream_ack_deadline_seconds
837+
self._stream_ack_deadline
800838
)
801839
)
802840

@@ -928,6 +966,8 @@ def _get_initial_request(
928966
suitable for any other purpose).
929967
"""
930968
# Put the request together.
969+
# We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt
970+
# anyway. Set to some big-ish value in case we modack late.
931971
request = gapic_types.StreamingPullRequest(
932972
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
933973
modify_deadline_ack_ids=[],

‎tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py‎

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,16 +107,61 @@ def test_constructor_and_default_state():
107107
assert manager._client_id is not None
108108

109109

110-
def test_constructor_with_options():
110+
def test_constructor_with_default_options():
111+
flow_control_ = types.FlowControl()
111112
manager = streaming_pull_manager.StreamingPullManager(
112113
mock.sentinel.client,
113114
mock.sentinel.subscription,
114-
flow_control=mock.sentinel.flow_control,
115+
flow_control=flow_control_,
115116
scheduler=mock.sentinel.scheduler,
116117
)
117118

118-
assert manager.flow_control == mock.sentinel.flow_control
119+
assert manager.flow_control == flow_control_
119120
assert manager._scheduler == mock.sentinel.scheduler
121+
assert manager._ack_deadline == 10
122+
assert manager._stream_ack_deadline == 60
123+
124+
125+
def test_constructor_with_min_and_max_duration_per_lease_extension_():
126+
flow_control_ = types.FlowControl(
127+
min_duration_per_lease_extension=15, max_duration_per_lease_extension=20
128+
)
129+
manager = streaming_pull_manager.StreamingPullManager(
130+
mock.sentinel.client,
131+
mock.sentinel.subscription,
132+
flow_control=flow_control_,
133+
scheduler=mock.sentinel.scheduler,
134+
)
135+
assert manager._ack_deadline == 15
136+
assert manager._stream_ack_deadline == 20
137+
138+
139+
def test_constructor_with_min_duration_per_lease_extension_too_low():
140+
flow_control_ = types.FlowControl(
141+
min_duration_per_lease_extension=9, max_duration_per_lease_extension=9
142+
)
143+
manager = streaming_pull_manager.StreamingPullManager(
144+
mock.sentinel.client,
145+
mock.sentinel.subscription,
146+
flow_control=flow_control_,
147+
scheduler=mock.sentinel.scheduler,
148+
)
149+
assert manager._ack_deadline == 10
150+
assert manager._stream_ack_deadline == 10
151+
152+
153+
def test_constructor_with_max_duration_per_lease_extension_too_high():
154+
flow_control_ = types.FlowControl(
155+
max_duration_per_lease_extension=601, min_duration_per_lease_extension=601
156+
)
157+
manager = streaming_pull_manager.StreamingPullManager(
158+
mock.sentinel.client,
159+
mock.sentinel.subscription,
160+
flow_control=flow_control_,
161+
scheduler=mock.sentinel.scheduler,
162+
)
163+
assert manager._ack_deadline == 600
164+
assert manager._stream_ack_deadline == 600
120165

121166

122167
def make_manager(**kwargs):
@@ -164,9 +209,13 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting():
164209
manager._flow_control = types.FlowControl(
165210
min_duration_per_lease_extension=0, max_duration_per_lease_extension=0
166211
)
212+
assert manager._stream_ack_deadline == 60
213+
assert manager._ack_deadline == 10
214+
assert manager._obtain_ack_deadline(maybe_update=False) == 10
167215

168216
deadline = manager._obtain_ack_deadline(maybe_update=True)
169217
assert deadline == histogram.MIN_ACK_DEADLINE
218+
assert manager._stream_ack_deadline == 60
170219

171220
# When we get some historical data, the deadline is adjusted.
172221
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2)
@@ -186,11 +235,14 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension():
186235
manager._flow_control = types.FlowControl(
187236
max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1
188237
)
238+
assert manager._ack_deadline == 10
239+
189240
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large
190241

191242
# The deadline configured in flow control should prevail.
192243
deadline = manager._obtain_ack_deadline(maybe_update=True)
193244
assert deadline == histogram.MIN_ACK_DEADLINE + 1
245+
assert manager._stream_ack_deadline == 60
194246

195247

196248
def test__obtain_ack_deadline_with_min_duration_per_lease_extension():
@@ -292,12 +344,12 @@ def test__obtain_ack_deadline_no_value_update():
292344

293345
def test_client_id():
294346
manager1 = make_manager()
295-
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10)
347+
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=60)
296348
client_id_1 = request1.client_id
297349
assert client_id_1
298350

299351
manager2 = make_manager()
300-
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10)
352+
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=60)
301353
client_id_2 = request2.client_id
302354
assert client_id_2
303355

@@ -308,7 +360,7 @@ def test_streaming_flow_control():
308360
manager = make_manager(
309361
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
310362
)
311-
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
363+
request = manager._get_initial_request(stream_ack_deadline_seconds=60)
312364
assert request.max_outstanding_messages == 10
313365
assert request.max_outstanding_bytes == 1000
314366

@@ -318,7 +370,7 @@ def test_streaming_flow_control_use_legacy_flow_control():
318370
flow_control=types.FlowControl(max_messages=10, max_bytes=1000),
319371
use_legacy_flow_control=True,
320372
)
321-
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
373+
request = manager._get_initial_request(stream_ack_deadline_seconds=60)
322374
assert request.max_outstanding_messages == 0
323375
assert request.max_outstanding_bytes == 0
324376

@@ -1046,12 +1098,12 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog):
10461098
result = manager.heartbeat()
10471099

10481100
manager._rpc.send.assert_called_once_with(
1049-
gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=10)
1101+
gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=60)
10501102
)
10511103
assert result
10521104
# Set to false after a send is initiated.
10531105
assert not manager._send_new_ack_deadline
1054-
assert "Sending new ack_deadline of 10 seconds." in caplog.text
1106+
assert "Sending new ack_deadline of 60 seconds." in caplog.text
10551107

10561108

10571109
@mock.patch("google.api_core.bidi.ResumableBidiRpc", autospec=True)

0 commit comments

Comments
 (0)