feat: Add ability to bundle all records from one micro-batch into PutRecords#86
feat: Add ability to bundle all records from one micro-batch into PutRecords#86leslieyanyan wants to merge 6 commits intoqubole:masterfrom
Conversation
|
@leslieyanyan - thanks for the PR. Kinesis Sink was indeed very slow. @abhishekd0907 has taken a shot in reducing the latency in this PR #81. We are using KPL underneath which takes care of aggregation and sending multiple records in the same API call. Can you try the last master code and see if the new change in making a difference. |
|
@itsvikramagr Thank you for the suggestion.
|
…utRecords request
|
|
||
| private def bundleExecute(iterator: Iterator[InternalRow]): Unit = { | ||
|
|
||
| val groupedIterator: iterator.GroupedIterator[InternalRow] = iterator.grouped(490) |
There was a problem hiding this comment.
what is 490 here? Should it be configurable?
README.md
Outdated
| | kinesis.executor.aggregationEnabled | true | Specify if records should be aggregated before sending them to Kinesis | | ||
| | kniesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End | | ||
| | kinesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End | | ||
| | kinesis.executor.sink.bundle.records | false | Bundle all records from one micro-batch into PutRecords | |
There was a problem hiding this comment.
we have added "kinesis.executor.recordTtl" - can we add details about this config here
|
|
||
| Futures.addCallback(future, kinesisCallBack) | ||
|
|
||
| producer.flushSync() |
There was a problem hiding this comment.
@leslieyanyan @itsvikramagr
The slowness is on account of this function call producer.flushSync(). Please refer my comment here: #81 (review)
The new code in this PR is showing improved performance because method sendBundledData() doesn't have this function call producer.flushSync()
We'll need to separately evaluate how much performance impact we're getting by using GroupedIterator instead of normal iterator.
The current Kinesis sink only sends one record into Kinesis stream each time, the writing speed is very slow.
With the changes in this PR, we could bundle all records from one micro-batch into PutRecords. We've tested the changes in our production environment, the writing speed and efficiency improved a lot when enabling
kinesis.executor.sink.bundle.records