FunkyBot/libs/Twitch.py

85 lines
3.2 KiB
Python
Raw Normal View History

import asyncio
import datetime
import json
from datetime import timedelta

import requests
class Twitch:
    """Small Twitch client that reports when watched channels go live or offline.

    The instance holds an app access token (``access_token`` plus the Unix
    timestamp ``access_token_expires`` at which it should be refreshed) and
    ``online_users``, a mapping of login name to the ``started_at`` datetime
    of the stream last announced for that login, or ``None`` while the login
    is considered offline.
    """

    TOKEN_URL = "https://id.twitch.tv/oauth2/token"
    USERS_URL = "https://api.twitch.tv/helix/users"
    STREAMS_URL = "https://api.twitch.tv/helix/streams"
    # Seconds to wait for Twitch before giving up; without a timeout a stalled
    # connection would block the calling task forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, client_id, client_secret):
        """Store the application credentials; no request is made here."""
        self.access_token = None
        self.access_token_expires = None
        self.client_id = client_id
        self.client_secret = client_secret
        self.online_users = {}
        self.config = {}

    async def _request(self, method, url, **kwargs):
        """Perform one HTTP request off the event loop and return its JSON body.

        ``requests`` is blocking, so the call runs in a worker thread via
        ``asyncio.to_thread`` to keep other tasks responsive.
        """
        kwargs.setdefault("timeout", self.REQUEST_TIMEOUT)
        response = await asyncio.to_thread(requests.request, method, url, **kwargs)
        return response.json()

    def _auth_headers(self):
        """Return the headers sent with every Helix request."""
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Client-Id": self.client_id,
        }

    async def get_unix_time(self):
        """Return the Unix timestamp (whole seconds) three weeks from now."""
        future = datetime.datetime.now() + timedelta(weeks=3)
        return int(future.timestamp())

    async def get_access_token(self):
        """Fetch a new app access token using the client-credentials grant.

        Returns:
            ``(access_token, access_token_expires)`` as stored on the instance.

        Raises:
            KeyError: if the response body has no ``access_token`` field.
        """
        credentials = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "client_credentials",
        }
        # Credentials go in the form body rather than the query string so the
        # secret does not end up in URLs; the response is deliberately not
        # printed because it contains the bearer token.
        payload = await self._request("POST", self.TOKEN_URL, data=credentials)
        self.access_token = payload["access_token"]
        expires_in = payload.get("expires_in")
        if expires_in is None:
            # No lifetime reported: fall back to the fixed three-week window.
            self.access_token_expires = await self.get_unix_time()
        else:
            now = int(datetime.datetime.now().timestamp())
            self.access_token_expires = now + int(expires_in)
        return self.access_token, self.access_token_expires

    async def get_users(self, login_names):
        """Map each known login name to its Twitch user id.

        Returns an empty dict without contacting Twitch when ``login_names``
        is empty, since a lookup with no filter cannot yield a ``data`` list.
        """
        if not login_names:
            return {}
        # NOTE(review): Helix limits how many logins one request may carry;
        # confirm the cap and batch here if watchlists grow large.
        payload = await self._request(
            "GET",
            self.USERS_URL,
            params={"login": login_names},
            headers=self._auth_headers(),
        )
        return {entry["login"]: entry["id"] for entry in payload["data"]}

    async def get_notifications(self, watchlist):
        """Poll Twitch once and report status changes for ``watchlist``.

        Args:
            watchlist: login names to check; iterated more than once, so it
                must be a re-iterable collection.

        Returns:
            ``(online_notifications, offline_notifications)``: the stream
            entries for logins that just went live (or restarted), and the
            login names that were live on the previous poll but are not now.
        """
        users = await self.get_users(watchlist)
        streams = await self.get_streams(users)
        online_notifications = []
        offline_notifications = []
        for user_name in watchlist:
            previous_start = self.online_users.get(user_name)
            if user_name not in streams:
                if previous_start is not None:
                    offline_notifications.append(user_name)
                self.online_users[user_name] = None
                continue
            stream = streams[user_name]
            started_at = datetime.datetime.fromisoformat(stream["started_at"])
            # Announce on the offline -> online transition, or when the start
            # time moved forward (the stream restarted between two polls).
            # Comparing only against previously seen start times keeps the
            # decision independent of the local clock.
            if previous_start is None or started_at > previous_start:
                online_notifications.append(stream)
                self.online_users[user_name] = started_at
        return online_notifications, offline_notifications

    async def get_streams(self, users):
        """Return the live streams of ``users`` keyed by ``user_login``.

        Args:
            users: mapping of login name to user id, as built by ``get_users``.

        An empty mapping returns ``{}`` immediately; sending the request with
        no ``user_id`` filter would not restrict the query to the watchlist.
        """
        user_ids = list(users.values())
        if not user_ids:
            return {}
        payload = await self._request(
            "GET",
            self.STREAMS_URL,
            params={"user_id": user_ids},
            headers=self._auth_headers(),
        )
        return {entry["user_login"]: entry for entry in payload["data"]}

    async def check_access_token(self):
        """Refresh the access token if it is missing or past its expiry time.

        Returns:
            ``(access_token, access_token_expires)`` after any refresh.
        """
        now = int(datetime.datetime.now().timestamp())
        missing = self.access_token is None or self.access_token_expires is None
        # Test ``missing`` first: comparing an int with None raises TypeError.
        if missing or now >= self.access_token_expires:
            await self.get_access_token()
        return self.access_token, self.access_token_expires