Skip to content

Commit fabcf90

Browse files
Merge pull request #29 from geraldohomero/refactor
Refactor transcript and video processing functionality
2 parents 8b86929 + 747d45a commit fabcf90

File tree

11 files changed

+614
-451
lines changed

11 files changed

+614
-451
lines changed

api/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .youtube_client import YouTubeAPIClient
2+
3+
__all__ = ['YouTubeAPIClient']

api/youtube_client.py

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
import logging
2+
import time
3+
from datetime import datetime
4+
from typing import Optional, List, Dict, Any
5+
6+
from googleapiclient.discovery import build
7+
from googleapiclient.errors import HttpError
8+
9+
from config import get_api_key, rotate_api_key
10+
from models.data_models import ChannelDetails
11+
12+
class YouTubeAPIClient:
13+
def __init__(self):
14+
self.youtube = build('youtube', 'v3', developerKey=get_api_key())
15+
16+
def safe_execute(self, request) -> Dict[str, Any]:
17+
"""
18+
Executes a YouTube API request.
19+
Rotates API key and reattempts the request if quota is exceeded.
20+
"""
21+
try:
22+
return request.execute()
23+
except HttpError as e:
24+
error_content = e.content.decode('utf-8')
25+
if e.resp.status == 403 and "quotaExceeded" in error_content:
26+
logging.info("Quota exceeded. Rotating API key...")
27+
new_api_key = rotate_api_key()
28+
self.youtube = build('youtube', 'v3', developerKey=new_api_key)
29+
return request.execute()
30+
else:
31+
logging.error("YouTube API error: %s", e)
32+
raise
33+
34+
def get_channel_details(self, channel_id: str) -> Optional[ChannelDetails]:
35+
"""
36+
Fetch channel details from YouTube API by channel ID.
37+
Returns ChannelDetails or None on error.
38+
"""
39+
try:
40+
request = self.youtube.channels().list(
41+
part="snippet,statistics",
42+
id=channel_id
43+
)
44+
response = self.safe_execute(request)
45+
items = response.get('items', [])
46+
if items:
47+
channel = items[0]
48+
return ChannelDetails(
49+
channel_id=channel_id,
50+
channel_name=channel['snippet']['title'],
51+
subscriber_count=int(channel['statistics']['subscriberCount'])
52+
)
53+
else:
54+
logging.warning("No channel found with id: %s", channel_id)
55+
except Exception as e:
56+
logging.error("Error fetching channel details for id %s: %s", channel_id, e)
57+
return None
58+
59+
def get_video_details(self, video_id: str, channel_id: str) -> Optional[Dict[str, Any]]:
60+
"""
61+
Get detailed video information from YouTube API.
62+
Returns a dictionary with video details or None on error.
63+
Adds a 'commentsEnabled' flag to indicate if comments are available.
64+
"""
65+
try:
66+
channel_id = channel_id.strip() if channel_id else channel_id
67+
68+
request = self.youtube.videos().list(
69+
part="snippet,statistics",
70+
id=video_id
71+
)
72+
response = self.safe_execute(request)
73+
if response.get('items'):
74+
video = response['items'][0]
75+
published_at = datetime.strptime(video['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
76+
comments_enabled = 'commentCount' in video['statistics']
77+
78+
return {
79+
'videoId': video_id,
80+
'channelId': channel_id,
81+
'videoTitle': video['snippet']['title'],
82+
'videoAudio': None,
83+
'viewCount': int(video['statistics'].get('viewCount', 0)),
84+
'likeCount': int(video['statistics'].get('likeCount', 0)),
85+
'commentCount': int(video['statistics']['commentCount']) if comments_enabled else 0,
86+
'publishedAt': published_at.strftime('%Y-%m-%d %H:%M:%S'),
87+
'commentsEnabled': comments_enabled
88+
}
89+
else:
90+
logging.warning("No video details found for video id: %s", video_id)
91+
return None
92+
except Exception as e:
93+
logging.error("Error fetching video details for video id %s: %s", video_id, e)
94+
return None
95+
96+
def get_video_comments(self, video_id: str) -> List[Dict[str, Any]]:
97+
"""
98+
Fetch video comments (and their replies) from YouTube API.
99+
Returns a list of comment dictionaries.
100+
If comments are disabled, logs a concise message and returns an empty list.
101+
"""
102+
comments = []
103+
next_page_token = None
104+
current_date = datetime.now().date()
105+
106+
try:
107+
while True:
108+
try:
109+
request = self.youtube.commentThreads().list(
110+
part="snippet,replies",
111+
videoId=video_id,
112+
maxResults=100,
113+
pageToken=next_page_token
114+
)
115+
response = self.safe_execute(request)
116+
except HttpError as e:
117+
error_message = e.content.decode("utf-8") if e.content else ""
118+
if e.resp.status == 403 and "commentsDisabled" in error_message:
119+
logging.info("Comments are disabled for video %s. Skipping.", video_id)
120+
return []
121+
else:
122+
logging.error("Error fetching comments for video %s. Skipping.", video_id)
123+
return []
124+
125+
for item in response.get("items", []):
126+
top_comment = item["snippet"]["topLevelComment"]
127+
comment_id = top_comment["id"]
128+
comment_snippet = top_comment["snippet"]
129+
comment_data = {
130+
"commentId": comment_id,
131+
"videoId": video_id,
132+
"parentCommentId": None,
133+
"userId": comment_snippet["authorChannelId"]["value"],
134+
"userName": comment_snippet["authorDisplayName"],
135+
"content": comment_snippet["textDisplay"],
136+
"likeCount": comment_snippet["likeCount"],
137+
"publishedAt": datetime.strptime(comment_snippet["publishedAt"], '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d %H:%M:%S'),
138+
"collectedDate": current_date
139+
}
140+
comments.append(comment_data)
141+
142+
# Process replies if any
143+
for reply in item.get("replies", {}).get("comments", []):
144+
reply_snippet = reply["snippet"]
145+
comments.append({
146+
"commentId": reply["id"],
147+
"videoId": video_id,
148+
"parentCommentId": comment_id,
149+
"userId": reply_snippet["authorChannelId"]["value"],
150+
"userName": reply_snippet["authorDisplayName"],
151+
"content": reply_snippet["textDisplay"],
152+
"likeCount": reply_snippet["likeCount"],
153+
"publishedAt": datetime.strptime(reply_snippet["publishedAt"], '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d %H:%M:%S'),
154+
"collectedDate": current_date
155+
})
156+
157+
next_page_token = response.get("nextPageToken")
158+
if not next_page_token:
159+
break
160+
161+
except Exception:
162+
logging.error("Unexpected error fetching comments for video %s. Skipping.", video_id)
163+
return []
164+
165+
return comments
166+
167+
def get_channel_videos(self, channel_id: str) -> List[str]:
168+
"""
169+
Get list of all video IDs for a given channel using the uploads playlist.
170+
"""
171+
video_ids = []
172+
next_page_token = None
173+
174+
try:
175+
# uploads playlist ID for this channel
176+
channel_request = self.youtube.channels().list(
177+
part="contentDetails",
178+
id=channel_id
179+
)
180+
channel_response = self.safe_execute(channel_request)
181+
182+
if not channel_response.get('items'):
183+
logging.error("Could not find channel with ID: %s", channel_id)
184+
return []
185+
186+
# Extract the uploads playlist ID
187+
uploads_playlist_id = channel_response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
188+
logging.info("Found uploads playlist ID: %s for channel: %s", uploads_playlist_id, channel_id)
189+
190+
# fetch all videos from this playlist
191+
while True:
192+
playlist_request = self.youtube.playlistItems().list(
193+
part="snippet",
194+
playlistId=uploads_playlist_id,
195+
maxResults=50,
196+
pageToken=next_page_token
197+
)
198+
playlist_response = self.safe_execute(playlist_request)
199+
200+
for item in playlist_response.get('items', []):
201+
video_ids.append(item['snippet']['resourceId']['videoId'])
202+
203+
next_page_token = playlist_response.get('nextPageToken')
204+
if not next_page_token:
205+
break
206+
207+
# Add a slight delay to respect API quota
208+
time.sleep(0.5)
209+
210+
# Retrieve and log channel name along with video count
211+
channel_response = self.youtube.channels().list(
212+
part="snippet",
213+
id=channel_id
214+
).execute()
215+
if channel_response.get("items"):
216+
channel_name = channel_response["items"][0]["snippet"]["title"]
217+
logging.info("Found %d videos for channel '%s' (ID: %s)", len(video_ids), channel_name, channel_id)
218+
else:
219+
logging.info("Found %d videos for channel (ID: %s)", len(video_ids), channel_id)
220+
except Exception as e:
221+
logging.error("Error fetching channel videos for channel %s: %s", channel_id, e)
222+
223+
return video_ids

data/transcriptions/transcript.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,47 @@ def get_transcript(video_id: str) -> tuple:
8383
except TranscriptsDisabled:
8484
return False, "Transcripts are disabled for this video", None
8585
except Exception as e:
86-
return False, f"Error retrieving transcript: {str(e)}", None
86+
# If an error occurs, try one more time unless it's a known non-retriable error
87+
error_msg = str(e).lower()
88+
if "video is no longer available" in error_msg or "video unavailable" in error_msg:
89+
return False, "Video is no longer available", None
90+
if "age-restricted" in error_msg or "age restricted" in error_msg:
91+
return False, "Video is age restricted", None
92+
try:
93+
# Retry once
94+
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
95+
# Repeat the same logic as above for retry
96+
manual_transcript = None
97+
for transcript in transcript_list:
98+
if not transcript.is_generated:
99+
if transcript.language_code == 'pt-BR' or transcript.language_code == 'pt':
100+
transcript_data = transcript.fetch()
101+
return True, format_transcript_text(transcript_data), transcript.language_code
102+
elif transcript.language_code.startswith('en'):
103+
manual_transcript = (transcript.fetch(), transcript.language_code)
104+
if manual_transcript:
105+
return True, format_transcript_text(manual_transcript[0]), manual_transcript[1]
106+
try:
107+
transcript = transcript_list.find_transcript(['pt', 'pt-BR'])
108+
transcript_data = transcript.fetch()
109+
return True, format_transcript_text(transcript_data), transcript.language_code
110+
except:
111+
try:
112+
transcript = transcript_list.find_transcript(['en', 'en-US', 'en-GB'])
113+
transcript_data = transcript.fetch()
114+
return True, format_transcript_text(transcript_data), transcript.language_code
115+
except:
116+
transcript = transcript_list.find_transcript(['pt', 'en', 'es'])
117+
transcript_data = transcript.fetch()
118+
return True, format_transcript_text(transcript_data), transcript.language_code
119+
except Exception as retry_e:
120+
retry_msg = str(retry_e).lower()
121+
if "video is unavailable" in retry_msg or "video unavailable" in retry_msg:
122+
return False, "Video is no longer available", None
123+
if "age-restricted" in retry_msg or "age restricted" in retry_msg:
124+
return False, "Video is age restricted", None
125+
return False, f"Error retrieving transcript after retry: {str(retry_e)}", None
126+
return False, f"Error retrieving transcript for {video_id}: {str(e)}", None
87127

88128
def format_transcript_text(transcript_data: list) -> str:
89129
"""Format transcript data into readable text with timestamps."""
@@ -217,7 +257,7 @@ def main():
217257
processed_count = 0
218258

219259
# Use a thread pool; the worker limit is the max_workers value below
220-
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
260+
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
221261
# Submit all tasks to the executor
222262
future_to_video = {
223263
executor.submit(process_video_transcript, video_id): video_id

database/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .db_manager import DatabaseManager
2+
3+
__all__ = ['DatabaseManager']

0 commit comments

Comments
 (0)