diff --git a/redbot/cogs/streams/errors.py b/redbot/cogs/streams/errors.py index 732b146d9f5..b0d0be5ccad 100644 --- a/redbot/cogs/streams/errors.py +++ b/redbot/cogs/streams/errors.py @@ -27,6 +27,10 @@ class InvalidYoutubeCredentials(StreamsError): pass +class InvalidKickCredentials(StreamsError): + pass + + class YoutubeQuotaExceeded(StreamsError): pass diff --git a/redbot/cogs/streams/streams.py b/redbot/cogs/streams/streams.py index 54e32f6ec11..517247c2002 100644 --- a/redbot/cogs/streams/streams.py +++ b/redbot/cogs/streams/streams.py @@ -1,3 +1,4 @@ +from operator import is_ import discord from redbot.core.utils.chat_formatting import humanize_list from redbot.core.bot import Red @@ -7,6 +8,7 @@ from redbot.core.utils.chat_formatting import escape, inline, pagify from .streamtypes import ( + KickStream, PicartoStream, Stream, TwitchStream, @@ -14,6 +16,7 @@ ) from .errors import ( APIError, + InvalidKickCredentials, InvalidTwitchCredentials, InvalidYoutubeCredentials, OfflineStream, @@ -51,6 +54,7 @@ class Streams(commands.Cog): "tokens": {}, "streams": [], "notified_owner_missing_twitch_secret": False, + "notified_owner_missing_kick_secret": False, } guild_defaults = { @@ -70,6 +74,7 @@ def __init__(self, bot: Red): super().__init__() self.config: Config = Config.get_conf(self, 26262626) self.ttv_bearer_cache: dict = {} + self.kick_bearer_cache: dict = {} self.config.register_global(**self.global_defaults) self.config.register_guild(**self.guild_defaults) self.config.register_role(**self.role_defaults) @@ -105,6 +110,8 @@ async def cog_load(self) -> None: async def on_red_api_tokens_update(self, service_name, api_tokens): if service_name == "twitch": await self.get_twitch_bearer_token(api_tokens) + elif service_name == "kick": + await self.get_kick_bearer_token(api_tokens) async def move_api_keys(self) -> None: """Move the API keys from cog stored config to core bot config if they exist.""" @@ -126,7 +133,7 @@ async def _notify_owner_about_missing_twitch_secret(self) -> None: "1. Go to this page: {link}.\n" '2. Click "Manage" on your application.\n' '3. Click on "New secret".\n' - "5. Copy your client ID and your client secret into:\n" + "4. Copy your client ID and your client secret into:\n" "{command}" "\n\n" "Note: These tokens are sensitive and should only be used in a private channel " @@ -142,6 +149,28 @@ async def _notify_owner_about_missing_twitch_secret(self) -> None: await send_to_owners_with_prefix_replaced(self.bot, message) await self.config.notified_owner_missing_twitch_secret.set(True) + async def _notify_owner_about_missing_kick_secret(self) -> None: + message = _( + "You need a client secret key if you want to use the Kick API on this cog.\n" + "Follow these steps:\n" + "1. Go to this page: {link}.\n" + '2. Click "Manage" on your application.\n' + "3. Copy your client ID and your client secret into:\n" + "{command}" + "\n\n" + "Note: These tokens are sensitive and should only be used in a private channel " + "or in DM with the bot." + ).format( + link="https://kick.com/settings/developer", + command=inline( + "[p]set api twitch client_id {} client_secret {}".format( + _(""), _("") + ) + ), + ) + await send_to_owners_with_prefix_replaced(self.bot, message) + await self.config.notified_owner_missing_kick_secret.set(True) + async def get_twitch_bearer_token(self, api_tokens: Optional[Dict] = None) -> None: tokens = ( await self.bot.get_shared_api_tokens("twitch") if api_tokens is None else api_tokens @@ -198,9 +227,64 @@ async def get_twitch_bearer_token(self, api_tokens: Optional[Dict] = None) -> No self.ttv_bearer_cache["expires_at"] = datetime.now().timestamp() + data.get("expires_in") async def maybe_renew_twitch_bearer_token(self) -> None: - if self.ttv_bearer_cache: - if self.ttv_bearer_cache["expires_at"] - datetime.now().timestamp() <= 60: - await self.get_twitch_bearer_token() + if ( + self.ttv_bearer_cache + and self.ttv_bearer_cache["expires_at"] - datetime.now().timestamp() <= 60 + ): + await self.get_twitch_bearer_token() + + async def get_kick_bearer_token(self, api_tokens: Optional[Dict] = None) -> None: + tokens = await self.bot.get_shared_api_tokens("kick") if api_tokens is None else api_tokens + if tokens.get("client_id"): + notified_owner_missing_kick_secret = ( + await self.config.notified_owner_missing_kick_secret() + ) + try: + tokens["client_secret"] + if notified_owner_missing_kick_secret is True: + await self.config.notified_owner_missing_kick_secret.set(False) + except KeyError: + if notified_owner_missing_kick_secret is False: + asyncio.create_task(self._notify_owner_about_missing_kick_secret()) + async with aiohttp.ClientSession() as session: + async with session.post( + "https://id.kick.com/oauth/token", + params={ + "client_id": tokens.get("client_id", ""), + "client_secret": tokens.get("client_secret", ""), + "grant_type": "client_credentials", + }, + ) as req: + try: + data = await req.json() + except aiohttp.ContentTypeError: + data = {} + + if req.status == 200: + pass + elif req.status == 401 and data.get("error") == "invalid_client": + log.error("Kick API request failed authentication: set Client ID is invalid.") + elif "error" in data: + log.error( + "Kick OAuth2 API request failed with status code %s and error message: %s", + req.status, + data["error"], + ) + else: + log.error("Kick OAuth2 API request failed with status code %s", req.status) + + if req.status != 200: + return + + self.kick_bearer_cache = data + self.kick_bearer_cache["expires_at"] = datetime.now().timestamp() + data.get("expires_in") + + async def maybe_renew_kick_token(self) -> None: + if ( + self.kick_bearer_cache + and self.kick_bearer_cache["expires_at"] - datetime.now().timestamp() <= 60 + ): + await self.get_kick_bearer_token() @commands.guild_only() @commands.command() @@ -242,10 +326,19 @@ async def picarto(self, ctx: commands.Context, channel_name: str): stream = PicartoStream(_bot=self.bot, name=channel_name) await self.check_online(ctx, stream) + @commands.guild_only() + @commands.command() + async def kickstream(self, ctx: commands.Context, channel_name: str): + """Check if a Kick channel is live.""" + await self.maybe_renew_kick_token() + token = self.kick_bearer_cache.get("access_token") + stream = _streamtypes.KickStream(_bot=self.bot, name=channel_name, token=token) + await self.check_online(ctx, stream) + async def check_online( self, ctx: commands.Context, - stream: Union[PicartoStream, YoutubeStream, TwitchStream], + stream: Union[PicartoStream, YoutubeStream, TwitchStream, KickStream], ): try: info = await stream.is_online() @@ -265,6 +358,12 @@ async def check_online( "The YouTube API key is either invalid or has not been set. See {command}." ).format(command=inline(f"{ctx.clean_prefix}streamset youtubekey")) ) + except InvalidKickCredentials: + await ctx.send( + _("The Kick API key is either invalid or has not been set. See {command}.").format( + command=inline(f"{ctx.clean_prefix}streamset kicktoken") + ) + ) except YoutubeQuotaExceeded: await ctx.send( _( @@ -363,6 +462,18 @@ async def picarto_alert( """Toggle alerts in this channel for a Picarto stream.""" await self.stream_alert(ctx, PicartoStream, channel_name, discord_channel) + @streamalert.command(name="kick") + async def kick_alert( + self, + ctx: commands.Context, + channel_name: str, + discord_channel: Union[ + discord.TextChannel, discord.VoiceChannel, discord.StageChannel + ] = commands.CurrentChannel, + ): + """Toggle alerts in this channel for a Kick stream.""" + await self.stream_alert(ctx, KickStream, channel_name, discord_channel) + @streamalert.command(name="stop", usage="[disable_all=No]") async def streamalert_stop(self, ctx: commands.Context, _all: bool = False): """Disable all stream alerts in this channel or server. @@ -435,6 +546,7 @@ async def stream_alert(self, ctx: commands.Context, _class, channel_name, discor token = await self.bot.get_shared_api_tokens(_class.token_name) is_yt = _class.__name__ == "YoutubeStream" is_twitch = _class.__name__ == "TwitchStream" + is_kick = _class.__name__ == "KickStream" if is_yt and not self.check_name_or_id(channel_name): stream = _class(_bot=self.bot, id=channel_name, token=token, config=self.config) elif is_twitch: @@ -445,6 +557,10 @@ async def stream_alert(self, ctx: commands.Context, _class, channel_name, discor token=token.get("client_id"), bearer=self.ttv_bearer_cache.get("access_token", None), ) + elif is_kick: + await self.maybe_renew_kick_token() + token = self.kick_bearer_cache.get("access_token") + stream = _class(_bot=self.bot, name=channel_name, token=token) else: if is_yt: stream = _class( @@ -464,8 +580,7 @@ async def stream_alert(self, ctx: commands.Context, _class, channel_name, discor except InvalidYoutubeCredentials: await ctx.send( _( - "The YouTube API key is either invalid or has not been set. See " - "{command}." + "The YouTube API key is either invalid or has not been set. See {command}." ).format(command=inline(f"{ctx.clean_prefix}streamset youtubekey")) ) return @@ -476,6 +591,13 @@ async def stream_alert(self, ctx: commands.Context, _class, channel_name, discor " Try again later or contact the owner if this continues." ) ) + except InvalidKickCredentials: + await ctx.send( + _( + "The Kick API key is either invalid or has not been set. See {command}." + ).format(command=inline(f"{ctx.clean_prefix}streamset kicktoken")) + ) + return except APIError as e: log.error( "Something went wrong whilst trying to contact the stream service's API.\n" @@ -537,6 +659,30 @@ async def twitchtoken(self, ctx: commands.Context): await ctx.maybe_send_embed(message) + @streamset.command() + @commands.is_owner() + async def kicktoken(self, ctx: commands.Context): + """Explain how to set the Kick token.""" + message = _( + "To get one, do the following:\n" + "1. Go to this page: {link}.\n" + "2. Click on *Create new*.\n" + "3. Fill the name and description, for *Redirection URL* add *http://localhost*.\n" + "4. Click on *Create Application*.\n" + "5. Copy your client ID and your client secret into:\n" + "{command}" + "\n\n" + "Note: These tokens are sensitive and should only be used in a private channel\n" + "or in DM with the bot.\n" + ).format( + link="https://kick.com/settings/developer", + command="`{}set api kick client_id {} client_secret {}`".format( + ctx.clean_prefix, _(""), _("") + ), + ) + + await ctx.maybe_send_embed(message) + @streamset.command() @commands.is_owner() async def youtubekey(self, ctx: commands.Context): @@ -826,8 +972,7 @@ async def check_streams(self): for stream in self.streams: try: try: - is_rerun = False - is_schedule = False + is_rerun, is_schedule = False, False if stream.__class__.__name__ == "TwitchStream": await self.maybe_renew_twitch_bearer_token() embed, is_rerun = await stream.is_online() @@ -835,6 +980,10 @@ async def check_streams(self): elif stream.__class__.__name__ == "YoutubeStream": embed, is_schedule = await stream.is_online() + elif stream.__class__.__name__ == "KickStream": + await self.maybe_renew_kick_token() + embed = await stream.is_online() + else: embed = await stream.is_online() except StreamNotFound: @@ -1008,6 +1157,8 @@ async def load_streams(self): if _class.__name__ == "TwitchStream": raw_stream["token"] = token.get("client_id") raw_stream["bearer"] = self.ttv_bearer_cache.get("access_token", None) + elif _class.__name__ == "KickStream": + raw_stream["token"] = self.kick_bearer_cache.get("access_token", None) else: if _class.__name__ == "YoutubeStream": raw_stream["config"] = self.config diff --git a/redbot/cogs/streams/streamtypes.py b/redbot/cogs/streams/streamtypes.py index 650df3d2d2b..e4a67ba9659 100644 --- a/redbot/cogs/streams/streamtypes.py +++ b/redbot/cogs/streams/streamtypes.py @@ -3,26 +3,28 @@ import json import logging import time -from dateutil.parser import parse as parse_time +import xml.etree.ElementTree as ET +from datetime import datetime, timezone from random import choice from string import ascii_letters -from datetime import datetime, timedelta, timezone -import xml.etree.ElementTree as ET -from typing import ClassVar, Optional, List, Tuple +from typing import ClassVar, List, Optional, Tuple import aiohttp import discord +from dateutil.parser import parse as parse_time + +from redbot.core.i18n import Translator +from redbot.core.utils.chat_formatting import humanize_number from .errors import ( APIError, - OfflineStream, + InvalidKickCredentials, InvalidTwitchCredentials, InvalidYoutubeCredentials, + OfflineStream, StreamNotFound, YoutubeQuotaExceeded, ) -from redbot.core.i18n import Translator -from redbot.core.utils.chat_formatting import humanize_number, humanize_timedelta TWITCH_BASE_URL = "https://api.twitch.tv" TWITCH_ID_ENDPOINT = TWITCH_BASE_URL + "/helix/users" @@ -35,6 +37,10 @@ YOUTUBE_VIDEOS_ENDPOINT = YOUTUBE_BASE_URL + "/videos" YOUTUBE_CHANNEL_RSS = "https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}" +KICK_BASE_URL = "https://api.kick.com/public/v1/" +KICK_USERS_ENDPOINT = KICK_BASE_URL + "users" +KICK_CHANNELS_ENDPOINT = KICK_BASE_URL + "channels" + _ = Translator("Streams", __file__) log = logging.getLogger("red.core.cogs.Streams") @@ -509,3 +515,108 @@ def make_embed(self, data): embed.set_footer(text=_("{adult}Category: {category} | Tags: {tags}").format(**data)) return embed + + +class KickStream(Stream): + token_name = "kick" + platform_name = "Kick" + + def __init__(self, **kwargs): + self.id = kwargs.pop("id", None) + self._display_name = None + self._token = kwargs.pop("token", None) + super().__init__(**kwargs) + + @property + def display_name(self) -> Optional[str]: + return self._display_name or self.name + + @display_name.setter + def display_name(self, value: str) -> None: + self._display_name = value + + async def get_data(self, url: str, params: dict = {}) -> Tuple[Optional[int], dict]: + if self._token is None: + raise InvalidKickCredentials() + + headers = {"Authorization": f"Bearer {self._token}"} + async with aiohttp.ClientSession() as session: + try: + async with session.get(url, headers=headers, params=params, timeout=60) as resp: + if resp.status != 200: + return resp.status, {} + + data = await resp.json(encoding="utf-8") + return resp.status, data["data"][0] if data["data"] else [] + except (aiohttp.ClientConnectionError, asyncio.TimeoutError) as exc: + log.warning("Connection error occurred when fetching Kick stream", exc_info=exc) + return None, {} + + async def is_online(self): + channel_code, channel_data = await self.get_data( + KICK_CHANNELS_ENDPOINT, {"slug": self.name} + ) + if not channel_data: + raise StreamNotFound() + + if channel_code == 200: + if channel_data["stream"]["is_live"] is False: + raise OfflineStream() + + self.id = channel_data["broadcaster_user_id"] + user_profile_data = await self._fetch_user_profile() + + final_data = dict.fromkeys( + ("game_name", "followers", "name", "slug", "profile_picture", "view_count") + ) + + if user_profile_data is not None: + final_data["user_name"] = self.display_name = user_profile_data["name"] + final_data["profile_picture"] = user_profile_data["profile_picture"] + + stream_data = channel_data["stream"] + final_data["game_name"] = channel_data["category"]["name"] + final_data["title"] = channel_data["stream_title"] + final_data["thumbnail_url"] = stream_data["thumbnail"] + final_data["view_count"] = stream_data["viewer_count"] + final_data["slug"] = channel_data["slug"] + + return self.make_embed(final_data) + elif channel_code == 401: + raise InvalidKickCredentials() + elif channel_code == 400: + raise StreamNotFound() + else: + raise APIError(channel_code, stream_data) + + async def _fetch_user_profile(self): + code, data = await self.get_data(KICK_USERS_ENDPOINT, {"id": self.id}) + if code == 200: + if not data: + raise StreamNotFound() + return data + elif code == 400: + raise StreamNotFound() + elif code == 401: + raise InvalidKickCredentials() + else: + raise APIError(code, data) + + def make_embed(self, data): + url = f"https://www.kick.com/{data['slug']}" if data["slug"] is not None else None + logo = ( + data["profile_picture"] or "https://www.google.com/s2/favicons?domain=kick.com&sz=256" + ) + status = data["title"] or _("Untitled broadcast") + embed = discord.Embed(title=status, url=url, color=0x00E701) + embed.set_author(name=data["user_name"]) + embed.add_field(name=_("Total views"), value=humanize_number(data["view_count"])) + embed.set_thumbnail(url=logo) + if data["thumbnail_url"]: + embed.set_image(url=rnd(data["thumbnail_url"])) + if data["game_name"]: + embed.set_footer(text=_("Playing: ") + data["game_name"]) + return embed + + def __repr__(self): + return "<{0.__class__.__name__}: {0.name} (ID: {0.id})>".format(self)