Adding maxBytesPerTrigger tag for Pulsar Admission Control#151
nlu90 merged 38 commits into streamnative:master
@ericm-db Thanks for picking this idea up! Since we no longer use Pulsar, my work on the other PR was seriously deprioritized. I'm glad, though, that this might end up in the connector in some form, since I personally think it is very useful for, e.g., some CDC use cases.
Please also update the documentation for the admin URL and maxBytesPerTrigger in README.md.
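As an illustration of what such a README entry might show, here is a minimal sketch of a streaming read that sets the new option. Only `maxBytesPerTrigger` comes from this PR; the other option keys (`service.url`, `admin.url`, `topic`), the URLs, the topic name, and the 1 MiB value are assumptions chosen for the example and should be checked against the connector's README.

```scala
import org.apache.spark.sql.SparkSession

object MaxBytesPerTriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pulsar-admission-control-example")
      .master("local[*]")
      .getOrCreate()

    val stream = spark.readStream
      .format("pulsar")
      // Assumed option keys and endpoints; adjust to your deployment.
      .option("service.url", "pulsar://localhost:6650")
      .option("admin.url", "http://localhost:8080")
      .option("topic", "persistent://public/default/events")
      // Option added by this PR: byte budget per microbatch,
      // shared between topic-partitions. 1 MiB is an arbitrary value.
      .option("maxBytesPerTrigger", "1048576")
      .load()

    // Print each microbatch to the console so the batch sizes can be observed.
    val query = stream.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```

Running this requires a Spark installation with the Pulsar connector on the classpath and a reachable Pulsar broker and admin endpoint.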
}
val newTopics = topicPartitions.toSet.diff(existingStartOffsets.keySet)
val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition
  => topicPartition -> MessageId.earliest)
I still have a concern here.
For a newly discovered topic, admission control starts from MessageId.earliest (-1, -1, -1). This assumes that all ledgers exposed by the stats are readable. Is this assumption valid? @nlu90
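The merge logic under discussion can be sketched in isolation. The following is a simplified, self-contained illustration, not the connector's code: `Offset`, `Earliest`, and `At` are hypothetical stand-ins for Pulsar's `MessageId`, and topic-partitions are plain strings.

```scala
object StartOffsetsSketch {
  // Hypothetical stand-in for Pulsar's MessageId; only the
  // "earliest" sentinel matters for this illustration.
  sealed trait Offset
  case object Earliest extends Offset
  final case class At(ledgerId: Long, entryId: Long) extends Offset

  // Keep known offsets as they are; give every topic-partition that
  // has no recorded offset yet the Earliest sentinel.
  def startOffsets(
      topicPartitions: Seq[String],
      existing: Map[String, Offset]): Map[String, Offset] = {
    val newTopics = topicPartitions.toSet.diff(existing.keySet)
    existing ++ newTopics.map(tp => tp -> (Earliest: Offset))
  }
}
```

The reviewer's question is about the second step: starting a new topic at the earliest position is only safe if every ledger reported in the stats can actually be read.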
val (subscription, _) = extractSubscription(predefinedSubscription, tp)
CachedConsumer.getOrCreate(tp, subscription, client).seek(mid)
val consumer = CachedConsumer.getOrCreate(tp, subscription, client)
if (!consumer.isConnected) consumer.getLastMessageId
I know this is a bug: the Pulsar consumer does not attempt to reconnect when doing seek(). Can you leave a comment here explaining why this change is needed, plus a TODO noting that we will get rid of it once we have upgraded to a version that has the fix?
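A commented version of the workaround might look like the sketch below. It is self-contained for illustration: `SeekableConsumer` is a hypothetical trait exposing only the three methods that appear in the diff, and the idea that calling `getLastMessageId` makes a disconnected consumer re-establish its connection is an assumption inferred from the diff, not confirmed behavior.

```scala
object SeekWorkaroundSketch {
  // Hypothetical stand-in for the consumer methods used in the diff.
  trait SeekableConsumer[Id] {
    def isConnected: Boolean
    def getLastMessageId: Id
    def seek(id: Id): Unit
  }

  def seekWithReconnect[Id](consumer: SeekableConsumer[Id], target: Id): Unit = {
    // Workaround: seek() does not attempt to reconnect a disconnected
    // consumer, so issue another request first (assumed to trigger the
    // reconnect) and discard its result.
    // TODO: remove once the client is upgraded to a version with the fix.
    if (!consumer.isConnected) consumer.getLastMessageId
    consumer.seek(target)
  }
}
```

Keeping the check in one small helper would also make the workaround easy to find and delete after the upgrade.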
Motivation
Some users who request the Pulsar Spark connector also ask for rate-limiting functionality in the Pulsar source. They would like to control the rate of data processing and the resource consumption of streaming queries that use the Pulsar source. This can be achieved by implementing admission control in the Pulsar source.
Modifications
Added a config called maxBytesPerTrigger, which allows users to configure how many bytes are consumed for each microbatch; this budget is shared between topic-partitions.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
This change is already covered by existing tests, such as:
This change added tests and can be verified as follows: Run the PulsarAdmissionControlSuite
Documentation
Check the box below.
Need to update docs?
doc-required
no-need-doc
doc