Skip to content

Commit b4b0211

Browse files
committed
fix bug: insert to cache only when stream's kv cache is computed
1 parent 2b822b8 commit b4b0211

File tree

4 files changed

+18
-3
lines changed

4 files changed

+18
-3
lines changed

rtp_llm/cpp/engine_base/stream/GenerateStream.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,10 @@ bool GenerateStream::finished() {
680680
return generate_status_->status == StreamState::FINISHED;
681681
}
682682

683+
bool GenerateStream::isRemoteRunningWithoutLock() {
684+
return generate_status_->status == StreamState::REMOTE_RUNNING;
685+
}
686+
683687
bool GenerateStream::needRemoteGenerate() const {
684688
std::lock_guard<std::mutex> lock(*output_mutex_);
685689
return need_remote_generate_;

rtp_llm/cpp/engine_base/stream/GenerateStream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ class GenerateStream {
221221
bool finishedWithoutLock();
222222
void cancelIfNotRunning();
223223
void setFinishedWithoutLock();
224+
bool isRemoteRunningWithoutLock();
224225
bool needRemoteGenerate() const;
225226
bool setRemoteGenerate();
226227
size_t iterCount() const;

rtp_llm/cpp/engine_base/stream/StreamCacheResource.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ void StreamCacheResource::freeBatchBlocks(size_t batch_id, vector<int>& blocks)
2121
bool should_reuse_cache =
2222
reuseCache() && (!stream_->hasNumBeams() || (!stream_->stoppedWithoutLock() && batch_id == 0));
2323
// TODO(zhangjianning.zjn) cache all beams of beam search
24-
if (blocks.size() == batch_resource_.blockSize(batch_id) && should_reuse_cache) {
24+
if (blocks.size() == batch_resource_.blockSize(batch_id) && should_reuse_cache
25+
&& (stream_->finishedWithoutLock() || stream_->isRemoteRunningWithoutLock())) {
2526
reConstructCacheKeys();
2627
auto tokens_id = stream_->completeTokenIdsVec(batch_id);
2728
const auto& cache_keys = stream_->cacheKeys(batch_id);
2829
vector<float> loss;
2930
if (stream_->getLoss()) {
3031
loss = rtp_llm::buffer2vector<float>(*(stream_->getLoss()));
3132
}
32-
// TODO(xinfei.sxf) 一些场景调用了cancel的地方,是否应该free with cache
3333
CacheManager::FreeInfo free_info(stream_->streamId(),
3434
tokens_id,
3535
cache_keys,

rtp_llm/cpp/engine_base/stream/test/StreamCacheResourceTest.cc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ TEST_F(StreamCacheResourceTest, testReuseCache) {
307307
batch_tokens_2[stream_->seqLength() - 3] = 9;
308308
batch_tokens_2[stream_->seqLength() - 2] = 10;
309309

310-
// stream_->reConstructCacheKeys();
310+
stream_->setFinishedWithoutLock();
311311
stream_->releaseResource();
312312

313313
ASSERT_EQ(cache_manager_->freeBlockNums(), 3);
@@ -361,6 +361,7 @@ TEST_F(StreamCacheResourceTest, testReuseCache) {
361361
tokens_2[stream_->seqLength() - 2] = 14;
362362
tokens_2[stream_->seqLength() - 1] = 15;
363363

364+
stream_->setFinishedWithoutLock();
364365
stream_->releaseResource();
365366
ASSERT_EQ(cache_manager_->freeBlockNums(), 2);
366367
ASSERT_EQ(cache_manager_->cacheItemNum(), 3);
@@ -377,6 +378,7 @@ TEST_F(StreamCacheResourceTest, testReuseCacheWithFastGen) {
377378
stream_->setIsContextStream(false);
378379
ASSERT_TRUE(resource.incrKVBlock(token_capacity).ok());
379380
ASSERT_EQ(cache_manager_->freeBlockNums(), 3);
381+
stream_->setFinishedWithoutLock();
380382
stream_->releaseResource();
381383
ASSERT_EQ(cache_manager_->freeBlockNums(), 5);
382384

@@ -426,6 +428,7 @@ TEST_F(StreamCacheResourceTest, testReuseCacheWithFastGen) {
426428
ASSERT_EQ(cache_manager_->freeBlockNums(), 1);
427429

428430
// partial fallback
431+
stream_->setFinishedWithoutLock();
429432
stream_->tryReleaseKVBlock(2);
430433
stream_->setPaused();
431434
ASSERT_EQ(stream_->maxBlockSize(), 2);
@@ -440,6 +443,7 @@ TEST_F(StreamCacheResourceTest, testReuseCacheWithFastGen) {
440443
ASSERT_EQ(stream_->maxBlockSize(), 3);
441444

442445
// full fallback
446+
stream_->setFinishedWithoutLock();
443447
stream_->tryReleaseKVBlock(stream_->maxBlockSize());
444448
stream_->setPaused();
445449
ASSERT_EQ(cache_manager_->freeBlockNums(), 3);
@@ -456,6 +460,7 @@ TEST_F(StreamCacheResourceTest, testReuseCacheWithFastGen) {
456460
ASSERT_EQ(cache_manager_->freeBlockNums(), 1);
457461
ASSERT_EQ(stream_->maxBlockSize(), 4);
458462

463+
stream_->setFinishedWithoutLock();
459464
stream_->releaseResource();
460465
ASSERT_EQ(cache_manager_->availableBlockNums(), 8);
461466
}
@@ -472,6 +477,7 @@ TEST_F(StreamCacheResourceTest, testTryReleaseKVBlock) {
472477
ASSERT_EQ(allocator_->blockRefCounter().getRefCounter(2), 2);
473478
ASSERT_EQ(allocator_->blockRefCounter().getRefCounter(3), 2);
474479

480+
stream_->setFinishedWithoutLock();
475481
resource.tryReleaseKVBlock(1);
476482
ASSERT_EQ(cache_manager_->freeBlockNums(), 6);
477483
ASSERT_EQ(resource.maxBlockSize(), 2);
@@ -480,6 +486,7 @@ TEST_F(StreamCacheResourceTest, testTryReleaseKVBlock) {
480486
ASSERT_EQ(allocator_->blockRefCounter().getRefCounter(2), 2);
481487
ASSERT_EQ(allocator_->blockRefCounter().getRefCounter(3), 0);
482488

489+
stream_->setFinishedWithoutLock();
483490
resource.tryReleaseKVBlock(2);
484491
ASSERT_EQ(cache_manager_->freeBlockNums(), 8);
485492
ASSERT_EQ(resource.maxBlockSize(), 0);
@@ -499,6 +506,7 @@ TEST_F(StreamCacheResourceTest, testTryReleaseKVBlock) {
499506
ASSERT_EQ(allocator_->blockRefCounter().getRefCounter(5), 0);
500507
ASSERT_EQ(allocator_->blockRefCounter().getRefCounter(6), 0);
501508

509+
stream_->setFinishedWithoutLock();
502510
resource.tryReleaseKVBlock(2);
503511
ASSERT_EQ(cache_manager_->freeBlockNums(), 7);
504512
ASSERT_EQ(resource.maxBlockSize(), 1);
@@ -509,11 +517,13 @@ TEST_F(StreamCacheResourceTest, testTryReleaseKVBlock) {
509517
auto tokens_2 = stream_->complete_token_ids_->data(1);
510518
tokens_2[0] = 2;
511519

520+
stream_->setFinishedWithoutLock();
512521
resource.tryReleaseKVBlock(1);
513522
ASSERT_EQ(cache_manager_->freeBlockNums(), 7);
514523
ASSERT_EQ(resource.maxBlockSize(), 0);
515524
ASSERT_EQ(cache_manager_->cacheItemNum(), 2);
516525

526+
stream_->setFinishedWithoutLock();
517527
resource.tryReleaseKVBlock(1);
518528
ASSERT_EQ(cache_manager_->freeBlockNums(), 7);
519529
ASSERT_EQ(resource.maxBlockSize(), 0);

0 commit comments

Comments
 (0)