mycroft-core/mycroft/util/__init__.py

342 lines
10 KiB
Python

# Copyright 2016 Mycroft AI, Inc.
#
# This file is part of Mycroft Core.
#
# Mycroft Core is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Mycroft Core is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Mycroft Core. If not, see <http://www.gnu.org/licenses/>.
import socket
import subprocess
import tempfile
import time
import os
import os.path
import time
from stat import S_ISREG, ST_MTIME, ST_MODE, ST_SIZE
import psutil
from os.path import dirname
from mycroft.util.log import getLogger
import mycroft.configuration
__author__ = 'jdorleans'
LOGGER = getLogger(__name__)
def resolve_resource_file(res_name):
"""Convert a resource into an absolute filename.
Resource names are in the form: 'filename.ext'
or 'path/filename.ext'
The system wil look for ~/.mycroft/res_name first, and
if not found will look at /opt/mycroft/res_name,
then finally it will look for res_name in the 'mycroft/res'
folder of the source code package.
Example:
With mycroft running as the user 'bob', if you called
resolve_resource_file('snd/beep.wav')
it would return either '/home/bob/.mycroft/snd/beep.wav' or
'/opt/mycroft/snd/beep.wav' or '.../mycroft/res/snd/beep.wav',
where the '...' is replaced by the path where the package has
been installed.
Args:
res_name (str): a resource path/name
"""
# First look for fully qualified file (e.g. a user setting)
if os.path.isfile(res_name):
return res_name
# Now look for ~/.mycroft/res_name (in user folder)
filename = os.path.expanduser("~/.mycroft/" + res_name)
if os.path.isfile(filename):
return filename
# Next look for /opt/mycroft/res/res_name
filename = os.path.expanduser("/opt/mycroft/" + res_name)
if os.path.isfile(filename):
return filename
# Finally look for it in the source package
filename = os.path.join(os.path.dirname(__file__), '..', 'res', res_name)
filename = os.path.abspath(os.path.normpath(filename))
if os.path.isfile(filename):
return filename
return None # Resource cannot be resolved
def play_wav(uri):
config = mycroft.configuration.ConfigurationManager.get()
play_cmd = config.get("play_wav_cmdline")
play_wav_cmd = str(play_cmd).split(" ")
for index, cmd in enumerate(play_wav_cmd):
if cmd == "%1":
play_wav_cmd[index] = (get_http(uri))
return subprocess.Popen(play_wav_cmd)
def play_mp3(uri):
config = mycroft.configuration.ConfigurationManager.get()
play_cmd = config.get("play_mp3_cmdline")
play_mp3_cmd = str(play_cmd).split(" ")
for index, cmd in enumerate(play_mp3_cmd):
if cmd == "%1":
play_mp3_cmd[index] = (get_http(uri))
return subprocess.Popen(play_mp3_cmd)
def record(file_path, duration, rate, channels):
if duration > 0:
return subprocess.Popen(
["arecord", "-r", str(rate), "-c", str(channels), "-d",
str(duration), file_path])
else:
return subprocess.Popen(
["arecord", "-r", str(rate), "-c", str(channels), file_path])
def get_http(uri):
return uri.replace("https://", "http://")
def remove_last_slash(url):
if url and url.endswith('/'):
url = url[:-1]
return url
def read_stripped_lines(filename):
with open(filename, 'r') as f:
return [line.strip() for line in f]
def read_dict(filename, div='='):
d = {}
with open(filename, 'r') as f:
for line in f:
(key, val) = line.split(div)
d[key.strip()] = val.strip()
return d
def create_file(filename):
try:
os.makedirs(dirname(filename))
except OSError:
pass
with open(filename, 'w') as f:
f.write('')
def kill(names):
print psutil.pids()
for name in names:
for p in psutil.process_iter():
try:
if p.name() == name:
p.kill()
break
except:
pass
def connected(host="8.8.8.8", port=53, timeout=3):
"""
Thanks to 7h3rAm on
Host: 8.8.8.8 (google-public-dns-a.google.com)
OpenPort: 53/tcp
Service: domain (DNS/TCP)
NOTE:
This is no longer in use by this version
New method checks for a connection using ConnectionError only when
a question is asked
"""
try:
socket.setdefaulttimeout(timeout)
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
return True
except IOError:
try:
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect(
("8.8.4.4", port))
return True
except IOError:
return False
def curate_cache(dir, min_free_percent=5.0):
"""Clear out the directory if needed
This assumes all the files in the directory can be deleted as freely
Args:
dir (str): directory path that holds cached files
min_free_percent (float): percentage (0.0-100.0) of drive to keep free
"""
# Simpleminded implementation -- keep a certain percentage of the
# disk available.
# TODO: Would be easy to add more options, like whitelisted files, etc.
space = psutil.disk_usage(dir)
# space.percent = space.used/space.total*100.0
percent_free = 100.0-space.percent
if percent_free < min_free_percent:
# calculate how many bytes we need to delete
bytes_needed = (min_free_percent - percent_free) / 100.0 * space.total
bytes_needed = int(bytes_needed + 1.0)
# get all entries in the directory w/ stats
entries = (os.path.join(dir, fn) for fn in os.listdir(dir))
entries = ((os.stat(path), path) for path in entries)
# leave only regular files, insert modification date
entries = ((stat[ST_MTIME], stat[ST_SIZE], path)
for stat, path in entries if S_ISREG(stat[ST_MODE]))
# delete files with oldest modification date until space is freed
space_freed = 0
for moddate, fsize, path in sorted(entries):
try:
os.remove(path)
space_freed += fsize
except:
pass
if space_freed > bytes_needed:
return # deleted enough!
def get_cache_directory(domain=None):
"""Get a directory for caches data
This directory can be used to hold temporary caches of data to
speed up performance. This directory will likely be part of a
small RAM disk and may be cleared at any time. So code that
uses these cached files must be able to fallback and regenerate
the file.
Args:
domain (str): The cache domain. Basically just a subdirectory.
Return:
str: a path to the directory where you can cache data
"""
dir = mycroft.configuration.ConfigurationManager.get().get("cache_path")
if not dir:
# If not defined, use /tmp/mycroft/cache
dir = os.path.join(tempfile.gettempdir(), "mycroft", "cache")
return _ensure_directory_exists(dir, domain)
def get_ipc_directory(domain=None):
"""Get the directory used for Inter Process Communication
Files in this folder can be accessed by different processes on the
machine. Useful for communication. This is often a small RAM disk.
Args:
domain (str): The IPC domain. Basically a subdirectory to prevent
overlapping signal filenames.
Returns:
str: a path to the IPC directory
"""
dir = mycroft.configuration.ConfigurationManager.get().get("ipc_path")
if not dir:
# If not defined, use /tmp/mycroft/ipc
dir = os.path.join(tempfile.gettempdir(), "mycroft", "ipc")
return _ensure_directory_exists(dir, domain)
def _ensure_directory_exists(dir, domain=None):
""" Create a directory and give access rights to all
Args:
domain (str): The IPC domain. Basically a subdirectory to prevent
overlapping signal filenames.
Returns:
str: a path to the directory
"""
if domain:
dir = os.path.join(dir, domain)
dir = os.path.normpath(dir)
if not os.path.isdir(dir):
try:
save = os.umask(0)
os.makedirs(dir, 0777) # give everyone rights to r/w here
except OSError:
LOGGER.warn("Failed to create: " + dir)
pass
finally:
os.umask(save)
return dir
def create_signal(signal_name):
"""Create a named signal
Args:
signal_name (str): The signal's name. Must only contain characters
valid in filenames.
"""
try:
with open(os.path.join(get_ipc_directory(), "signal", signal_name),
'w'):
return True
except IOError:
return False
def check_for_signal(signal_name, sec_lifetime=0):
"""See if a named signal exists
Args:
signal_name (str): The signal's name. Must only contain characters
valid in filenames.
sec_lifetime (int, optional): How many seconds the signal should
remain valid. If 0 or not specified, it is a single-use signal.
Returns:
bool: True if the signal is defined, False otherwise
"""
path = os.path.join(get_ipc_directory(), "signal", signal_name)
if os.path.isfile(path):
if sec_lifetime == 0:
# consume this single-use signal
os.remove(path)
elif int(os.path.getctime(path) + sec_lifetime) < int(time.time()):
# remove once expired
os.remove(path)
return False
return True
# No such signal exists
return False
def validate_param(value, name):
if not value:
raise ValueError("Missing or empty %s in mycroft.conf " % name)