Skip to content

Commit 9288814

Browse files
liferoadtvalentynsergiitk
authored
optimize grpc settings (#36528)
* increase grpc keepalive timeout and adjust ping settings Adjust GRPC channel settings to reduce ping frequency and allow more flexible keepalive behavior. This improves performance by reducing unnecessary network traffic while maintaining connection stability. * yapf * perf(subprocess_server): add grpc keepalive options to improve connection stability Add various grpc keepalive and ping-related options to prevent connection drops during long-running operations. The new settings help maintain active connections and detect failures faster. * perf(grpc): increase keepalive and ping intervals to reduce frequency Increase grpc.keepalive_time_ms from 30s to 60s and grpc.http2.min_sent_ping_interval_without_data_ms from 10s to 30s to reduce network overhead and improve performance * format * more changes * fix(milvus): increase timeout to 60s for container startup * fix(io): handle empty init_result in FileBasedSink by falling back to temp dir Add fallback logic when initialization result is EmptySideInput to create a temporary directory instead. This prevents potential issues when the pipeline initialization phase returns an empty collection. * retry Milvus * style: use string formatting in milvus search logging * fixed external tests * tests * fix(enrichment_test): sort output and expected values before comparison Ensure test passes when output order differs from expected order * docs(filebasedsink): add TODO comment for prism issue Add reference to GitHub issue #36563 for Prism compatibility * more tunes on the grpc options * addressed some comments * removed some options * keep 300000 for keepalive_timeout_ms * fixed the comments * added keepalive_time_ms back * Update sdks/python/apache_beam/utils/subprocess_server.py Co-authored-by: Sergii Tkachenko <[email protected]> * address comments. --------- Co-authored-by: tvalentyn <[email protected]> Co-authored-by: Sergii Tkachenko <[email protected]>
1 parent 63a8be9 commit 9288814

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

sdks/python/apache_beam/runners/worker/channel_factory.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@
2323

2424
class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
2525
DEFAULT_OPTIONS = [
26-
("grpc.keepalive_time_ms", 20000),
27-
("grpc.keepalive_timeout_ms", 300000),
26+
# Setting keepalive_time_ms is needed for other options to work.
27+
("grpc.keepalive_time_ms", 20_000),
28+
# Default: 20s. Increasing to 5 min.
29+
("grpc.keepalive_timeout_ms", 300_000),
30+
# Default: 2, set to 0 to allow unlimited pings without data
31+
("grpc.http2.max_pings_without_data", 0),
32+
# Default: False, set to True to allow keepalive pings when no calls
33+
("grpc.keepalive_permit_without_calls", True),
2834
]
2935

3036
def __init__(self):

sdks/python/apache_beam/utils/subprocess_server.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,20 @@ def start(self):
185185
try:
186186
process, endpoint = self.start_process()
187187
wait_secs = .1
188-
channel_options = [("grpc.max_receive_message_length", -1),
189-
("grpc.max_send_message_length", -1)]
188+
channel_options = [
189+
("grpc.max_receive_message_length", -1),
190+
("grpc.max_send_message_length", -1),
191+
# Default: 20000ms (20s), increased to 10 minutes for stability
192+
("grpc.keepalive_timeout_ms", 600_000),
193+
# Default: 2, set to 0 to allow unlimited pings without data
194+
("grpc.http2.max_pings_without_data", 0),
195+
# Default: False, set to True to allow keepalive pings when no calls
196+
("grpc.keepalive_permit_without_calls", True),
197+
# Default: 2, set to 0 to allow unlimited ping strikes
198+
("grpc.http2.max_ping_strikes", 0),
199+
# Default: 0 (disabled), enable socket reuse for better handling
200+
("grpc.so_reuseport", 1),
201+
]
190202
self._grpc_channel = grpc.insecure_channel(
191203
endpoint, options=channel_options)
192204
channel_ready = grpc.channel_ready_future(self._grpc_channel)

0 commit comments

Comments
 (0)