mirror of
https://github.com/chubin/cheat.sh.git
synced 2026-06-20 13:16:44 +02:00
big routing refactoring
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
import sys
|
||||
import os
|
||||
import glob
|
||||
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
from globals import PATH_CHEAT_SHEETS
|
||||
|
||||
def _remove_initial_underscore(filename):
|
||||
if filename.startswith('_'):
|
||||
filename = filename[1:]
|
||||
return filename
|
||||
|
||||
def _sanitize_dirname(dirname):
|
||||
dirname = os.path.basename(dirname)
|
||||
dirname = _remove_initial_underscore(dirname)
|
||||
return dirname
|
||||
|
||||
def _format_answer(dirname, filename):
|
||||
return "%s/%s" % (_sanitize_dirname(dirname), filename)
|
||||
|
||||
def _get_answer_files_from_folder():
|
||||
topics = map(os.path.split, glob.glob(PATH_CHEAT_SHEETS + "*/*"))
|
||||
return [_format_answer(dirname, filename)
|
||||
for dirname, filename in topics if filename not in ['_info.yaml']]
|
||||
def _isdir(topic):
|
||||
return os.path.isdir(topic)
|
||||
def _get_answers_and_dirs():
|
||||
topics = glob.glob(PATH_CHEAT_SHEETS + "*")
|
||||
answer_dirs = [_remove_initial_underscore(os.path.split(topic)[1]).rstrip('/')+'/'
|
||||
for topic in topics if _isdir(topic)]
|
||||
answers = [os.path.split(topic)[1] for topic in topics if not _isdir(topic)]
|
||||
return answers, answer_dirs
|
||||
|
||||
def _update_cheat_sheets_topics():
|
||||
answers = _get_answer_files_from_folder()
|
||||
cheatsheet_answers, cheatsheet_dirs = _get_answers_and_dirs()
|
||||
return answers+cheatsheet_answers, cheatsheet_dirs
|
||||
|
||||
def get_list():
|
||||
return _update_cheat_sheets_topics()[0]
|
||||
|
||||
def get_dirs_list():
|
||||
return _update_cheat_sheets_topics()[1]
|
||||
|
||||
_CHEAT_SHEETS_LIST = get_list()
|
||||
def is_found(topic):
|
||||
return topic in _CHEAT_SHEETS_LIST
|
||||
|
||||
_CHEAT_SHEETS_DIRS = get_dirs_list()
|
||||
def is_dir_found(topic):
|
||||
return topic in _CHEAT_SHEETS_DIRS
|
||||
|
||||
def get_page(topic, request_options=None):
|
||||
"""
|
||||
Get the cheat sheet topic from the own repository (cheat.sheets).
|
||||
It's possible that topic directory starts with omitted underscore
|
||||
"""
|
||||
filename = PATH_CHEAT_SHEETS + "%s" % topic
|
||||
if not os.path.exists(filename):
|
||||
filename = PATH_CHEAT_SHEETS + "_%s" % topic
|
||||
if os.path.isdir(filename):
|
||||
return ""
|
||||
else:
|
||||
return open(filename, "r").read().decode('utf-8')
|
||||
|
||||
def get_dir(topic, request_options=None):
|
||||
answer = []
|
||||
for f_name in glob.glob(PATH_CHEAT_SHEETS + "%s/*" % topic.rstrip('/')):
|
||||
answer.append(os.path.basename(f_name))
|
||||
topics = sorted(answer)
|
||||
return "\n".join(topics) + "\n"
|
||||
@@ -0,0 +1,67 @@
|
||||
from gevent.monkey import patch_all
|
||||
from gevent.subprocess import Popen, PIPE
|
||||
patch_all()
|
||||
|
||||
import sys
|
||||
import os
|
||||
import glob
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
from globals import PATH_TLDR_PAGES, PATH_CHEAT_PAGES
|
||||
|
||||
def _get_filenames(path):
|
||||
return [os.path.split(topic)[1] for topic in glob.glob(path)]
|
||||
|
||||
def get_tldr_list():
|
||||
return [filename[:-3]
|
||||
for filename in _get_filenames(PATH_TLDR_PAGES) if filename.endswith('.md')]
|
||||
|
||||
_TLDR_LIST = get_tldr_list()
|
||||
def tldr_is_found(topic):
|
||||
return topic in _TLDR_LIST
|
||||
|
||||
def get_tldr(topic, request_options=None):
|
||||
cmd = ["tldr", topic]
|
||||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
answer = proc.communicate()[0]
|
||||
|
||||
fixed_answer = []
|
||||
for line in answer.splitlines():
|
||||
line = line[2:]
|
||||
if line.startswith('-'):
|
||||
line = '# '+line[2:]
|
||||
elif not line.startswith(' '):
|
||||
line = "# "+line
|
||||
else:
|
||||
pass
|
||||
|
||||
fixed_answer.append(line)
|
||||
|
||||
answer = "\n".join(fixed_answer) + "\n"
|
||||
return answer.decode('utf-8')
|
||||
|
||||
def get_cheat_list():
|
||||
return _get_filenames(PATH_CHEAT_PAGES)
|
||||
|
||||
_CHEAT_LIST = get_cheat_list()
|
||||
def cheat_is_found(topic):
|
||||
return topic in _CHEAT_LIST
|
||||
|
||||
def get_cheat(topic, request_options=None):
|
||||
cmd = ["cheat", topic]
|
||||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
answer = proc.communicate()[0].decode('utf-8')
|
||||
return answer
|
||||
|
||||
def get_translation(topic, request_options=None):
|
||||
from_, topic = topic.split('/', 1)
|
||||
to_ = request_options.get('lang', 'en')
|
||||
if '-' in from_:
|
||||
from_, to_ = from_.split('-', 1)
|
||||
|
||||
cmd = ["/home/igor/cheat.sh/bin/get_translation",
|
||||
from_, to_, topic.replace('+', ' ')]
|
||||
print("calling:", cmd)
|
||||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
answer = proc.communicate()[0].decode('utf-8')
|
||||
return answer
|
||||
@@ -0,0 +1,6 @@
|
||||
class Adapter(object):
|
||||
pass
|
||||
|
||||
class cheatAdapter(Adapter):
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
import sys
|
||||
import os
|
||||
import glob
|
||||
import collections
|
||||
|
||||
from fuzzywuzzy import process, fuzz
|
||||
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
from globals import MYDIR, COLOR_STYLES
|
||||
from colorize_internal import colorize_internal
|
||||
|
||||
_INTERNAL_TOPICS = [
|
||||
":cht.sh",
|
||||
":bash_completion",
|
||||
":emacs",
|
||||
":emacs-ivy",
|
||||
":firstpage",
|
||||
":firstpage-v1",
|
||||
":firstpage-v2",
|
||||
":fish",
|
||||
":help",
|
||||
":intro",
|
||||
":list",
|
||||
":post",
|
||||
":styles",
|
||||
":styles-demo",
|
||||
":vim",
|
||||
":zsh",
|
||||
":share",
|
||||
]
|
||||
|
||||
_COLORIZED_INTERNAL_TOPICS = [
|
||||
':intro',
|
||||
]
|
||||
|
||||
class InternalPages(object):
|
||||
|
||||
def __init__(self, get_topic_type=None, get_topics_list=None):
|
||||
self.get_topic_type = get_topic_type
|
||||
self.get_topics_list = get_topics_list
|
||||
|
||||
def _get_stat(self):
|
||||
stat = collections.Counter([
|
||||
self.get_topic_type(topic)
|
||||
for topic in self.get_topics_list()
|
||||
])
|
||||
|
||||
answer = ""
|
||||
for key, val in stat.items():
|
||||
answer += "%s %s\n" % (key, val)
|
||||
return answer
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_list():
|
||||
return _INTERNAL_TOPICS
|
||||
|
||||
def _get_list_answer(self, topic, request_options=None):
|
||||
if '/' in topic:
|
||||
topic_type, topic_name = topic.split('/', 1)
|
||||
if topic_name == ":list":
|
||||
topic_list = [x[len(topic_type)+1:]
|
||||
for x in self.get_topics_list()
|
||||
if x.startswith(topic_type + "/")]
|
||||
return "\n".join(topic_list)+"\n"
|
||||
|
||||
answer = ""
|
||||
if topic == ":list":
|
||||
answer = "\n".join(x for x in self.get_topics_list()) + "\n"
|
||||
|
||||
return answer
|
||||
|
||||
def get_page(self, topic, request_options=None):
|
||||
if topic.endswith('/:list') or topic.lstrip('/') == ':list':
|
||||
return self._get_list_answer(topic)
|
||||
|
||||
answer = ""
|
||||
if topic == ':styles':
|
||||
answer = "\n".join(COLOR_STYLES) + "\n"
|
||||
elif topic == ":stat":
|
||||
answer = self._get_stat()+"\n"
|
||||
elif topic in _INTERNAL_TOPICS:
|
||||
answer = open(os.path.join(MYDIR, "share", topic[1:]+".txt"), "r").read()
|
||||
if topic in _COLORIZED_INTERNAL_TOPICS:
|
||||
answer = colorize_internal(answer)
|
||||
|
||||
return answer
|
||||
|
||||
def is_found(self, topic):
|
||||
return topic in self.get_list()
|
||||
|
||||
class UnknownPages(InternalPages):
|
||||
|
||||
@staticmethod
|
||||
def get_list():
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def is_found(topic):
|
||||
return False
|
||||
|
||||
def get_page(self, topic, request_options=None):
|
||||
topics_list = self.get_topics_list()
|
||||
if topic.startswith(':'):
|
||||
topics_list = [x for x in topics_list if x.startswith(':')]
|
||||
else:
|
||||
topics_list = [x for x in topics_list if not x.startswith(':')]
|
||||
|
||||
possible_topics = process.extract(topic, topics_list, scorer=fuzz.ratio)[:3]
|
||||
possible_topics_text = "\n".join([(" * %s %s" % x) for x in possible_topics])
|
||||
return """
|
||||
Unknown topic.
|
||||
Do you mean one of these topics maybe?
|
||||
|
||||
%s
|
||||
""" % possible_topics_text
|
||||
@@ -0,0 +1,16 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
from globals import PATH_LATENZ
|
||||
|
||||
def get_answer(topic, request_options=None):
|
||||
sys.path.append(PATH_LATENZ)
|
||||
import latencies
|
||||
return latencies.render()
|
||||
|
||||
def get_list():
|
||||
return ['latencies']
|
||||
|
||||
def is_found(topic):
|
||||
return topic.lower() in ['latencies', 'late.nz', 'latency']
|
||||
@@ -0,0 +1,63 @@
|
||||
from gevent.monkey import patch_all
|
||||
from gevent.subprocess import Popen, PIPE
|
||||
patch_all()
|
||||
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
|
||||
from polyglot.detect import Detector
|
||||
from polyglot.detect.base import UnknownLanguage
|
||||
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
from globals import MYDIR
|
||||
|
||||
def get_page(topic, request_options=None):
|
||||
"""
|
||||
Find answer for the `topic` question.
|
||||
"""
|
||||
|
||||
# if there is a language name in the section name,
|
||||
# cut it off (de:python => python)
|
||||
if '/' in topic:
|
||||
section_name, topic = topic.split('/', 1)
|
||||
if ':' in section_name:
|
||||
_, section_name = section_name.split(':', 1)
|
||||
topic = "%s/%s" % (section_name, topic)
|
||||
|
||||
# some clients send queries with - instead of + so we have to rewrite them to
|
||||
topic = re.sub(r"(?<!-)-", ' ', topic)
|
||||
|
||||
topic_words = topic.split()
|
||||
|
||||
topic = " ".join(topic_words)
|
||||
|
||||
lang = 'en'
|
||||
try:
|
||||
query_text = topic # " ".join(topic)
|
||||
query_text = re.sub('^[^/]*/+', '', query_text.rstrip('/'))
|
||||
query_text = re.sub('/[0-9]+$', '', query_text)
|
||||
query_text = re.sub('/[0-9]+$', '', query_text)
|
||||
detector = Detector(query_text)
|
||||
supposed_lang = detector.languages[0].code
|
||||
if len(topic_words) > 2 or supposed_lang in ['az', 'ru', 'uk', 'de', 'fr', 'es', 'it', 'nl']:
|
||||
lang = supposed_lang
|
||||
if supposed_lang.startswith('zh_') or supposed_lang == 'zh':
|
||||
lang = 'zh'
|
||||
elif supposed_lang.startswith('pt_'):
|
||||
lang = 'pt'
|
||||
if supposed_lang in ['ja', 'ko']:
|
||||
lang = supposed_lang
|
||||
|
||||
except UnknownLanguage:
|
||||
print("Unknown language (%s)" % query_text)
|
||||
|
||||
if lang != 'en':
|
||||
topic = ['--human-language', lang, topic]
|
||||
else:
|
||||
topic = [topic]
|
||||
|
||||
cmd = [os.path.join(MYDIR, "bin/get-answer-for-question")] + topic
|
||||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
answer = proc.communicate()[0].decode('utf-8')
|
||||
return answer
|
||||
+143
-331
@@ -6,356 +6,171 @@ Exports:
|
||||
get_topics_list()
|
||||
get_topic_type()
|
||||
get_answer()
|
||||
find_answer_by_keyword()
|
||||
"""
|
||||
from __future__ import print_function
|
||||
|
||||
from gevent.monkey import patch_all
|
||||
from gevent.subprocess import Popen, PIPE
|
||||
patch_all()
|
||||
|
||||
# pylint: disable=wrong-import-position,wrong-import-order
|
||||
import sys
|
||||
import collections
|
||||
import glob
|
||||
import os
|
||||
import re
|
||||
import redis
|
||||
from fuzzywuzzy import process, fuzz
|
||||
from polyglot.detect import Detector
|
||||
from polyglot.detect.base import UnknownLanguage
|
||||
import time
|
||||
|
||||
import beautifier
|
||||
from globals import MYDIR, PATH_TLDR_PAGES, PATH_CHEAT_PAGES, PATH_CHEAT_SHEETS, PATH_LATENZ, \
|
||||
COLOR_STYLES, REDISHOST
|
||||
from adapter_learnxiny import get_learnxiny, get_learnxiny_list, is_valid_learnxy
|
||||
from globals import REDISHOST, MAX_SEARCH_LEN
|
||||
from languages_data import LANGUAGE_ALIAS, SO_NAME, rewrite_editor_section_name
|
||||
from colorize_internal import colorize_internal
|
||||
# pylint: enable=wrong-import-position,wrong-import-order
|
||||
|
||||
REDIS = redis.StrictRedis(host=REDISHOST, port=6379, db=0)
|
||||
from adapter_learnxiny import get_learnxiny, get_learnxiny_list, is_valid_learnxy
|
||||
import adapter.cheat_sheets
|
||||
import adapter.cmd
|
||||
import adapter.latenz
|
||||
import adapter.question
|
||||
import adapter.internal
|
||||
|
||||
MAX_SEARCH_LEN = 20
|
||||
class Router(object):
|
||||
|
||||
INTERNAL_TOPICS = [
|
||||
':cht.sh',
|
||||
':bash_completion',
|
||||
':emacs',
|
||||
':emacs-ivy',
|
||||
":firstpage",
|
||||
":firstpage-v1",
|
||||
":firstpage-v2",
|
||||
':fish',
|
||||
':help',
|
||||
":intro",
|
||||
":list",
|
||||
':post',
|
||||
':styles',
|
||||
':styles-demo',
|
||||
':vim',
|
||||
':zsh',
|
||||
':share',
|
||||
]
|
||||
|
||||
COLORIZED_INTERNAL_TOPICS = [
|
||||
':intro',
|
||||
]
|
||||
|
||||
def _get_filenames(path):
|
||||
return [os.path.split(topic)[1] for topic in glob.glob(path)]
|
||||
|
||||
def _update_tldr_topics():
|
||||
return [filename[:-3]
|
||||
for filename in _get_filenames(PATH_TLDR_PAGES) if filename.endswith('.md')]
|
||||
|
||||
def _update_cheat_topics():
|
||||
return _get_filenames(PATH_CHEAT_PAGES)
|
||||
|
||||
|
||||
|
||||
TLDR_TOPICS = _update_tldr_topics()
|
||||
CHEAT_TOPICS = _update_cheat_topics()
|
||||
|
||||
def _remove_initial_underscore(filename):
|
||||
if filename.startswith('_'):
|
||||
filename = filename[1:]
|
||||
return filename
|
||||
|
||||
def _sanitize_dirname(dirname):
|
||||
dirname = os.path.basename(dirname)
|
||||
dirname = _remove_initial_underscore(dirname)
|
||||
return dirname
|
||||
|
||||
def _format_answer(dirname, filename):
|
||||
return "%s/%s" % (_sanitize_dirname(dirname), filename)
|
||||
|
||||
def _get_answer_files_from_folder():
|
||||
topics = map(os.path.split, glob.glob(PATH_CHEAT_SHEETS + "*/*"))
|
||||
return [_format_answer(dirname, filename)
|
||||
for dirname, filename in topics if filename not in ['_info.yaml']]
|
||||
def _isdir(topic):
|
||||
return os.path.isdir(topic)
|
||||
def _get_answers_and_dirs():
|
||||
topics = glob.glob(PATH_CHEAT_SHEETS + "*")
|
||||
answer_dirs = [_remove_initial_underscore(os.path.split(topic)[1]).rstrip('/')+'/'
|
||||
for topic in topics if _isdir(topic)]
|
||||
answers = [os.path.split(topic)[1] for topic in topics if not _isdir(topic)]
|
||||
return answers, answer_dirs
|
||||
|
||||
def _update_cheat_sheets_topics():
|
||||
answers = _get_answer_files_from_folder()
|
||||
cheatsheet_answers, cheatsheet_dirs = _get_answers_and_dirs()
|
||||
return answers+cheatsheet_answers, cheatsheet_dirs
|
||||
|
||||
CHEAT_SHEETS_TOPICS, CHEAT_SHEETS_DIRS = _update_cheat_sheets_topics()
|
||||
|
||||
CACHED_TOPICS_LIST = [[]]
|
||||
|
||||
def get_topics_list(skip_dirs=False, skip_internal=False):
|
||||
"""
|
||||
List of topics returned on /:list
|
||||
Implementation of query routing.
|
||||
Routing is done basing on the data exported by the adapters.
|
||||
(mainly by functions get_list() and is_found()).
|
||||
|
||||
Function get_topics_list() returns available topics
|
||||
(that are accessible at /:list).
|
||||
|
||||
Function get_topic_type() delivers name of the adapter,
|
||||
that will process the query.
|
||||
|
||||
Refactoring of the class is almost done.
|
||||
The next steps to do:
|
||||
* _topic_list, _topic_found and topic_getters should be merged together.
|
||||
"""
|
||||
|
||||
if CACHED_TOPICS_LIST[0] != []:
|
||||
return CACHED_TOPICS_LIST[0]
|
||||
def __init__(self):
|
||||
|
||||
answer = CHEAT_TOPICS + TLDR_TOPICS + CHEAT_SHEETS_TOPICS
|
||||
answer = sorted(set(answer))
|
||||
self._cached_topics_list = []
|
||||
self._cached_topic_type = {}
|
||||
|
||||
# doing it in this strange way to save the order of the topics
|
||||
for topic in get_learnxiny_list():
|
||||
if topic not in answer:
|
||||
answer.append(topic)
|
||||
self.adapter_internal = adapter.internal.InternalPages(
|
||||
get_topic_type=self.get_topic_type,
|
||||
get_topics_list=self.get_topics_list)
|
||||
self.adapter_unknown = adapter.internal.UnknownPages(
|
||||
get_topic_type=self.get_topic_type,
|
||||
get_topics_list=self.get_topics_list)
|
||||
|
||||
if not skip_dirs:
|
||||
answer += CHEAT_SHEETS_DIRS
|
||||
if not skip_internal:
|
||||
answer += INTERNAL_TOPICS
|
||||
self._topic_list = {
|
||||
"late.nz": adapter.latenz.get_list(),
|
||||
"internal": self.adapter_internal.get_list(),
|
||||
"tldr": adapter.cmd.get_tldr_list(),
|
||||
"cheat": adapter.cmd.get_cheat_list(),
|
||||
"cheat.sheets": adapter.cheat_sheets.get_list(),
|
||||
"cheat.sheets dir": adapter.cheat_sheets.get_dirs_list(),
|
||||
"learnxiny": get_learnxiny_list(),
|
||||
}
|
||||
|
||||
CACHED_TOPICS_LIST[0] = answer
|
||||
return answer
|
||||
self._topic_found = {
|
||||
"late.nz": adapter.latenz.is_found,
|
||||
"internal": self.adapter_internal.is_found,
|
||||
"tldr": adapter.cmd.tldr_is_found,
|
||||
"cheat": adapter.cmd.cheat_is_found,
|
||||
"cheat.sheets": adapter.cheat_sheets.is_found,
|
||||
"cheat.sheets dir": adapter.cheat_sheets.is_dir_found,
|
||||
"learnxiny": is_valid_learnxy,
|
||||
}
|
||||
|
||||
def _get_topics_dirs():
|
||||
return set([x.split('/', 1)[0] for x in get_topics_list() if '/' in x])
|
||||
|
||||
|
||||
def _get_stat():
|
||||
stat = collections.Counter([
|
||||
get_topic_type(topic) for topic in get_topics_list()
|
||||
])
|
||||
|
||||
answer = ""
|
||||
for key, val in stat.items():
|
||||
answer += "%s %s\n" % (key, val)
|
||||
return answer
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
TOPIC_TYPE_CACHE = {}
|
||||
def get_topic_type(topic): # pylint: disable=too-many-locals,too-many-branches,too-many-statements
|
||||
"""
|
||||
Return topic type for `topic` or "unknown" if topic can't be determined.
|
||||
"""
|
||||
if topic in TOPIC_TYPE_CACHE:
|
||||
return TOPIC_TYPE_CACHE[topic]
|
||||
|
||||
result = 'unknown'
|
||||
|
||||
if topic == "":
|
||||
result = "search"
|
||||
elif topic.startswith(":"):
|
||||
result = "internal"
|
||||
elif '/' in topic:
|
||||
topic_type, topic_name = topic.split('/', 1)
|
||||
if '+' in topic_name:
|
||||
result = 'question'
|
||||
else:
|
||||
if (topic_name in [':list', ':learn']):
|
||||
result = "internal"
|
||||
if topic_name == ':learn' and is_valid_learnxy(topic):
|
||||
result = 'learnxiny'
|
||||
else:
|
||||
# let us activate the 'question' feature for all subsections
|
||||
result = 'question'
|
||||
|
||||
if topic.lower() in ['latencies', 'late.nz', 'latency']:
|
||||
result = 'late.nz'
|
||||
|
||||
if result == 'unknown' or result == 'question':
|
||||
print(CHEAT_SHEETS_TOPICS)
|
||||
if topic in CHEAT_SHEETS_TOPICS:
|
||||
result = "cheat.sheets"
|
||||
elif topic.rstrip('/') in CHEAT_SHEETS_DIRS and topic.endswith('/'):
|
||||
result = "cheat.sheets dir"
|
||||
elif topic in CHEAT_TOPICS:
|
||||
result = "cheat"
|
||||
elif topic in TLDR_TOPICS:
|
||||
result = "tldr"
|
||||
elif '/' not in topic:
|
||||
result = "unknown"
|
||||
|
||||
TOPIC_TYPE_CACHE[topic] = result
|
||||
|
||||
return result
|
||||
|
||||
#
|
||||
# Various cheat sheets getters
|
||||
#
|
||||
#
|
||||
#def registered_answer_getter(func):
|
||||
# REGISTERED_ANSWER_GETTERS.append(funct)
|
||||
# return cls
|
||||
def _get_internal(topic):
|
||||
if '/' in topic:
|
||||
topic_type, topic_name = topic.split('/', 1)
|
||||
if topic_name == ":list":
|
||||
topic_list = [x[len(topic_type)+1:]
|
||||
for x in get_topics_list()
|
||||
if x.startswith(topic_type + "/")]
|
||||
return "\n".join(topic_list)+"\n"
|
||||
|
||||
answer = ""
|
||||
if topic == ":list":
|
||||
answer = "\n".join(x for x in get_topics_list()) + "\n"
|
||||
elif topic == ':styles':
|
||||
answer = "\n".join(COLOR_STYLES) + "\n"
|
||||
elif topic == ":stat":
|
||||
answer = _get_stat()+"\n"
|
||||
elif topic in INTERNAL_TOPICS:
|
||||
answer = open(os.path.join(MYDIR, "share", topic[1:]+".txt"), "r").read()
|
||||
if topic in COLORIZED_INTERNAL_TOPICS:
|
||||
answer = colorize_internal(answer)
|
||||
|
||||
return answer
|
||||
|
||||
def _get_tldr(topic):
|
||||
cmd = ["tldr", topic]
|
||||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
answer = proc.communicate()[0]
|
||||
|
||||
fixed_answer = []
|
||||
for line in answer.splitlines():
|
||||
line = line[2:]
|
||||
if line.startswith('-'):
|
||||
line = '# '+line[2:]
|
||||
elif not line.startswith(' '):
|
||||
line = "# "+line
|
||||
else:
|
||||
pass
|
||||
|
||||
fixed_answer.append(line)
|
||||
|
||||
answer = "\n".join(fixed_answer) + "\n"
|
||||
return answer.decode('utf-8')
|
||||
|
||||
def _get_cheat(topic):
|
||||
cmd = ["cheat", topic]
|
||||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
answer = proc.communicate()[0].decode('utf-8')
|
||||
return answer
|
||||
|
||||
def _get_cheat_sheets(topic):
|
||||
"""
|
||||
Get the cheat sheet topic from the own repository (cheat.sheets).
|
||||
It's possible that topic directory starts with omitted underscore
|
||||
"""
|
||||
filename = PATH_CHEAT_SHEETS + "%s" % topic
|
||||
if not os.path.exists(filename):
|
||||
filename = PATH_CHEAT_SHEETS + "_%s" % topic
|
||||
if os.path.isdir(filename):
|
||||
return ""
|
||||
else:
|
||||
return open(filename, "r").read().decode('utf-8')
|
||||
|
||||
def _get_cheat_sheets_dir(topic):
|
||||
answer = []
|
||||
for f_name in glob.glob(PATH_CHEAT_SHEETS + "%s/*" % topic.rstrip('/')):
|
||||
answer.append(os.path.basename(f_name))
|
||||
topics = sorted(answer)
|
||||
return "\n".join(topics) + "\n"
|
||||
|
||||
def _get_answer_for_question(topic):
|
||||
"""
|
||||
Find answer for the `topic` question.
|
||||
"""
|
||||
|
||||
topic_words = topic.replace('+', ' ').strip()
|
||||
# some clients send queries with - instead of + so we have to rewrite them to
|
||||
topic = re.sub(r"(?<!-)-", ' ', topic)
|
||||
|
||||
topic_words = topic.split()
|
||||
|
||||
topic = " ".join(topic_words)
|
||||
|
||||
lang = 'en'
|
||||
try:
|
||||
query_text = topic # " ".join(topic)
|
||||
query_text = re.sub('^[^/]*/+', '', query_text.rstrip('/'))
|
||||
query_text = re.sub('/[0-9]+$', '', query_text)
|
||||
query_text = re.sub('/[0-9]+$', '', query_text)
|
||||
detector = Detector(query_text)
|
||||
supposed_lang = detector.languages[0].code
|
||||
if len(topic_words) > 2 or supposed_lang in ['az', 'ru', 'uk', 'de', 'fr', 'es', 'it', 'nl']:
|
||||
lang = supposed_lang
|
||||
if supposed_lang.startswith('zh_') or supposed_lang == 'zh':
|
||||
lang = 'zh'
|
||||
elif supposed_lang.startswith('pt_'):
|
||||
lang = 'pt'
|
||||
if supposed_lang in ['ja', 'ko']:
|
||||
lang = supposed_lang
|
||||
|
||||
except UnknownLanguage:
|
||||
print("Unknown language (%s)" % query_text)
|
||||
|
||||
if lang != 'en':
|
||||
topic = ['--human-language', lang, topic]
|
||||
else:
|
||||
topic = [topic]
|
||||
|
||||
cmd = [os.path.join(MYDIR, "bin/get-answer-for-question")] + topic
|
||||
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
|
||||
answer = proc.communicate()[0].decode('utf-8')
|
||||
return answer
|
||||
|
||||
def _get_unknown(topic):
|
||||
topics_list = get_topics_list()
|
||||
if topic.startswith(':'):
|
||||
topics_list = [x for x in topics_list if x.startswith(':')]
|
||||
else:
|
||||
topics_list = [x for x in topics_list if not x.startswith(':')]
|
||||
|
||||
possible_topics = process.extract(topic, topics_list, scorer=fuzz.ratio)[:3]
|
||||
possible_topics_text = "\n".join([(" * %s %s" % x) for x in possible_topics])
|
||||
return """
|
||||
Unknown topic.
|
||||
Do you mean one of these topics maybe?
|
||||
|
||||
%s
|
||||
""" % possible_topics_text
|
||||
|
||||
def _get_latenz(topic):
|
||||
sys.path.append(PATH_LATENZ)
|
||||
import latencies
|
||||
return latencies.render()
|
||||
|
||||
# pylint: disable=bad-whitespace
|
||||
#
|
||||
# topic_type, function_getter
|
||||
# should be replaced with a decorator
|
||||
TOPIC_GETTERS = (
|
||||
('late.nz', _get_latenz),
|
||||
("cheat.sheets", _get_cheat_sheets),
|
||||
("cheat.sheets dir", _get_cheat_sheets_dir),
|
||||
("tldr", _get_tldr),
|
||||
("internal", _get_internal),
|
||||
("cheat", _get_cheat),
|
||||
("learnxiny", get_learnxiny),
|
||||
("question", _get_answer_for_question),
|
||||
("unknown", _get_unknown),
|
||||
)
|
||||
# pylint: disable=bad-whitespace
|
||||
self.topic_getters = (
|
||||
("late.nz", adapter.latenz.get_answer),
|
||||
("cheat.sheets", adapter.cheat_sheets.get_page),
|
||||
("cheat.sheets dir", adapter.cheat_sheets.get_dir),
|
||||
("tldr", adapter.cmd.get_tldr),
|
||||
("internal", self.adapter_internal.get_page),
|
||||
("cheat", adapter.cmd.get_cheat),
|
||||
("learnxiny", get_learnxiny),
|
||||
("translation", adapter.cmd.get_translation),
|
||||
("question", adapter.question.get_page),
|
||||
("unknown", self.adapter_unknown.get_page),
|
||||
)
|
||||
# pylint: enable=bad-whitespace
|
||||
|
||||
def get_topics_list(self, skip_dirs=False, skip_internal=False):
|
||||
"""
|
||||
List of topics returned on /:list
|
||||
"""
|
||||
|
||||
if self._cached_topics_list:
|
||||
return self._cached_topics_list
|
||||
|
||||
# merging all top level lists
|
||||
sources_to_merge = ['tldr', 'cheat', 'cheat.sheets', 'learnxiny']
|
||||
if not skip_dirs:
|
||||
sources_to_merge.append("cheat.sheets dir")
|
||||
if not skip_internal:
|
||||
sources_to_merge.append("internal")
|
||||
|
||||
answer = {}
|
||||
for key in sources_to_merge:
|
||||
answer.update({name:key for name in self._topic_list[key]})
|
||||
answer = sorted(set(answer.keys()))
|
||||
|
||||
# doing it in this strange way to save the order of the topics
|
||||
for topic in get_learnxiny_list():
|
||||
if topic not in answer:
|
||||
answer.append(topic)
|
||||
|
||||
self._cached_topics_list = answer
|
||||
return answer
|
||||
|
||||
def get_topic_type(self, topic): # pylint: disable=too-many-locals,too-many-branches,too-many-statements
|
||||
"""
|
||||
Return topic type for `topic` or "unknown" if topic can't be determined.
|
||||
"""
|
||||
|
||||
def __get_topic_type(topic):
|
||||
|
||||
if topic == "":
|
||||
return "search"
|
||||
if topic.startswith(":"):
|
||||
return "internal"
|
||||
if topic.endswith("/:list"):
|
||||
return "internal"
|
||||
if topic.endswith('/'):
|
||||
topic = topic.rstrip('/')
|
||||
if self._topic_found['cheat.sheets dir'](topic):
|
||||
return "cheat.sheets dir"
|
||||
|
||||
for source in ['cheat.sheets', 'cheat', 'tldr', 'late.nz']:
|
||||
if self._topic_found[source](topic):
|
||||
return source
|
||||
|
||||
if '/' not in topic:
|
||||
#if '+' in topic_name:
|
||||
# return 'question'
|
||||
return "unknown"
|
||||
|
||||
# topic contains '/'
|
||||
#
|
||||
if is_valid_learnxy(topic):
|
||||
return 'learnxiny'
|
||||
topic_type = topic.split('/', 1)[0]
|
||||
if topic_type in ['ru', 'fr'] or re.match(r'[a-z][a-z]-[a-z][a-z]$', topic_type):
|
||||
return 'translation'
|
||||
return 'question'
|
||||
|
||||
if topic not in self._cached_topic_type:
|
||||
self._cached_topic_type[topic] = __get_topic_type(topic)
|
||||
return self._cached_topic_type[topic]
|
||||
|
||||
if os.environ.get('REDIS_HOST', '').lower() != 'none':
|
||||
REDIS = redis.StrictRedis(host=REDISHOST, port=6379, db=0)
|
||||
else:
|
||||
REDIS = None
|
||||
|
||||
_ROUTER = Router()
|
||||
|
||||
get_topic_type = _ROUTER.get_topic_type
|
||||
get_topics_list = _ROUTER.get_topics_list
|
||||
TOPIC_GETTERS = _ROUTER.topic_getters
|
||||
|
||||
def get_answer(topic, keyword, options="", request_options=None): # pylint: disable=too-many-locals,too-many-branches,too-many-statements
|
||||
"""
|
||||
Find cheat sheet for the topic.
|
||||
@@ -495,11 +310,11 @@ def get_answer(topic, keyword, options="", request_options=None): # pylint: disa
|
||||
|
||||
for topic_getter_type, topic_getter in TOPIC_GETTERS:
|
||||
if topic_type == topic_getter_type:
|
||||
answer = topic_getter(topic)
|
||||
answer = topic_getter(topic, request_options=request_options)
|
||||
break
|
||||
if not answer:
|
||||
topic_type = "unknown"
|
||||
answer = _get_unknown(topic)
|
||||
answer = dict(TOPIC_GETTERS)['unknown'](topic)
|
||||
|
||||
# saving answers in the cache
|
||||
if REDIS:
|
||||
@@ -545,9 +360,6 @@ def find_answer_by_keyword(directory, keyword, options="", request_options=None)
|
||||
|
||||
answer_paragraphs = []
|
||||
for topic in get_topics_list(skip_internal=True, skip_dirs=True):
|
||||
# skip the internal pages, don't show them in search
|
||||
if topic in INTERNAL_TOPICS:
|
||||
continue
|
||||
|
||||
if not topic.startswith(directory):
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user