209 lines
6.7 KiB
Python
209 lines
6.7 KiB
Python
# Copyright 2017 Mycroft AI Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
import json
|
|
from copy import copy
|
|
|
|
import requests
|
|
|
|
|
|
MOPIDY_API = '/mopidy/rpc'
|
|
|
|
_base_dict = {'jsonrpc': '2.0', 'id': 1, 'params': {}}
|
|
|
|
|
|
class Mopidy:
|
|
def __init__(self, url):
|
|
self.is_playing = False
|
|
self.url = url + MOPIDY_API
|
|
self.volume = None
|
|
self.clear_list(force=True)
|
|
self.volume_low = 3
|
|
self.volume_high = 100
|
|
|
|
def find_artist(self, artist):
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.library.search'
|
|
d['params'] = {'artist': [artist]}
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
return r.json()['result'][1]['artists']
|
|
|
|
def get_playlists(self, filter=None):
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playlists.as_list'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
if filter is None:
|
|
return r.json()['result']
|
|
else:
|
|
return [l for l in r.json()['result'] if filter + ':' in l['uri']]
|
|
|
|
def find_album(self, album, filter=None):
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.library.search'
|
|
d['params'] = {'album': [album]}
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
lst = [res['albums'] for res in r.json()['result'] if 'albums' in res]
|
|
if filter is None:
|
|
return lst
|
|
else:
|
|
return [i for sl in lst for i in sl if filter + ':' in i['uri']]
|
|
|
|
def find_exact(self, uris='null'):
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.library.find_exact'
|
|
d['params'] = {'uris': uris}
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
return r.json()
|
|
|
|
def browse(self, uri):
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.library.browse'
|
|
d['params'] = {'uri': uri}
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
if 'result' in r.json():
|
|
return r.json()['result']
|
|
else:
|
|
return None
|
|
|
|
def clear_list(self, force=False):
|
|
if self.is_playing or force:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.tracklist.clear'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
return r
|
|
|
|
def add_list(self, uri):
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.tracklist.add'
|
|
if isinstance(uri, str):
|
|
d['params'] = {'uri': uri}
|
|
elif type(uri) == list:
|
|
d['params'] = {'uris': uri}
|
|
else:
|
|
return None
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
return r
|
|
|
|
def play(self):
|
|
self.is_playing = True
|
|
self.restore_volume()
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playback.play'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
|
|
def next(self):
|
|
if self.is_playing:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playback.next'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
|
|
def previous(self):
|
|
if self.is_playing:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playback.previous'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
|
|
def stop(self):
|
|
if self.is_playing:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playback.stop'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
self.is_playing = False
|
|
|
|
def currently_playing(self):
|
|
if self.is_playing:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playback.get_current_track'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
return r.json()['result']
|
|
else:
|
|
return None
|
|
|
|
def set_volume(self, percent):
|
|
if self.is_playing:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.mixer.set_volume'
|
|
d['params'] = {'volume': percent}
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
|
|
def lower_volume(self):
|
|
self.set_volume(self.volume_low)
|
|
|
|
def restore_volume(self):
|
|
self.set_volume(self.volume_high)
|
|
|
|
def pause(self):
|
|
if self.is_playing:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playback.pause'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
|
|
def resume(self):
|
|
if self.is_playing:
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playback.resume'
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
|
|
def get_items(self, uri):
|
|
d = copy(_base_dict)
|
|
d['method'] = 'core.playlists.get_items'
|
|
d['params'] = {'uri': uri}
|
|
r = requests.post(self.url, data=json.dumps(d))
|
|
if 'result' in r.json():
|
|
return [e['uri'] for e in r.json()['result']]
|
|
else:
|
|
return None
|
|
|
|
def get_tracks(self, uri):
|
|
tracks = self.browse(uri)
|
|
ret = [t['uri'] for t in tracks if t['type'] == 'track']
|
|
|
|
sub_tracks = [t['uri'] for t in tracks if t['type'] != 'track']
|
|
for t in sub_tracks:
|
|
ret = ret + self.get_tracks(t)
|
|
return ret
|
|
|
|
def get_local_albums(self):
|
|
p = self.browse('local:directory?type=album')
|
|
return {e['name']: e for e in p if e['type'] == 'album'}
|
|
|
|
def get_local_artists(self):
|
|
p = self.browse('local:directory?type=artist')
|
|
return {e['name']: e for e in p if e['type'] == 'artist'}
|
|
|
|
def get_local_genres(self):
|
|
p = self.browse('local:directory?type=genre')
|
|
return {e['name']: e for e in p if e['type'] == 'directory'}
|
|
|
|
def get_local_playlists(self):
|
|
p = self.get_playlists('m3u')
|
|
return {e['name']: e for e in p}
|
|
|
|
def get_spotify_playlists(self):
|
|
p = self.get_playlists('spotify')
|
|
return {e['name'].split('(by')[0].strip().lower(): e for e in p}
|
|
|
|
def get_gmusic_albums(self):
|
|
p = self.browse('gmusic:album')
|
|
p = {e['name']: e for e in p if e['type'] == 'directory'}
|
|
return {e.split(' - ')[1]: p[e] for e in p}
|
|
|
|
def get_gmusic_artists(self):
|
|
p = self.browse('gmusic:artist')
|
|
return {e['name']: e for e in p if e['type'] == 'directory'}
|
|
|
|
def get_gmusic_radio(self):
|
|
p = self.browse('gmusic:radio')
|
|
return {e['name']: e for e in p if e['type'] == 'directory'}
|