Skip to content

Commit b54cbe9

Browse files
authored
[IMPROVE/#343] RSS reader 수집 경로를 bounded executor 기반으로 개선 (#344)
* improve: RSS 크롤링 시 WebClient 동작을 taskExecutor 방식으로 개선
* test: 관련 테스트 진행
* test: 통합 테스트를 관련 클래스를 상속하도록 변경
1 parent b24b786 commit b54cbe9

File tree

4 files changed

+426
-17
lines changed

4 files changed

+426
-17
lines changed

src/main/java/com/techfork/domain/source/batch/RssFeedReader.java

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,37 +12,81 @@
1212
import com.techfork.domain.source.entity.TechBlog;
1313
import com.techfork.domain.source.repository.TechBlogRepository;
1414
import com.techfork.global.util.ContentCleaner;
15-
import lombok.RequiredArgsConstructor;
1615
import lombok.extern.slf4j.Slf4j;
1716
import org.springframework.batch.core.configuration.annotation.StepScope;
1817
import org.springframework.batch.item.ItemReader;
18+
import org.springframework.beans.factory.annotation.Autowired;
19+
import org.springframework.beans.factory.annotation.Qualifier;
20+
import org.springframework.core.task.AsyncTaskExecutor;
1921
import org.springframework.stereotype.Component;
2022
import org.springframework.web.reactive.function.client.WebClient;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2125

2226
import java.io.ByteArrayInputStream;
2327
import java.io.InputStream;
2428
import java.time.LocalDateTime;
2529
import java.time.ZoneId;
2630
import java.util.Date;
31+
import java.util.LinkedHashMap;
2732
import java.util.List;
33+
import java.util.Map;
2834
import java.util.Set;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.Future;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.TimeoutException;
2939
import java.util.regex.Matcher;
3040
import java.util.regex.Pattern;
3141
import java.util.stream.Stream;
3242

33-
@Slf4j
3443
@Component
3544
@StepScope
36-
@RequiredArgsConstructor
45+
@Slf4j
3746
public class RssFeedReader implements ItemReader<RssFeedItem> {
3847

48+
private static final int RSS_FETCH_TASK_TIMEOUT_SECONDS = 45;
49+
3950
private final TechBlogRepository techBlogRepository;
4051
private final PostRepository postRepository;
4152
private final WebClient webClient;
53+
@Qualifier("rssFetchTaskExecutor")
54+
private final AsyncTaskExecutor rssFetchTaskExecutor;
55+
private final int rssFetchTaskTimeoutSeconds;
4256

4357
private List<RssFeedItem> items;
4458
private int currentIndex = 0;
4559

60+
/**
 * Creates the reader with the default per-feed wait of
 * {@link #RSS_FETCH_TASK_TIMEOUT_SECONDS} seconds.
 *
 * <p>This is the constructor marked for dependency injection; it simply
 * forwards to the five-argument constructor.
 *
 * @param techBlogRepository   repository the tech blogs are loaded from
 * @param postRepository       repository queried for URLs that already exist
 * @param webClient            HTTP client used to download the RSS feeds
 * @param rssFetchTaskExecutor executor the per-blog fetch tasks are submitted to
 */
@Autowired
public RssFeedReader(
        TechBlogRepository techBlogRepository,
        PostRepository postRepository,
        WebClient webClient,
        @Qualifier("rssFetchTaskExecutor") AsyncTaskExecutor rssFetchTaskExecutor
) {
    this(techBlogRepository, postRepository, webClient, rssFetchTaskExecutor, RSS_FETCH_TASK_TIMEOUT_SECONDS);
}
75+
76+
/**
 * Creates the reader with an explicit per-feed wait, so callers in the same
 * package can choose a timeout other than the default.
 *
 * @param techBlogRepository         repository the tech blogs are loaded from
 * @param postRepository             repository queried for URLs that already exist
 * @param webClient                  HTTP client used to download the RSS feeds
 * @param rssFetchTaskExecutor       executor the per-blog fetch tasks are submitted to
 * @param rssFetchTaskTimeoutSeconds how long, in seconds, to wait for each fetch result
 */
RssFeedReader(
        TechBlogRepository techBlogRepository,
        PostRepository postRepository,
        WebClient webClient,
        @Qualifier("rssFetchTaskExecutor") AsyncTaskExecutor rssFetchTaskExecutor,
        int rssFetchTaskTimeoutSeconds
) {
    // Fetch machinery first, then the persistence collaborators.
    this.rssFetchTaskTimeoutSeconds = rssFetchTaskTimeoutSeconds;
    this.rssFetchTaskExecutor = rssFetchTaskExecutor;
    this.webClient = webClient;
    this.postRepository = postRepository;
    this.techBlogRepository = techBlogRepository;
}
89+
4690
@Override
4791
public RssFeedItem read() {
4892
if (items == null) {
@@ -61,30 +105,69 @@ private void initializeItems() {
61105
List<TechBlog> techBlogs = techBlogRepository.findAll();
62106
log.info("총 {}개 테크 블로그 RSS 수집 시작", techBlogs.size());
63107

64-
List<RssFeedItem> allItems = techBlogs.parallelStream()
65-
.flatMap(techBlog -> {
66-
try {
67-
List<RssFeedItem> feedItems = fetchRssFeed(techBlog);
68-
log.info("[{}] RSS 수집 성공: {}개", techBlog.getCompanyName(), feedItems.size());
69-
return feedItems.stream();
70-
} catch (Exception e) {
71-
log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage());
72-
return Stream.empty();
73-
}
74-
})
108+
List<FeedFetchTask> fetchTasks = techBlogs.stream()
109+
.map(this::submitFetchTask)
110+
.toList();
111+
112+
List<RssFeedItem> allItems = fetchTasks.stream()
113+
.flatMap(this::collectFeedItems)
75114
.toList();
76115

77116
Set<String> existingUrls = postRepository.findExistingUrls(
78117
allItems.stream().map(RssFeedItem::url).toList()
79118
);
80119

81-
items = allItems.stream()
82-
.filter(item -> !existingUrls.contains(item.url()))
83-
.toList();
120+
Map<String, RssFeedItem> uniqueItemsByUrl = new LinkedHashMap<>();
121+
for (RssFeedItem item : allItems) {
122+
if (!existingUrls.contains(item.url())) {
123+
uniqueItemsByUrl.putIfAbsent(item.url(), item);
124+
}
125+
}
126+
127+
items = List.copyOf(uniqueItemsByUrl.values());
84128

85129
log.info("RSS 수집 초기화 완료: 총 {}개 아이템", items.size());
86130
}
87131

132+
private FeedFetchTask submitFetchTask(TechBlog techBlog) {
133+
Future<List<RssFeedItem>> future = rssFetchTaskExecutor.submit(() -> fetchFeedSafely(techBlog));
134+
return new FeedFetchTask(techBlog, future);
135+
}
136+
137+
private List<RssFeedItem> fetchFeedSafely(TechBlog techBlog) {
138+
try {
139+
List<RssFeedItem> feedItems = fetchRssFeed(techBlog);
140+
log.info("[{}] RSS 수집 성공: {}개", techBlog.getCompanyName(), feedItems.size());
141+
return feedItems;
142+
} catch (Exception e) {
143+
log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage());
144+
return List.of();
145+
}
146+
}
147+
148+
private Stream<RssFeedItem> collectFeedItems(FeedFetchTask fetchTask) {
149+
try {
150+
return fetchTask.future()
151+
.get(rssFetchTaskTimeoutSeconds, TimeUnit.SECONDS)
152+
.stream();
153+
} catch (InterruptedException e) {
154+
Thread.currentThread().interrupt();
155+
log.error("[{}] RSS 수집 대기 중 인터럽트 발생", fetchTask.techBlog().getCompanyName(), e);
156+
return Stream.empty();
157+
} catch (TimeoutException e) {
158+
boolean cancelled = fetchTask.future().cancel(true);
159+
log.error("[{}] RSS 수집 타임아웃: {}초 (cancelled={})",
160+
fetchTask.techBlog().getCompanyName(),
161+
rssFetchTaskTimeoutSeconds,
162+
cancelled);
163+
return Stream.empty();
164+
} catch (ExecutionException e) {
165+
Throwable cause = e.getCause() != null ? e.getCause() : e;
166+
log.error("[{}] RSS 수집 Future 처리 실패: {}", fetchTask.techBlog().getCompanyName(), cause.getMessage());
167+
return Stream.empty();
168+
}
169+
}
170+
88171
private List<RssFeedItem> fetchRssFeed(TechBlog techBlog) throws Exception {
89172
// WebClient로 RSS 피드 다운로드
90173
byte[] responseBytes = webClient.get()
@@ -259,4 +342,7 @@ private String extractImageFromHtml(String htmlContent) {
259342

260343
return null;
261344
}
345+
346+
private record FeedFetchTask(TechBlog techBlog, Future<List<RssFeedItem>> future) {
347+
}
262348
}

src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@
2222
import org.springframework.batch.item.ItemWriter;
2323
import org.springframework.context.annotation.Bean;
2424
import org.springframework.context.annotation.Configuration;
25+
import org.springframework.core.task.AsyncTaskExecutor;
2526
import org.springframework.core.task.TaskExecutor;
2627
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2728
import org.springframework.transaction.PlatformTransactionManager;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2831

2932
import java.util.concurrent.Future;
33+
import java.util.concurrent.ThreadPoolExecutor;
3034

3135
/**
3236
* RSS 크롤링 Job 설정
@@ -162,6 +166,21 @@ public ItemWriter<Future<PostDocument>> asyncEmbeddingWriter() {
162166
return asyncItemWriter;
163167
}
164168

169+
@Bean(name = "rssFetchTaskExecutor")
170+
public AsyncTaskExecutor rssFetchTaskExecutor() {
171+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
172+
executor.setCorePoolSize(4);
173+
executor.setMaxPoolSize(8);
174+
executor.setQueueCapacity(32);
175+
executor.setThreadNamePrefix("rss-fetch-");
176+
executor.setTaskDecorator(new MdcTaskDecorator());
177+
executor.setWaitForTasksToCompleteOnShutdown(true);
178+
executor.setAwaitTerminationSeconds(60);
179+
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
180+
executor.initialize();
181+
return executor;
182+
}
183+
165184
@Bean
166185
public TaskExecutor summaryTaskExecutor() {
167186
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

0 commit comments

Comments
 (0)