1
0
Fork 0
mirror of https://github.com/Findus23/RadioStats.git synced 2024-09-09 04:23:47 +02:00
RadioStats/spotify.py

104 lines
3.1 KiB
Python

import json
import re
import sys
from time import sleep
import requests
import sentry_sdk
from redis import Redis
from spotipy import CacheHandler, Spotify, SpotifyException
from spotipy.oauth2 import SpotifyClientCredentials
import config
from models import *
if config.sentryDSN:
client = sentry_sdk.init(dsn=config.sentryDSN)
class RedisCacheHandler(CacheHandler):
"""
based on https://github.com/plamere/spotipy/pull/747
"""
def __init__(self, redis):
self.redis = redis
def get_cached_token(self):
token_info = self.redis.get('token_info')
if token_info:
return json.loads(token_info)
def save_token_to_cache(self, token_info):
self.redis.set('token_info', json.dumps(token_info))
class CustomSpotify(Spotify):
def __del__(self):
if requests is not None and requests.Session is not None and isinstance(self._session, requests.Session):
self._session.close()
class CustomSpotifyClientCredentials(SpotifyClientCredentials):
def __del__(self):
if requests is not None and requests.Session is not None and isinstance(self._session, requests.Session):
self._session.close()
r = Redis(db=config.redisDB)
crm = CustomSpotifyClientCredentials(**config.spotify, cache_handler=RedisCacheHandler(r))
sp = CustomSpotify(client_credentials_manager=crm)
if len(sys.argv) > 1:
limit = int(sys.argv[1])
else:
limit = 50
query = Song.select().where((Song.show == 0))
if not len(sys.argv) > 3 or sys.argv[2] != "force":
query = query.where(Song.spotify_data.is_null())
else:
starting = int(sys.argv[3])
print("fetching empty starting from {id}".format(id=starting))
query = query.where((Song.spotify_data == 0) & (Song.id >= starting))
for song in query.limit(limit):
song.title = song.title.replace("+", " ")
print(song.title)
if song.artist.isupper():
song.artist = song.artist.title()
if song.title.isupper():
song.title = song.title.title()
print(song.id)
sleep(0.1)
searchtitle = re.sub("[\(\[].*?[\)\]]", "", song.title)
searchartist = song.artist.replace("&", "").replace("Feat.", "")
try:
results = sp.search(q=searchtitle + ' ' + searchartist, type='track', limit=1)
except SpotifyException:
if len(searchtitle + ' ' + searchartist) > 50:
song.spotify_data = False
song.save()
continue
else:
raise
if len(results["tracks"]["items"]) == 0:
song.spotify_data = False
print("not found")
else:
print("found")
track = results["tracks"]["items"][0]
song.spotify_url = track["external_urls"]["spotify"]
song.preview_url = track["preview_url"]
images = track["album"]["images"]
if len(images):
song.image_large = images[0]["url"]
song.image_small = images[-1]["url"]
song.spotify_data = True
# print(song.title)
# print(track["name"])
# print(song.artist)
# print(", ".join([a["name"] for a in track["artists"]]))
song.save()